n_gsm: Fix length handling
[pandora-kernel.git] / drivers / block / drbd / drbd_receiver.c
index 081522d..efd6169 100644 (file)
@@ -241,7 +241,7 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
        spin_unlock_irq(&mdev->req_lock);
 
        list_for_each_entry_safe(e, t, &reclaimed, w.list)
-               drbd_free_ee(mdev, e);
+               drbd_free_net_ee(mdev, e);
 }
 
 /**
@@ -298,9 +298,11 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool
  * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
  * Either links the page chain back to the global pool,
  * or returns all pages to the system. */
-static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
+static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
 {
+       atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
        int i;
+
        if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
                i = page_chain_free(page);
        else {
@@ -311,10 +313,10 @@ static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
-       atomic_sub(i, &mdev->pp_in_use);
-       i = atomic_read(&mdev->pp_in_use);
+       i = atomic_sub_return(i, a);
        if (i < 0)
-               dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
+               dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
+                       is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
 }
 
@@ -365,7 +367,6 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
        e->size = data_size;
        e->flags = 0;
        e->sector = sector;
-       e->sector = sector;
        e->block_id = id;
 
        return e;
@@ -375,9 +376,11 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
        return NULL;
 }
 
-void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
+void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
 {
-       drbd_pp_free(mdev, e->pages);
+       if (e->flags & EE_HAS_DIGEST)
+               kfree(e->digest);
+       drbd_pp_free(mdev, e->pages, is_net);
        D_ASSERT(atomic_read(&e->pending_bios) == 0);
        D_ASSERT(hlist_unhashed(&e->colision));
        mempool_free(e, drbd_ee_mempool);
@@ -388,13 +391,14 @@ int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
        LIST_HEAD(work_list);
        struct drbd_epoch_entry *e, *t;
        int count = 0;
+       int is_net = list == &mdev->net_ee;
 
        spin_lock_irq(&mdev->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&mdev->req_lock);
 
        list_for_each_entry_safe(e, t, &work_list, w.list) {
-               drbd_free_ee(mdev, e);
+               drbd_free_some_ee(mdev, e, is_net);
                count++;
        }
        return count;
@@ -423,7 +427,7 @@ static int drbd_process_done_ee(struct drbd_conf *mdev)
        spin_unlock_irq(&mdev->req_lock);
 
        list_for_each_entry_safe(e, t, &reclaimed, w.list)
-               drbd_free_ee(mdev, e);
+               drbd_free_net_ee(mdev, e);
 
        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_discard_ack.
@@ -719,14 +723,14 @@ out:
 static int drbd_send_fp(struct drbd_conf *mdev,
        struct socket *sock, enum drbd_packets cmd)
 {
-       struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
+       struct p_header80 *h = &mdev->data.sbuf.header.h80;
 
        return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
 }
 
 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
 {
-       struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
+       struct p_header80 *h = &mdev->data.rbuf.header.h80;
        int rr;
 
        rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
@@ -776,9 +780,6 @@ static int drbd_connect(struct drbd_conf *mdev)
 
        D_ASSERT(!mdev->data.socket);
 
-       if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
-               dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
-
        if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
                return -2;
 
@@ -927,6 +928,11 @@ retry:
 
        drbd_thread_start(&mdev->asender);
 
+       if (mdev->agreed_pro_version < 95 && get_ldev(mdev)) {
+               drbd_setup_queue_param(mdev, DRBD_MAX_SIZE_H80_PACKET);
+               put_ldev(mdev);
+       }
+
        if (!drbd_send_protocol(mdev))
                return -1;
        drbd_send_sync_param(mdev, &mdev->sync_conf);
@@ -946,22 +952,28 @@ out_release_sockets:
        return -1;
 }
 
-static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
+static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
 {
+       union p_header *h = &mdev->data.rbuf.header;
        int r;
 
        r = drbd_recv(mdev, h, sizeof(*h));
-
        if (unlikely(r != sizeof(*h))) {
                dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
                return FALSE;
-       };
-       h->command = be16_to_cpu(h->command);
-       h->length  = be16_to_cpu(h->length);
-       if (unlikely(h->magic != BE_DRBD_MAGIC)) {
-               dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
-                   (long)be32_to_cpu(h->magic),
-                   h->command, h->length);
+       }
+
+       if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
+               *cmd = be16_to_cpu(h->h80.command);
+               *packet_size = be16_to_cpu(h->h80.length);
+       } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
+               *cmd = be16_to_cpu(h->h95.command);
+               *packet_size = be32_to_cpu(h->h95.length);
+       } else {
+               dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
+                   be32_to_cpu(h->h80.magic),
+                   be16_to_cpu(h->h80.command),
+                   be16_to_cpu(h->h80.length));
                return FALSE;
        }
        mdev->last_received = jiffies;
@@ -975,7 +987,7 @@ static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct d
 
        if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
                rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
-                                       NULL, BLKDEV_IFL_WAIT);
+                                       NULL);
                if (rv) {
                        dev_err(DEV, "local disk flush failed with status %d\n", rv);
                        /* would rather check on EOPNOTSUPP, but that is not reliable.
@@ -1268,17 +1280,12 @@ int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __relea
        return 1;
 }
 
-static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
+static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        int rv, issue_flush;
-       struct p_barrier *p = (struct p_barrier *)h;
+       struct p_barrier *p = &mdev->data.rbuf.barrier;
        struct drbd_epoch *epoch;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-
-       rv = drbd_recv(mdev, h->payload, h->length);
-       ERR_IF(rv != h->length) return FALSE;
-
        inc_unacked(mdev);
 
        if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
@@ -1457,7 +1464,7 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
                data_size -= rr;
        }
        kunmap(page);
-       drbd_pp_free(mdev, page);
+       drbd_pp_free(mdev, page, 0);
        return rv;
 }
 
@@ -1562,30 +1569,29 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si
        list_add(&e->w.list, &mdev->sync_ee);
        spin_unlock_irq(&mdev->req_lock);
 
+       atomic_add(data_size >> 9, &mdev->rs_sect_ev);
        if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
                return TRUE;
 
+       /* drbd_submit_ee currently fails for one reason only:
+        * not being able to allocate enough bios.
+        * Is dropping the connection going to help? */
+       spin_lock_irq(&mdev->req_lock);
+       list_del(&e->w.list);
+       spin_unlock_irq(&mdev->req_lock);
+
        drbd_free_ee(mdev, e);
 fail:
        put_ldev(mdev);
        return FALSE;
 }
 
-static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
+static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        struct drbd_request *req;
        sector_t sector;
-       unsigned int header_size, data_size;
        int ok;
-       struct p_data *p = (struct p_data *)h;
-
-       header_size = sizeof(*p) - sizeof(*h);
-       data_size   = h->length  - header_size;
-
-       ERR_IF(data_size == 0) return FALSE;
-
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
-               return FALSE;
+       struct p_data *p = &mdev->data.rbuf.data;
 
        sector = be64_to_cpu(p->sector);
 
@@ -1611,20 +1617,11 @@ static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
        return ok;
 }
 
-static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
+static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        sector_t sector;
-       unsigned int header_size, data_size;
        int ok;
