drbd: Receiving of delay_probes
authorPhilipp Reisner <philipp.reisner@linbit.com>
Fri, 30 Apr 2010 13:26:20 +0000 (15:26 +0200)
committerPhilipp Reisner <philipp.reisner@linbit.com>
Mon, 17 May 2010 23:22:11 +0000 (01:22 +0200)
Delay_probes are new packets in the DRBD protocol, which allow
DRBD to know the current delay packets have on the data socket.
(relative to the meta data socket)

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
drivers/block/drbd/drbd_int.h
drivers/block/drbd/drbd_main.c
drivers/block/drbd/drbd_receiver.c

index 2409de1..fd7615f 100644 (file)
@@ -209,8 +209,11 @@ enum drbd_packets {
        P_RS_IS_IN_SYNC       = 0x22, /* meta socket */
        P_SYNC_PARAM89        = 0x23, /* data socket, protocol version 89 replacement for P_SYNC_PARAM */
        P_COMPRESSED_BITMAP   = 0x24, /* compressed or otherwise encoded bitmap transfer */
+       /* P_CKPT_FENCE_REQ      = 0x25, * currently reserved for protocol D */
+       /* P_CKPT_DISABLE_REQ    = 0x26, * currently reserved for protocol D */
+       P_DELAY_PROBE         = 0x27, /* is used on BOTH sockets */
 
-       P_MAX_CMD             = 0x25,
+       P_MAX_CMD             = 0x28,
        P_MAY_IGNORE          = 0x100, /* Flag to test if (cmd > P_MAY_IGNORE) ... */
        P_MAX_OPT_CMD         = 0x101,
 
@@ -540,6 +543,18 @@ struct p_compressed_bm {
        u8 code[0];
 } __packed;
 
+struct p_delay_probe {
+       struct p_header head;
+       u32     seq_num; /* sequence number to match the two probe packets */
+       u32     offset;  /* usecs the probe got sent after the reference time point */
+} __packed;
+
+struct delay_probe {
+       struct list_head list;
+       int seq_num;
+       struct timeval time;
+};
+
 /* DCBP: Drbd Compressed Bitmap Packet ... */
 static inline enum drbd_bitmap_code
 DCBP_get_code(struct p_compressed_bm *p)
@@ -1028,6 +1043,9 @@ struct drbd_conf {
        u64 ed_uuid; /* UUID of the exposed data */
        struct mutex state_mutex;
        char congestion_reason;  /* Why we where congested... */
+       struct list_head delay_probes; /* protected by peer_seq_lock */
+       int data_delay;   /* Delay of packets on the data-sock behind meta-sock */
+       atomic_t delay_seq; /* To generate sequence numbers of delay probes */
 };
 
 static inline struct drbd_conf *minor_to_mdev(unsigned int minor)
index 7468d2c..3d5fe30 100644 (file)
@@ -2608,6 +2608,7 @@ void drbd_init_set_defaults(struct drbd_conf *mdev)
        atomic_set(&mdev->net_cnt, 0);
        atomic_set(&mdev->packet_seq, 0);
        atomic_set(&mdev->pp_in_use, 0);
+       atomic_set(&mdev->delay_seq, 0);
 
        mutex_init(&mdev->md_io_mutex);
        mutex_init(&mdev->data.mutex);
@@ -2636,6 +2637,8 @@ void drbd_init_set_defaults(struct drbd_conf *mdev)
        INIT_LIST_HEAD(&mdev->unplug_work.list);
        INIT_LIST_HEAD(&mdev->md_sync_work.list);
        INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
+       INIT_LIST_HEAD(&mdev->delay_probes);
+
        mdev->resync_work.cb  = w_resync_inactive;
        mdev->unplug_work.cb  = w_send_write_hint;
        mdev->md_sync_work.cb = w_md_sync;
index b27f4dd..fee0d24 100644 (file)
@@ -3501,6 +3501,92 @@ static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
+static void timeval_sub_us(struct timeval* tv, unsigned int us)
+{
+       tv->tv_sec -= us / 1000000;
+       us = us % 1000000;
+       if (tv->tv_usec > us) {
+               tv->tv_usec += 1000000;
+               tv->tv_sec--;
+       }
+       tv->tv_usec -= us;
+}
+
+static void got_delay_probe(struct drbd_conf *mdev, int from, struct p_delay_probe *p)
+{
+       struct delay_probe *dp;
+       struct list_head *le;
+       struct timeval now;
+       int seq_num;
+       int offset;
+       int data_delay;
+
+       seq_num = be32_to_cpu(p->seq_num);
+       offset  = be32_to_cpu(p->offset);
+
+       spin_lock(&mdev->peer_seq_lock);
+       if (!list_empty(&mdev->delay_probes)) {
+               if (from == USE_DATA_SOCKET)
+                       le = mdev->delay_probes.next;
+               else
+                       le = mdev->delay_probes.prev;
+
+               dp = list_entry(le, struct delay_probe, list);
+
+               if (dp->seq_num == seq_num) {
+                       list_del(le);
+                       spin_unlock(&mdev->peer_seq_lock);
+                       do_gettimeofday(&now);
+                       timeval_sub_us(&now, offset);
+                       data_delay =
+                               now.tv_usec - dp->time.tv_usec +
+                               (now.tv_sec - dp->time.tv_sec) * 1000000;
+
+                       if (data_delay > 0)
+                               mdev->data_delay = data_delay;
+
+                       kfree(dp);
+                       return;
+               }
+
+               if (dp->seq_num > seq_num) {
+                       spin_unlock(&mdev->peer_seq_lock);
+                       dev_warn(DEV, "Previous allocation failure of struct delay_probe?\n");
+                       return; /* Do not alloca a struct delay_probe.... */
+               }
+       }
+       spin_unlock(&mdev->peer_seq_lock);
+
+       dp = kmalloc(sizeof(struct delay_probe), GFP_NOIO);
+       if (!dp) {
+               dev_warn(DEV, "Failed to allocate a struct delay_probe, do not worry.\n");
+               return;
+       }
+
+       dp->seq_num = seq_num;
+       do_gettimeofday(&dp->time);
+       timeval_sub_us(&dp->time, offset);
+
+       spin_lock(&mdev->peer_seq_lock);
+       if (from == USE_DATA_SOCKET)
+               list_add(&dp->list, &mdev->delay_probes);
+       else
+               list_add_tail(&dp->list, &mdev->delay_probes);
+       spin_unlock(&mdev->peer_seq_lock);
+}
+
+static int receive_delay_probe(struct drbd_conf *mdev, struct p_header *h)
+{
+       struct p_delay_probe *p = (struct p_delay_probe *)h;
+
+       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
+       if (drbd_recv(mdev, h->payload, h->length) != h->length)
+               return FALSE;
+
+       got_delay_probe(mdev, USE_DATA_SOCKET, p);
+       return TRUE;
+}
+
 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
 
 static drbd_cmd_handler_f drbd_default_handler[] = {
@@ -3524,6 +3610,7 @@ static drbd_cmd_handler_f drbd_default_handler[] = {
        [P_OV_REQUEST]      = receive_DataRequest,
        [P_OV_REPLY]        = receive_DataRequest,
        [P_CSUM_RS_REQUEST]    = receive_DataRequest,
+       [P_DELAY_PROBE]     = receive_delay_probe,
        /* anything missing from this table is in
         * the asender_tbl, see get_asender_cmd */
        [P_MAX_CMD]         = NULL,
@@ -4300,6 +4387,14 @@ static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
+static int got_delay_probe_m(struct drbd_conf *mdev, struct p_header *h)
+{
+       struct p_delay_probe *p = (struct p_delay_probe *)h;
+
+       got_delay_probe(mdev, USE_META_SOCKET, p);
+       return TRUE;
+}
+
 struct asender_cmd {
        size_t pkt_size;
        int (*process)(struct drbd_conf *mdev, struct p_header *h);
@@ -4324,6 +4419,7 @@ static struct asender_cmd *get_asender_cmd(int cmd)
        [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
        [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
        [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
+       [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe), got_delay_probe_m },
        [P_MAX_CMD]         = { 0, NULL },
        };
        if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)