-       struct p_data *p = (struct p_data *)h;
-
-       header_size = sizeof(*p) - sizeof(*h);
-       data_size   = h->length  - header_size;
-
-       ERR_IF(data_size == 0) return FALSE;
-
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
-               return FALSE;
+       struct p_data *p = &mdev->data.rbuf.data;
 
        sector = be64_to_cpu(p->sector);
        D_ASSERT(p->block_id == ID_SYNCER);
@@ -1640,9 +1637,11 @@ static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
 
                ok = drbd_drain_block(mdev, data_size);
 
-               drbd_send_ack_dp(mdev, P_NEG_ACK, p);
+               drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
        }
 
+       atomic_add(data_size >> 9, &mdev->rs_sect_in);
+
        return ok;
 }
 
@@ -1765,24 +1764,27 @@ static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
        return ret;
 }
 
+static unsigned long write_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
+{
+       if (mdev->agreed_pro_version >= 95)
+               return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
+                       (dpf & DP_UNPLUG ? REQ_UNPLUG : 0) |
+                       (dpf & DP_FUA ? REQ_FUA : 0) |
+                       (dpf & DP_FLUSH ? REQ_FUA : 0) |
+                       (dpf & DP_DISCARD ? REQ_DISCARD : 0);
+       else
+               return dpf & DP_RW_SYNC ? (REQ_SYNC | REQ_UNPLUG) : 0;
+}
+
 /* mirrored write */
-static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
+static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        sector_t sector;
        struct drbd_epoch_entry *e;
-       struct p_data *p = (struct p_data *)h;
-       int header_size, data_size;
+       struct p_data *p = &mdev->data.rbuf.data;
        int rw = WRITE;
        u32 dp_flags;
 
-       header_size = sizeof(*p) - sizeof(*h);
-       data_size   = h->length  - header_size;
-
-       ERR_IF(data_size == 0) return FALSE;
-
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
-               return FALSE;
-
        if (!get_ldev(mdev)) {
                if (__ratelimit(&drbd_ratelimit_state))
                        dev_err(DEV, "Can not write mirrored data block "
@@ -1792,7 +1794,7 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
                        mdev->peer_seq++;
                spin_unlock(&mdev->peer_seq_lock);
 
-               drbd_send_ack_dp(mdev, P_NEG_ACK, p);
+               drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
                atomic_inc(&mdev->current_epoch->epoch_size);
                return drbd_drain_block(mdev, data_size);
        }
@@ -1839,12 +1841,8 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
        spin_unlock(&mdev->epoch_lock);
 
        dp_flags = be32_to_cpu(p->dp_flags);
-       if (dp_flags & DP_HARDBARRIER) {
-               dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
-               /* rw |= REQ_HARDBARRIER; */
-       }
-       if (dp_flags & DP_RW_SYNC)
-               rw |= REQ_SYNC | REQ_UNPLUG;
+       rw |= write_flags_to_bio(mdev, dp_flags);
+
        if (dp_flags & DP_MAY_SET_IN_SYNC)
                e->flags |= EE_MAY_SET_IN_SYNC;
 
@@ -2007,6 +2005,16 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
        if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
                return TRUE;
 
+       /* drbd_submit_ee currently fails for one reason only:
+        * not being able to allocate enough bios.
+        * Is dropping the connection going to help? */
+       spin_lock_irq(&mdev->req_lock);
+       list_del(&e->w.list);
+       hlist_del_init(&e->colision);
+       spin_unlock_irq(&mdev->req_lock);
+       if (e->flags & EE_CALL_AL_COMPLETE_IO)
+               drbd_al_complete_io(mdev, e->sector);
+
 out_interrupted:
        /* yes, the epoch_size now is imbalanced.
         * but we drop the connection anyways, so we don't have a chance to
@@ -2016,20 +2024,64 @@ out_interrupted:
        return FALSE;
 }
 
-static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
+/* We may throttle resync, if the lower device seems to be busy,
+ * and current sync rate is above c_min_rate.
+ *
+ * To decide whether or not the lower device is busy, we use a scheme similar
+ * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
+ * (more than 64 sectors) of activity we cannot account for with our own resync
+ * activity, it obviously is "busy".
+ *
+ * The current sync rate used here uses only the most recent two step marks,
+ * to have a short time average so we can react faster.
+ */
+int drbd_rs_should_slow_down(struct drbd_conf *mdev)
+{
+       struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
+       unsigned long db, dt, dbdt;
+       int curr_events;
+       int throttle = 0;
+
+       /* feature disabled? */
+       if (mdev->sync_conf.c_min_rate == 0)
+               return 0;
+
+       curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
+                     (int)part_stat_read(&disk->part0, sectors[1]) -
+                       atomic_read(&mdev->rs_sect_ev);
+       if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
+               unsigned long rs_left;
+               int i;
+
+               mdev->rs_last_events = curr_events;
+
+               /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
+                * approx. */
+               i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-2) % DRBD_SYNC_MARKS;
+               rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
+
+               dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
+               if (!dt)
+                       dt++;
+               db = mdev->rs_mark_left[i] - rs_left;
+               dbdt = Bit2KB(db/dt);
+
+               if (dbdt > mdev->sync_conf.c_min_rate)
+                       throttle = 1;
+       }
+       return throttle;
+}
+
+
+static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
 {
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
        struct drbd_epoch_entry *e;
        struct digest_info *di = NULL;
-       int size, digest_size;
+       int size, verb;
        unsigned int fault_type;
-       struct p_block_req *p =
-               (struct p_block_req *)h;
-       const int brps = sizeof(*p)-sizeof(*h);
-
-       if (drbd_recv(mdev, h->payload, brps) != brps)
-               return FALSE;
+       struct p_block_req *p = &mdev->data.rbuf.block_req;
 
        sector = be64_to_cpu(p->sector);
        size   = be32_to_cpu(p->blksize);
@@ -2046,12 +2098,31 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
        }
 
        if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
-               if (__ratelimit(&drbd_ratelimit_state))
+               verb = 1;
+               switch (cmd) {
+               case P_DATA_REQUEST:
+                       drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
+                       break;
+               case P_RS_DATA_REQUEST:
+               case P_CSUM_RS_REQUEST:
+               case P_OV_REQUEST:
+                       drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
+                       break;
+               case P_OV_REPLY:
+                       verb = 0;
+                       dec_rs_pending(mdev);
+                       drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
+                       break;
+               default:
+                       dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
+                               cmdname(cmd));
+               }
+               if (verb && __ratelimit(&drbd_ratelimit_state))
                        dev_err(DEV, "Can not satisfy peer's read request, "
                            "no local data.\n");
-               drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
-                                P_NEG_RS_DREPLY , p);
-               return drbd_drain_block(mdev, h->length - brps);
+
+               /* drain possibly payload */
+               return drbd_drain_block(mdev, digest_size);
        }
 
        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
@@ -2063,31 +2134,21 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
                return FALSE;
        }
 
-       switch (h->command) {
+       switch (cmd) {
        case P_DATA_REQUEST:
                e->w.cb = w_e_end_data_req;
                fault_type = DRBD_FAULT_DT_RD;
-               break;
+               /* application IO, don't drbd_rs_begin_io */
+               goto submit;
+
        case P_RS_DATA_REQUEST:
                e->w.cb = w_e_end_rsdata_req;
                fault_type = DRBD_FAULT_RS_RD;
-               /* Eventually this should become asynchronously. Currently it
-                * blocks the whole receiver just to delay the reading of a
-                * resync data block.
-                * the drbd_work_queue mechanism is made for this...
-                */
-               if (!drbd_rs_begin_io(mdev, sector)) {
-                       /* we have been interrupted,
-                        * probably connection lost! */
-                       D_ASSERT(signal_pending(current));
-                       goto out_free_e;
-               }
                break;
 
        case P_OV_REPLY:
        case P_CSUM_RS_REQUEST:
                fault_type = DRBD_FAULT_RS_RD;
-               digest_size = h->length - brps ;
                di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
                if (!di)
                        goto out_free_e;
@@ -2095,31 +2156,25 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
                di->digest_size = digest_size;
                di->digest = (((char *)di)+sizeof(struct digest_info));
 
+               e->digest = di;
+               e->flags |= EE_HAS_DIGEST;
+
                if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
                        goto out_free_e;
 
-               e->block_id = (u64)(unsigned long)di;
-               if (h->command == P_CSUM_RS_REQUEST) {
+               if (cmd == P_CSUM_RS_REQUEST) {
                        D_ASSERT(mdev->agreed_pro_version >= 89);
                        e->w.cb = w_e_end_csum_rs_req;
-               } else if (h->command == P_OV_REPLY) {
+               } else if (cmd == P_OV_REPLY) {
                        e->w.cb = w_e_end_ov_reply;
                        dec_rs_pending(mdev);
-                       break;
-               }
-
-               if (!drbd_rs_begin_io(mdev, sector)) {
-                       /* we have been interrupted, probably connection lost! */
-                       D_ASSERT(signal_pending(current));
-                       goto out_free_e;
+                       /* drbd_rs_begin_io done when we sent this request,
+                        * but accounting still needs to be done. */
+                       goto submit_for_resync;
                }
                break;
 
        case P_OV_REQUEST:
-               if (mdev->state.conn >= C_CONNECTED &&
-                   mdev->state.conn != C_VERIFY_T)
-                       dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
-                               drbd_conn_str(mdev->state.conn));
                if (mdev->ov_start_sector == ~(sector_t)0 &&
                    mdev->agreed_pro_version >= 90) {
                        mdev->ov_start_sector = sector;
@@ -2130,37 +2185,63 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
                }
                e->w.cb = w_e_end_ov_req;
                fault_type = DRBD_FAULT_RS_RD;
-               /* Eventually this should become asynchronous. Currently it
-                * blocks the whole receiver just to delay the reading of a
-                * resync data block.
-                * the drbd_work_queue mechanism is made for this...
-                */
-               if (!drbd_rs_begin_io(mdev, sector)) {
-                       /* we have been interrupted,
-                        * probably connection lost! */
-                       D_ASSERT(signal_pending(current));
-                       goto out_free_e;
-               }
                break;
 
-
        default:
                dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
-                   cmdname(h->command));
+                   cmdname(cmd));
                fault_type = DRBD_FAULT_MAX;
+               goto out_free_e;
        }
 
-       spin_lock_irq(&mdev->req_lock);
-       list_add(&e->w.list, &mdev->read_ee);
-       spin_unlock_irq(&mdev->req_lock);
+       /* Throttle, drbd_rs_begin_io and submit should become asynchronous
+        * wrt the receiver, but it is not as straightforward as it may seem.
+        * Various places in the resync start and stop logic assume resync
+        * requests are processed in order, requeuing this on the worker thread
+        * introduces a bunch of new code for synchronization between threads.
+        *
+        * Unlimited throttling before drbd_rs_begin_io may stall the resync
+        * "forever", throttling after drbd_rs_begin_io will lock that extent
+        * for application writes for the same time.  For now, just throttle
+        * here, where the rest of the code expects the receiver to sleep for
+        * a while, anyways.
+        */
+
+       /* Throttle before drbd_rs_begin_io, as that locks out application IO;
+        * this defers syncer requests for some time, before letting at least
+        * on request through.  The resync controller on the receiving side
+        * will adapt to the incoming rate accordingly.
+        *
+        * We cannot throttle here if remote is Primary/SyncTarget:
+        * we would also throttle its application reads.
+        * In that case, throttling is done on the SyncTarget only.
+        */
+       if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev))
+               msleep(100);
+       if (drbd_rs_begin_io(mdev, e->sector))
+               goto out_free_e;
 
+submit_for_resync:
+       atomic_add(size >> 9, &mdev->rs_sect_ev);
+
+submit:
        inc_unacked(mdev);
+       spin_lock_irq(&mdev->req_lock);
+       list_add_tail(&e->w.list, &mdev->read_ee);
+       spin_unlock_irq(&mdev->req_lock);
 
        if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
                return TRUE;
 
+       /* drbd_submit_ee currently fails for one reason only:
+        * not being able to allocate enough bios.
+        * Is dropping the connection going to help? */
+       spin_lock_irq(&mdev->req_lock);
+       list_del(&e->w.list);
+       spin_unlock_irq(&mdev->req_lock);
+       /* no drbd_rs_complete_io(), we are dropping the connection anyways */
+
 out_free_e:
-       kfree(di);
        put_ldev(mdev);
        drbd_free_ee(mdev, e);
        return FALSE;
@@ -2699,20 +2780,13 @@ static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
        return 1;
 }
 
-static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
+static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_protocol *p = (struct p_protocol *)h;
-       int header_size, data_size;
+       struct p_protocol *p = &mdev->data.rbuf.protocol;
        int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
        int p_want_lose, p_two_primaries, cf;
        char p_integrity_alg[SHARED_SECRET_MAX] = "";
 
-       header_size = sizeof(*p) - sizeof(*h);
-       data_size   = h->length  - header_size;
-
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
-               return FALSE;
-
        p_proto         = be32_to_cpu(p->protocol);
        p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
        p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
@@ -2805,39 +2879,46 @@ struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
        return tfm;
 }
 
-static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
+static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
 {
        int ok = TRUE;
-       struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
+       struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
        unsigned int header_size, data_size, exp_max_sz;
        struct crypto_hash *verify_tfm = NULL;
        struct crypto_hash *csums_tfm = NULL;
        const int apv = mdev->agreed_pro_version;
+       int *rs_plan_s = NULL;
+       int fifo_size = 0;
 
        exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
                    : apv == 88 ? sizeof(struct p_rs_param)
                                        + SHARED_SECRET_MAX
-                   : /* 89 */    sizeof(struct p_rs_param_89);
+                   : apv <= 94 ? sizeof(struct p_rs_param_89)
+                   : /* apv >= 95 */ sizeof(struct p_rs_param_95);
 
-       if (h->length > exp_max_sz) {
+       if (packet_size > exp_max_sz) {
                dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
-                   h->length, exp_max_sz);
+                   packet_size, exp_max_sz);
                return FALSE;
        }
 
        if (apv <= 88) {
-               header_size = sizeof(struct p_rs_param) - sizeof(*h);
-               data_size   = h->length  - header_size;
-       } else /* apv >= 89 */ {
-               header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
-               data_size   = h->length  - header_size;
+               header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
+               data_size   = packet_size  - header_size;
+       } else if (apv <= 94) {
+               header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
+               data_size   = packet_size  - header_size;
+               D_ASSERT(data_size == 0);
+       } else {
+               header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
+               data_size   = packet_size  - header_size;
                D_ASSERT(data_size == 0);
        }
 
        /* initialize verify_alg and csums_alg */
        memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
 
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
+       if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
                return FALSE;
 
        mdev->sync_conf.rate      = be32_to_cpu(p->rate);
@@ -2896,6 +2977,22 @@ static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
                        }
                }
 
+               if (apv > 94) {
+                       mdev->sync_conf.rate      = be32_to_cpu(p->rate);
+                       mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
+                       mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
+                       mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
+                       mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
+
+                       fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
+                       if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
+                               rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
+                               if (!rs_plan_s) {
+                                       dev_err(DEV, "kmalloc of fifo_buffer failed");
+                                       goto disconnect;
+                               }
+                       }
+               }
 
                spin_lock(&mdev->peer_seq_lock);
                /* lock against drbd_nl_syncer_conf() */
@@ -2913,6 +3010,12 @@ static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
                        mdev->csums_tfm = csums_tfm;
                        dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
                }
+               if (fifo_size != mdev->rs_plan_s.size) {
+                       kfree(mdev->rs_plan_s.values);
+                       mdev->rs_plan_s.values = rs_plan_s;
+                       mdev->rs_plan_s.size   = fifo_size;
+                       mdev->rs_planed = 0;
+               }
                spin_unlock(&mdev->peer_seq_lock);
        }
 
@@ -2946,19 +3049,15 @@ static void warn_if_differ_considerably(struct drbd_conf *mdev,
                     (unsigned long long)a, (unsigned long long)b);
 }
 
-static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
+static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_sizes *p = (struct p_sizes *)h;
+       struct p_sizes *p = &mdev->data.rbuf.sizes;
        enum determine_dev_size dd = unchanged;
        unsigned int max_seg_s;
        sector_t p_size, p_usize, my_usize;
        int ldsc = 0; /* local disk size changed */
        enum dds_flags ddsf;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        p_size = be64_to_cpu(p->d_size);
        p_usize = be64_to_cpu(p->u_size);
 
@@ -2972,7 +3071,6 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
         * we still need to figure out whether we accept that. */
        mdev->p_size = p_size;
 
-#define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r))
        if (get_ldev(mdev)) {
                warn_if_differ_considerably(mdev, "lower level device sizes",
                           p_size, drbd_get_max_capacity(mdev->ldev));
@@ -3029,6 +3127,8 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
 
                if (mdev->agreed_pro_version < 94)
                        max_seg_s = be32_to_cpu(p->max_segment_size);
+               else if (mdev->agreed_pro_version == 94)
+                       max_seg_s = DRBD_MAX_SIZE_H80_PACKET;
                else /* drbd 8.3.8 onwards */
                        max_seg_s = DRBD_MAX_SEGMENT_SIZE;
 
@@ -3062,16 +3162,12 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
+static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_uuids *p = (struct p_uuids *)h;
+       struct p_uuids *p = &mdev->data.rbuf.uuids;
        u64 *p_uuid;
        int i;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
 
        for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
@@ -3107,6 +3203,11 @@ static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
                        drbd_md_sync(mdev);
                }
                put_ldev(mdev);
+       } else if (mdev->state.disk < D_INCONSISTENT &&
+                  mdev->state.role == R_PRIMARY) {
+               /* I am a diskless primary, the peer just created a new current UUID
+                  for me. */
+               drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
        }
 
        /* Before we test for the disk state, we should wait until an eventually
@@ -3150,16 +3251,12 @@ static union drbd_state convert_state(union drbd_state ps)
        return ms;
 }
 
-static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
+static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_req_state *p = (struct p_req_state *)h;
+       struct p_req_state *p = &mdev->data.rbuf.req_state;
        union drbd_state mask, val;
        int rv;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        mask.i = be32_to_cpu(p->mask);
        val.i = be32_to_cpu(p->val);
 
@@ -3180,20 +3277,14 @@ static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int receive_state(struct drbd_conf *mdev, struct p_header *h)
+static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_state *p = (struct p_state *)h;
-       enum drbd_conns nconn, oconn;
-       union drbd_state ns, peer_state;
+       struct p_state *p = &mdev->data.rbuf.state;
+       union drbd_state os, ns, peer_state;
        enum drbd_disk_state real_peer_disk;
+       enum chg_state_flags cs_flags;
        int rv;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
-               return FALSE;
-
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        peer_state.i = be32_to_cpu(p->state);
 
        real_peer_disk = peer_state.disk;
@@ -3204,38 +3295,72 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h)
 
        spin_lock_irq(&mdev->req_lock);
  retry:
-       oconn = nconn = mdev->state.conn;
+       os = ns = mdev->state;
        spin_unlock_irq(&mdev->req_lock);
 
-       if (nconn == C_WF_REPORT_PARAMS)
-               nconn = C_CONNECTED;
+       /* peer says his disk is uptodate, while we think it is inconsistent,
+        * and this happens while we think we have a sync going on. */
+       if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
+           os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
+               /* If we are (becoming) SyncSource, but peer is still in sync
+                * preparation, ignore its uptodate-ness to avoid flapping, it
+                * will change to inconsistent once the peer reaches active
+                * syncing states.
+                * It may have changed syncer-paused flags, however, so we
+                * cannot ignore this completely. */
+               if (peer_state.conn > C_CONNECTED &&
+                   peer_state.conn < C_SYNC_SOURCE)
+                       real_peer_disk = D_INCONSISTENT;
+
+               /* if peer_state changes to connected at the same time,
+                * it explicitly notifies us that it finished resync.
+                * Maybe we should finish it up, too? */
+               else if (os.conn >= C_SYNC_SOURCE &&
+                        peer_state.conn == C_CONNECTED) {
+                       if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
+                               drbd_resync_finished(mdev);
+                       return TRUE;
+               }
+       }
+
+       /* peer says his disk is inconsistent, while we think it is uptodate,
+        * and this happens while the peer still thinks we have a sync going on,
+        * but we think we are already done with the sync.
+        * We ignore this to avoid flapping pdsk.
+        * This should not happen, if the peer is a recent version of drbd. */
+       if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
+           os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
+               real_peer_disk = D_UP_TO_DATE;
+
+       if (ns.conn == C_WF_REPORT_PARAMS)
+               ns.conn = C_CONNECTED;
 
        if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
            get_ldev_if_state(mdev, D_NEGOTIATING)) {
                int cr; /* consider resync */
 
                /* if we established a new connection */
-               cr  = (oconn < C_CONNECTED);
+               cr  = (os.conn < C_CONNECTED);
                /* if we had an established connection
                 * and one of the nodes newly attaches a disk */
-               cr |= (oconn == C_CONNECTED &&
+               cr |= (os.conn == C_CONNECTED &&
                       (peer_state.disk == D_NEGOTIATING ||
-                       mdev->state.disk == D_NEGOTIATING));
+                       os.disk == D_NEGOTIATING));
                /* if we have both been inconsistent, and the peer has been
                 * forced to be UpToDate with --overwrite-data */
                cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
                /* if we had been plain connected, and the admin requested to
                 * start a sync by "invalidate" or "invalidate-remote" */
-               cr |= (oconn == C_CONNECTED &&
+               cr |= (os.conn == C_CONNECTED &&
                                (peer_state.conn >= C_STARTING_SYNC_S &&
                                 peer_state.conn <= C_WF_BITMAP_T));
 
                if (cr)
-                       nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
+                       ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
 
                put_ldev(mdev);
-               if (nconn == C_MASK) {
-                       nconn = C_CONNECTED;
+               if (ns.conn == C_MASK) {
+                       ns.conn = C_CONNECTED;
                        if (mdev->state.disk == D_NEGOTIATING) {
                                drbd_force_state(mdev, NS(disk, D_DISKLESS));
                        } else if (peer_state.disk == D_NEGOTIATING) {
@@ -3245,7 +3370,7 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h)
                        } else {
                                if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
                                        return FALSE;
-                               D_ASSERT(oconn == C_WF_REPORT_PARAMS);
+                               D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
                                drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
                                return FALSE;
                        }
@@ -3253,18 +3378,28 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h)
        }
 
        spin_lock_irq(&mdev->req_lock);
-       if (mdev->state.conn != oconn)
+       if (mdev->state.i != os.i)
                goto retry;
        clear_bit(CONSIDER_RESYNC, &mdev->flags);
-       ns.i = mdev->state.i;
-       ns.conn = nconn;
        ns.peer = peer_state.role;
        ns.pdsk = real_peer_disk;
        ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
-       if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
+       if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
                ns.disk = mdev->new_state_tmp.disk;
-
-       rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
+       cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
+       if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
+           test_bit(NEW_CUR_UUID, &mdev->flags)) {
+               /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
+                  for temporal network outages! */
+               spin_unlock_irq(&mdev->req_lock);
+               dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
+               tl_clear(mdev);
+               drbd_uuid_new_current(mdev);
+               clear_bit(NEW_CUR_UUID, &mdev->flags);
+               drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
+               return FALSE;
+       }
+       rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
        ns = mdev->state;
        spin_unlock_irq(&mdev->req_lock);
 
@@ -3273,8 +3408,8 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h)
                return FALSE;
        }
 
-       if (oconn > C_WF_REPORT_PARAMS) {
-               if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
+       if (os.conn > C_WF_REPORT_PARAMS) {
+               if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
                    peer_state.disk != D_NEGOTIATING ) {
                        /* we want resync, peer has not yet decided to sync... */
                        /* Nowadays only used when forcing a node into primary role and
@@ -3291,9 +3426,9 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
+static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_rs_uuid *p = (struct p_rs_uuid *)h;
+       struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
 
        wait_event(mdev->misc_wait,
                   mdev->state.conn == C_WF_SYNC_UUID ||
@@ -3302,10 +3437,6 @@ static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
 
        /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        /* Here the _drbd_uuid_ functions are right, current should
           _not_ be rotated into the history */
        if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
@@ -3324,14 +3455,14 @@ static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
 enum receive_bitmap_ret { OK, DONE, FAILED };
 
 static enum receive_bitmap_ret
-receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
-       unsigned long *buffer, struct bm_xfer_ctx *c)
+receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
+                    unsigned long *buffer, struct bm_xfer_ctx *c)
 {
        unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
        unsigned want = num_words * sizeof(long);
 
-       if (want != h->length) {
-               dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
+       if (want != data_size) {
+               dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
                return FAILED;
        }
        if (want == 0)
@@ -3360,7 +3491,7 @@ recv_bm_rle_bits(struct drbd_conf *mdev,
        u64 tmp;
        unsigned long s = c->bit_offset;
        unsigned long e;
-       int len = p->head.length - (sizeof(*p) - sizeof(p->head));
+       int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
        int toggle = DCBP_get_start(p);
        int have;
        int bits;
@@ -3429,7 +3560,7 @@ void INFO_bm_xfer_stats(struct drbd_conf *mdev,
                const char *direction, struct bm_xfer_ctx *c)
 {
        /* what would it take to transfer it "plaintext" */
-       unsigned plain = sizeof(struct p_header) *
+       unsigned plain = sizeof(struct p_header80) *
                ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
                + c->bm_words * sizeof(long);
        unsigned total = c->bytes[0] + c->bytes[1];
@@ -3467,12 +3598,13 @@ void INFO_bm_xfer_stats(struct drbd_conf *mdev,
    in order to be agnostic to the 32 vs 64 bits issue.
 
    returns 0 on failure, 1 if we successfully received it. */
-static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
+static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        struct bm_xfer_ctx c;
        void *buffer;
        enum receive_bitmap_ret ret;
        int ok = FALSE;
+       struct p_header80 *h = &mdev->data.rbuf.header.h80;
 
        wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
 
@@ -3492,39 +3624,39 @@ static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
        };
 
        do {
-               if (h->command == P_BITMAP) {
-                       ret = receive_bitmap_plain(mdev, h, buffer, &c);
-               } else if (h->command == P_COMPRESSED_BITMAP) {
+               if (cmd == P_BITMAP) {
+                       ret = receive_bitmap_plain(mdev, data_size, buffer, &c);
+               } else if (cmd == P_COMPRESSED_BITMAP) {
                        /* MAYBE: sanity check that we speak proto >= 90,
                         * and the feature is enabled! */
                        struct p_compressed_bm *p;
 
-                       if (h->length > BM_PACKET_PAYLOAD_BYTES) {
+                       if (data_size > BM_PACKET_PAYLOAD_BYTES) {
                                dev_err(DEV, "ReportCBitmap packet too large\n");
                                goto out;
                        }
                        /* use the page buff */
                        p = buffer;
                        memcpy(p, h, sizeof(*h));
-                       if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
+                       if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
                                goto out;
-                       if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
-                               dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
+                       if (data_size <= (sizeof(*p) - sizeof(p->head))) {
+                               dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
                                return FAILED;
                        }
                        ret = decode_bitmap_c(mdev, p, &c);
                } else {
-                       dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
+                       dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
                        goto out;
                }
 
-               c.packets[h->command == P_BITMAP]++;
-               c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
+               c.packets[cmd == P_BITMAP]++;
+               c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
 
                if (ret != OK)
                        break;
 
-               if (!drbd_recv_header(mdev, h))
+               if (!drbd_recv_header(mdev, &cmd, &data_size))
                        goto out;
        } while (ret == OK);
        if (ret == FAILED)
@@ -3555,17 +3687,16 @@ static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
        return ok;
 }
 
-static int receive_skip_(struct drbd_conf *mdev, struct p_header *h, int silent)
+static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        /* TODO zero copy sink :) */
        static char sink[128];
        int size, want, r;
 
-       if (!silent)
-               dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
-                    h->command, h->length);
+       dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
+                cmd, data_size);
 
-       size = h->length;
+       size = data_size;
        while (size > 0) {
                want = min_t(int, size, sizeof(sink));
                r = drbd_recv(mdev, sink, want);
@@ -3575,17 +3706,7 @@ static int receive_skip_(struct drbd_conf *mdev, struct p_header *h, int silent)
        return size == 0;
 }
 
-static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
-{
-       return receive_skip_(mdev, h, 0);
-}
-
-static int receive_skip_silent(struct drbd_conf *mdev, struct p_header *h)
-{
-       return receive_skip_(mdev, h, 1);
-}
-
-static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
+static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        if (mdev->state.disk >= D_INCONSISTENT)
                drbd_kick_lo(mdev);
@@ -3597,108 +3718,94 @@ static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
-
-static drbd_cmd_handler_f drbd_default_handler[] = {
-       [P_DATA]            = receive_Data,
-       [P_DATA_REPLY]      = receive_DataReply,
-       [P_RS_DATA_REPLY]   = receive_RSDataReply,
-       [P_BARRIER]         = receive_Barrier,
-       [P_BITMAP]          = receive_bitmap,
-       [P_COMPRESSED_BITMAP]    = receive_bitmap,
-       [P_UNPLUG_REMOTE]   = receive_UnplugRemote,
-       [P_DATA_REQUEST]    = receive_DataRequest,
-       [P_RS_DATA_REQUEST] = receive_DataRequest,
-       [P_SYNC_PARAM]      = receive_SyncParam,
-       [P_SYNC_PARAM89]           = receive_SyncParam,
-       [P_PROTOCOL]        = receive_protocol,
-       [P_UUIDS]           = receive_uuids,
-       [P_SIZES]           = receive_sizes,
-       [P_STATE]           = receive_state,
-       [P_STATE_CHG_REQ]   = receive_req_state,
-       [P_SYNC_UUID]       = receive_sync_uuid,
-       [P_OV_REQUEST]      = receive_DataRequest,
-       [P_OV_REPLY]        = receive_DataRequest,
-       [P_CSUM_RS_REQUEST]    = receive_DataRequest,
-       [P_DELAY_PROBE]     = receive_skip_silent,
+typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
+
+struct data_cmd {
+       int expect_payload;
+       size_t pkt_size;
+       drbd_cmd_handler_f function;
+};
+
+static struct data_cmd drbd_cmd_handler[] = {
+       [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
+       [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
+       [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
+       [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
+       [P_BITMAP]          = { 1, sizeof(struct p_header80), receive_bitmap } ,
+       [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
+       [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
+       [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
+       [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
+       [P_SYNC_PARAM]      = { 1, sizeof(struct p_header80), receive_SyncParam },
+       [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
+       [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
+       [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
+       [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
+       [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
+       [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
+       [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
+       [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
+       [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
+       [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
+       [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
        /* anything missing from this table is in
         * the asender_tbl, see get_asender_cmd */
-       [P_MAX_CMD]         = NULL,
+       [P_MAX_CMD]         = { 0, 0, NULL },
 };
 
-static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
-static drbd_cmd_handler_f *drbd_opt_cmd_handler;
+/* All handler functions that expect a sub-header get that sub-heder in
+   mdev->data.rbuf.header.head.payload.
+
+   Usually in mdev->data.rbuf.header.head the callback can find the usual
+   p_header, but they may not rely on that. Since there is also p_header95 !
+ */
 
 static void drbdd(struct drbd_conf *mdev)
 {
-       drbd_cmd_handler_f handler;
-       struct p_header *header = &mdev->data.rbuf.header;
+       union p_header *header = &mdev->data.rbuf.header;
+       unsigned int packet_size;
+       enum drbd_packets cmd;
+       size_t shs; /* sub header size */
+       int rv;
 
        while (get_t_state(&mdev->receiver) == Running) {
                drbd_thread_current_set_cpu(mdev);
-               if (!drbd_recv_header(mdev, header)) {
-                       drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
-                       break;
-               }
+               if (!drbd_recv_header(mdev, &cmd, &packet_size))
+                       goto err_out;
 
-               if (header->command < P_MAX_CMD)
-                       handler = drbd_cmd_handler[header->command];
-               else if (P_MAY_IGNORE < header->command
-                    && header->command < P_MAX_OPT_CMD)
-                       handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
-               else if (header->command > P_MAX_OPT_CMD)
-                       handler = receive_skip;
-               else
-                       handler = NULL;
+               if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
+                       dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
+                       goto err_out;
+               }
 
-               if (unlikely(!handler)) {
-                       dev_err(DEV, "unknown packet type %d, l: %d!\n",
-                           header->command, header->length);
-                       drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
-                       break;
+               shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
+               rv = drbd_recv(mdev, &header->h80.payload, shs);
+               if (unlikely(rv != shs)) {
+                       dev_err(DEV, "short read while reading sub header: rv=%d\n", rv);
+                       goto err_out;
                }
-               if (unlikely(!handler(mdev, header))) {
-                       dev_err(DEV, "error receiving %s, l: %d!\n",
-                           cmdname(header->command), header->length);
-                       drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
-                       break;
+
+               if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
+                       dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
+                       goto err_out;
                }
-       }
-}
 
-static void drbd_fail_pending_reads(struct drbd_conf *mdev)
-{
-       struct hlist_head *slot;
-       struct hlist_node *pos;
-       struct hlist_node *tmp;
-       struct drbd_request *req;
-       int i;
+               rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
 
-       /*
-        * Application READ requests
-        */
-       spin_lock_irq(&mdev->req_lock);
-       for (i = 0; i < APP_R_HSIZE; i++) {
-               slot = mdev->app_reads_hash+i;
-               hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
-                       /* it may (but should not any longer!)
-                        * be on the work queue; if that assert triggers,
-                        * we need to also grab the
-                        * spin_lock_irq(&mdev->data.work.q_lock);
-                        * and list_del_init here. */
-                       D_ASSERT(list_empty(&req->w.list));
-                       /* It would be nice to complete outside of spinlock.
-                        * But this is easier for now. */
-                       _req_mod(req, connection_lost_while_pending);
+               if (unlikely(!rv)) {
+                       dev_err(DEV, "error receiving %s, l: %d!\n",
+                           cmdname(cmd), packet_size);
+                       goto err_out;
                }
        }
-       for (i = 0; i < APP_R_HSIZE; i++)
-               if (!hlist_empty(mdev->app_reads_hash+i))
-                       dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
-                               "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
 
-       memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
-       spin_unlock_irq(&mdev->req_lock);
+       if (0) {
+       err_out:
+               drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
+       }
+       /* If we leave here, we probably want to update at least the
+        * "Connected" indicator on stable storage. Do so explicitly here. */
+       drbd_md_sync(mdev);
 }
 
 void drbd_flush_workqueue(struct drbd_conf *mdev)
@@ -3711,6 +3818,36 @@ void drbd_flush_workqueue(struct drbd_conf *mdev)
        wait_for_completion(&barr.done);
 }
 
+void drbd_free_tl_hash(struct drbd_conf *mdev)
+{
+       struct hlist_head *h;
+
+       spin_lock_irq(&mdev->req_lock);
+
+       if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
+               spin_unlock_irq(&mdev->req_lock);
+               return;
+       }
+       /* paranoia code */
+       for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
+               if (h->first)
+                       dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
+                               (int)(h - mdev->ee_hash), h->first);
+       kfree(mdev->ee_hash);
+       mdev->ee_hash = NULL;
+       mdev->ee_hash_s = 0;
+
+       /* paranoia code */
+       for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
+               if (h->first)
+                       dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
+                               (int)(h - mdev->tl_hash), h->first);
+       kfree(mdev->tl_hash);
+       mdev->tl_hash = NULL;
+       mdev->tl_hash_s = 0;
+       spin_unlock_irq(&mdev->req_lock);
+}
+
 static void drbd_disconnect(struct drbd_conf *mdev)
 {
        enum drbd_fencing_p fp;
@@ -3728,6 +3865,7 @@ static void drbd_disconnect(struct drbd_conf *mdev)
        drbd_thread_stop(&mdev->asender);
        drbd_free_sock(mdev);
 
+       /* wait for current activity to cease. */
        spin_lock_irq(&mdev->req_lock);
        _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
        _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
@@ -3752,7 +3890,6 @@ static void drbd_disconnect(struct drbd_conf *mdev)
 
        /* make sure syncer is stopped and w_resume_next_sg queued */
        del_timer_sync(&mdev->resync_timer);
-       set_bit(STOP_SYNC_TIMER, &mdev->flags);
        resync_timer_fn((unsigned long)mdev);
 
        /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
@@ -3767,11 +3904,9 @@ static void drbd_disconnect(struct drbd_conf *mdev)
        kfree(mdev->p_uuid);
        mdev->p_uuid = NULL;
 
-       if (!mdev->state.susp)
+       if (!is_susp(mdev->state))
                tl_clear(mdev);
 
-       drbd_fail_pending_reads(mdev);
-
        dev_info(DEV, "Connection closed\n");
 
        drbd_md_sync(mdev);
@@ -3782,12 +3917,8 @@ static void drbd_disconnect(struct drbd_conf *mdev)
                put_ldev(mdev);
        }
 
-       if (mdev->state.role == R_PRIMARY) {
-               if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
-                       enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
-                       drbd_request_state(mdev, NS(pdsk, nps));
-               }
-       }
+       if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
+               drbd_try_outdate_peer_async(mdev);
 
        spin_lock_irq(&mdev->req_lock);
        os = mdev->state;
@@ -3800,32 +3931,14 @@ static void drbd_disconnect(struct drbd_conf *mdev)
        spin_unlock_irq(&mdev->req_lock);
 
        if (os.conn == C_DISCONNECTING) {
-               struct hlist_head *h;
-               wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
+               wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
 
-               /* we must not free the tl_hash
-                * while application io is still on the fly */
-               wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
-
-               spin_lock_irq(&mdev->req_lock);
-               /* paranoia code */
-               for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
-                       if (h->first)
-                               dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
-                                               (int)(h - mdev->ee_hash), h->first);
-               kfree(mdev->ee_hash);
-               mdev->ee_hash = NULL;
-               mdev->ee_hash_s = 0;
-
-               /* paranoia code */
-               for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
-                       if (h->first)
-                               dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
-                                               (int)(h - mdev->tl_hash), h->first);
-               kfree(mdev->tl_hash);
-               mdev->tl_hash = NULL;
-               mdev->tl_hash_s = 0;
-               spin_unlock_irq(&mdev->req_lock);
+               if (!is_susp(mdev->state)) {
+                       /* we must not free the tl_hash
+                        * while application io is still on the fly */
+                       wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
+                       drbd_free_tl_hash(mdev);
+               }
 
                crypto_free_hash(mdev->cram_hmac_tfm);
                mdev->cram_hmac_tfm = NULL;
@@ -3845,6 +3958,9 @@ static void drbd_disconnect(struct drbd_conf *mdev)
        i = drbd_release_ee(mdev, &mdev->net_ee);
        if (i)
                dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
+       i = atomic_read(&mdev->pp_in_use_by_net);
+       if (i)
+               dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
        i = atomic_read(&mdev->pp_in_use);
        if (i)
                dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
@@ -3888,7 +4004,7 @@ static int drbd_send_handshake(struct drbd_conf *mdev)
        p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
        p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
        ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
-                            (struct p_header *)p, sizeof(*p), 0 );
+                            (struct p_header80 *)p, sizeof(*p), 0 );
        mutex_unlock(&mdev->data.mutex);
        return ok;
 }
@@ -3904,27 +4020,28 @@ static int drbd_do_handshake(struct drbd_conf *mdev)
 {
        /* ASSERT current == mdev->receiver ... */
        struct p_handshake *p = &mdev->data.rbuf.handshake;
-       const int expect = sizeof(struct p_handshake)
-                         -sizeof(struct p_header);
+       const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
+       unsigned int length;
+       enum drbd_packets cmd;
        int rv;
 
        rv = drbd_send_handshake(mdev);
        if (!rv)
                return 0;
 
-       rv = drbd_recv_header(mdev, &p->head);
+       rv = drbd_recv_header(mdev, &cmd, &length);
        if (!rv)
                return 0;
 
-       if (p->head.command != P_HAND_SHAKE) {
+       if (cmd != P_HAND_SHAKE) {
                dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
-                    cmdname(p->head.command), p->head.command);
+                    cmdname(cmd), cmd);
                return -1;
        }
 
-       if (p->head.length != expect) {
+       if (length != expect) {
                dev_err(DEV, "expected HandShake length: %u, received: %u\n",
-                    expect, p->head.length);
+                    expect, length);
                return -1;
        }
 
@@ -3982,10 +4099,11 @@ static int drbd_do_auth(struct drbd_conf *mdev)
        char *response = NULL;
        char *right_response = NULL;
        char *peers_ch = NULL;
-       struct p_header p;
        unsigned int key_len = strlen(mdev->net_conf->shared_secret);
        unsigned int resp_size;
        struct hash_desc desc;
+       enum drbd_packets cmd;
+       unsigned int length;
        int rv;
 
        desc.tfm = mdev->cram_hmac_tfm;
@@ -4005,33 +4123,33 @@ static int drbd_do_auth(struct drbd_conf *mdev)
        if (!rv)
                goto fail;
 
-       rv = drbd_recv_header(mdev, &p);
+       rv = drbd_recv_header(mdev, &cmd, &length);
        if (!rv)
                goto fail;
 
-       if (p.command != P_AUTH_CHALLENGE) {
+       if (cmd != P_AUTH_CHALLENGE) {
                dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
-                   cmdname(p.command), p.command);
+                   cmdname(cmd), cmd);
                rv = 0;
                goto fail;
        }
 
-       if (p.length > CHALLENGE_LEN*2) {
+       if (length > CHALLENGE_LEN * 2) {
                dev_err(DEV, "expected AuthChallenge payload too big.\n");
                rv = -1;
                goto fail;
        }
 
-       peers_ch = kmalloc(p.length, GFP_NOIO);
+       peers_ch = kmalloc(length, GFP_NOIO);
        if (peers_ch == NULL) {
                dev_err(DEV, "kmalloc of peers_ch failed\n");
                rv = -1;
                goto fail;
        }
 
-       rv = drbd_recv(mdev, peers_ch, p.length);
+       rv = drbd_recv(mdev, peers_ch, length);
 
-       if (rv != p.length) {
+       if (rv != length) {
                dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
                rv = 0;
                goto fail;
@@ -4046,7 +4164,7 @@ static int drbd_do_auth(struct drbd_conf *mdev)
        }
 
        sg_init_table(&sg, 1);
-       sg_set_buf(&sg, peers_ch, p.length);
+       sg_set_buf(&sg, peers_ch, length);
 
        rv = crypto_hash_digest(&desc, &sg, sg.length, response);
        if (rv) {
@@ -4059,18 +4177,18 @@ static int drbd_do_auth(struct drbd_conf *mdev)
        if (!rv)
                goto fail;
 
-       rv = drbd_recv_header(mdev, &p);
+       rv = drbd_recv_header(mdev, &cmd, &length);
        if (!rv)
                goto fail;
 
-       if (p.command != P_AUTH_RESPONSE) {
+       if (cmd != P_AUTH_RESPONSE) {
                dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
-                   cmdname(p.command), p.command);
+                       cmdname(cmd), cmd);
                rv = 0;
                goto fail;
        }
 
-       if (p.length != resp_size) {
+       if (length != resp_size) {
                dev_err(DEV, "expected AuthResponse payload of wrong size\n");
                rv = 0;
                goto fail;
@@ -4155,7 +4273,7 @@ int drbdd_init(struct drbd_thread *thi)
 
 /* ********* acknowledge sender ******** */
 
-static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
+static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_req_state_reply *p = (struct p_req_state_reply *)h;
 
@@ -4173,13 +4291,13 @@ static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
+static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
 {
        return drbd_send_ping_ack(mdev);
 
 }
 
-static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
+static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
 {
        /* restore idle timeout */
        mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
@@ -4189,7 +4307,7 @@ static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
+static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        sector_t sector = be64_to_cpu(p->sector);
@@ -4199,11 +4317,15 @@ static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
 
        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
 
-       drbd_rs_complete_io(mdev, sector);
-       drbd_set_in_sync(mdev, sector, blksize);
-       /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
-       mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
+       if (get_ldev(mdev)) {
+               drbd_rs_complete_io(mdev, sector);
+               drbd_set_in_sync(mdev, sector, blksize);
+               /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
+               mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
+               put_ldev(mdev);
+       }
        dec_rs_pending(mdev);
+       atomic_add(blksize >> 9, &mdev->rs_sect_in);
 
        return TRUE;
 }
@@ -4259,7 +4381,7 @@ static int validate_req_change_req_state(struct drbd_conf *mdev,
        return TRUE;
 }
 
-static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
+static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        sector_t sector = be64_to_cpu(p->sector);
@@ -4299,7 +4421,7 @@ static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
                _ack_id_to_req, __func__ , what);
 }
 
-static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
+static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        sector_t sector = be64_to_cpu(p->sector);
@@ -4319,7 +4441,7 @@ static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
                _ack_id_to_req, __func__ , neg_acked);
 }
 
-static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
+static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        sector_t sector = be64_to_cpu(p->sector);
@@ -4332,7 +4454,7 @@ static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
                _ar_id_to_req, __func__ , neg_acked);
 }
 
-static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
+static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
 {
        sector_t sector;
        int size;
@@ -4354,7 +4476,7 @@ static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
+static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_barrier_ack *p = (struct p_barrier_ack *)h;
 
@@ -4363,7 +4485,7 @@ static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
+static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        struct drbd_work *w;
@@ -4380,6 +4502,9 @@ static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
        else
                ov_oos_print(mdev);
 
+       if (!get_ldev(mdev))
+               return TRUE;
+
        drbd_rs_complete_io(mdev, sector);
        dec_rs_pending(mdev);
 
@@ -4394,18 +4519,18 @@ static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
                        drbd_resync_finished(mdev);
                }
        }
+       put_ldev(mdev);
        return TRUE;
 }
 
-static int got_something_to_ignore_m(struct drbd_conf *mdev, struct p_header *h)
+static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
 {
-       /* IGNORE */
        return TRUE;
 }
 
 struct asender_cmd {
        size_t pkt_size;
-       int (*process)(struct drbd_conf *mdev, struct p_header *h);
+       int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
 };
 
 static struct asender_cmd *get_asender_cmd(int cmd)
@@ -4414,8 +4539,8 @@ static struct asender_cmd *get_asender_cmd(int cmd)
                /* anything missing from this table is in
                 * the drbd_cmd_handler (drbd_default_handler) table,
                 * see the beginning of drbdd() */
-       [P_PING]            = { sizeof(struct p_header), got_Ping },
-       [P_PING_ACK]        = { sizeof(struct p_header), got_PingAck },
+       [P_PING]            = { sizeof(struct p_header80), got_Ping },
+       [P_PING_ACK]        = { sizeof(struct p_header80), got_PingAck },
        [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
        [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
        [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
@@ -4427,7 +4552,7 @@ static struct asender_cmd *get_asender_cmd(int cmd)
        [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
        [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
        [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
-       [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe), got_something_to_ignore_m },
+       [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
        [P_MAX_CMD]         = { 0, NULL },
        };
        if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
@@ -4438,13 +4563,13 @@ static struct asender_cmd *get_asender_cmd(int cmd)
 int drbd_asender(struct drbd_thread *thi)
 {
        struct drbd_conf *mdev = thi->mdev;
-       struct p_header *h = &mdev->meta.rbuf.header;
+       struct p_header80 *h = &mdev->meta.rbuf.header.h80;
        struct asender_cmd *cmd = NULL;
 
        int rv, len;
        void *buf    = h;
        int received = 0;
-       int expect   = sizeof(struct p_header);
+       int expect   = sizeof(struct p_header80);
        int empty;
 
        sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
@@ -4468,10 +4593,8 @@ int drbd_asender(struct drbd_thread *thi)
                while (1) {
                        clear_bit(SIGNAL_ASENDER, &mdev->flags);
                        flush_signals(current);
-                       if (!drbd_process_done_ee(mdev)) {
-                               dev_err(DEV, "process_done_ee() = NOT_OK\n");
+                       if (!drbd_process_done_ee(mdev))
                                goto reconnect;
-                       }
                        /* to avoid race with newly queued ACKs */
                        set_bit(SIGNAL_ASENDER, &mdev->flags);
                        spin_lock_irq(&mdev->req_lock);
@@ -4530,21 +4653,23 @@ int drbd_asender(struct drbd_thread *thi)
 
                if (received == expect && cmd == NULL) {
                        if (unlikely(h->magic != BE_DRBD_MAGIC)) {
-                               dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
-                                   (long)be32_to_cpu(h->magic),
-                                   h->command, h->length);
+                               dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
+                                   be32_to_cpu(h->magic),
+                                   be16_to_cpu(h->command),
+                                   be16_to_cpu(h->length));
                                goto reconnect;
                        }
                        cmd = get_asender_cmd(be16_to_cpu(h->command));
                        len = be16_to_cpu(h->length);
                        if (unlikely(cmd == NULL)) {
-                               dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
-                                   (long)be32_to_cpu(h->magic),
-                                   h->command, h->length);
+                               dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
+                                   be32_to_cpu(h->magic),
+                                   be16_to_cpu(h->command),
+                                   be16_to_cpu(h->length));
                                goto disconnect;
                        }
                        expect = cmd->pkt_size;
-                       ERR_IF(len != expect-sizeof(struct p_header))
+                       ERR_IF(len != expect-sizeof(struct p_header80))
                                goto reconnect;
                }
                if (received == expect) {
@@ -4554,7 +4679,7 @@ int drbd_asender(struct drbd_thread *thi)
 
                        buf      = h;
                        received = 0;
-                       expect   = sizeof(struct p_header);
+                       expect   = sizeof(struct p_header80);
                        cmd      = NULL;
                }
        }
@@ -4562,10 +4687,12 @@ int drbd_asender(struct drbd_thread *thi)
        if (0) {
 reconnect:
                drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
+               drbd_md_sync(mdev);
        }
        if (0) {
 disconnect:
                drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
+               drbd_md_sync(mdev);
        }
        clear_bit(SIGNAL_ASENDER, &mdev->flags);