Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-2.6
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index fe81c85..515bcd9 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -272,9 +272,28 @@ static int _try_outdate_peer_async(void *data)
 {
        struct drbd_conf *mdev = (struct drbd_conf *)data;
        enum drbd_disk_state nps;
+       union drbd_state ns;
 
        nps = drbd_try_outdate_peer(mdev);
-       drbd_request_state(mdev, NS(pdsk, nps));
+
+       /* Not using
+          drbd_request_state(mdev, NS(pdsk, nps));
+          here, because we might have been able to re-establish the
+          connection in the meantime. The state engine's is_valid_state()
+          and is_valid_state_transition() functions can only partially
+          solve this.
+
+          nps can be D_INCONSISTENT, D_OUTDATED or D_UNKNOWN.
+          pdsk == D_INCONSISTENT while conn >= C_CONNECTED is valid,
+          therefore we have to have the pre state change check here.
+       */
+       spin_lock_irq(&mdev->req_lock);
+       ns = mdev->state;
+       if (ns.conn < C_WF_REPORT_PARAMS) {
+               ns.pdsk = nps;
+               _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
+       }
+       spin_unlock_irq(&mdev->req_lock);
 
        return 0;
 }
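
For context: the thread function above is spawned by drbd_try_outdate_peer_async(), whose tail is visible in the next hunk. A reconstruction of that wrapper as it looks in this kernel version, shown only for orientation, not part of the patch:

	void drbd_try_outdate_peer_async(struct drbd_conf *mdev)
	{
		struct task_struct *opa;

		/* run the fence-peer logic in its own kernel thread, so the
		 * caller does not block on the fence-peer helper program */
		opa = kthread_run(_try_outdate_peer_async, mdev,
				  "drbd%d_a_helper", mdev_to_minor(mdev));
		if (IS_ERR(opa))
			dev_err(DEV, "out of mem, failed to invoke fence-peer helper\n");
	}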
@@ -288,10 +307,11 @@ void drbd_try_outdate_peer_async(struct drbd_conf *mdev)
                dev_err(DEV, "out of mem, failed to invoke fence-peer helper\n");
 }
 
-int drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force)
+enum drbd_state_rv
+drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force)
 {
        const int max_tries = 4;
-       int r = 0;
+       enum drbd_state_rv rv = SS_UNKNOWN_ERROR;
        int try = 0;
        int forced = 0;
        union drbd_state mask, val;
@@ -306,17 +326,17 @@ int drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force)
        val.i  = 0; val.role  = new_role;
 
        while (try++ < max_tries) {
-               r = _drbd_request_state(mdev, mask, val, CS_WAIT_COMPLETE);
+               rv = _drbd_request_state(mdev, mask, val, CS_WAIT_COMPLETE);
 
                /* in case we first succeeded in outdating the peer,
                 * but then suddenly could establish a connection */
-               if (r == SS_CW_FAILED_BY_PEER && mask.pdsk != 0) {
+               if (rv == SS_CW_FAILED_BY_PEER && mask.pdsk != 0) {
                        val.pdsk = 0;
                        mask.pdsk = 0;
                        continue;
                }
 
-               if (r == SS_NO_UP_TO_DATE_DISK && force &&
+               if (rv == SS_NO_UP_TO_DATE_DISK && force &&
                    (mdev->state.disk < D_UP_TO_DATE &&
                     mdev->state.disk >= D_INCONSISTENT)) {
                        mask.disk = D_MASK;
@@ -325,7 +345,7 @@ int drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force)
                        continue;
                }
 
-               if (r == SS_NO_UP_TO_DATE_DISK &&
+               if (rv == SS_NO_UP_TO_DATE_DISK &&
                    mdev->state.disk == D_CONSISTENT && mask.pdsk == 0) {
                        D_ASSERT(mdev->state.pdsk == D_UNKNOWN);
                        nps = drbd_try_outdate_peer(mdev);
@@ -341,9 +361,9 @@ int drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force)
                        continue;
                }
 
-               if (r == SS_NOTHING_TO_DO)
+               if (rv == SS_NOTHING_TO_DO)
                        goto fail;
-               if (r == SS_PRIMARY_NOP && mask.pdsk == 0) {
+               if (rv == SS_PRIMARY_NOP && mask.pdsk == 0) {
                        nps = drbd_try_outdate_peer(mdev);
 
                        if (force && nps > D_OUTDATED) {
@@ -356,25 +376,24 @@ int drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force)
 
                        continue;
                }
-               if (r == SS_TWO_PRIMARIES) {
+               if (rv == SS_TWO_PRIMARIES) {
                        /* Maybe the peer will be detected as dead very soon...
                           retry at most once more in that case. */
-                       __set_current_state(TASK_INTERRUPTIBLE);
-                       schedule_timeout((mdev->net_conf->ping_timeo+1)*HZ/10);
+                       schedule_timeout_interruptible((mdev->net_conf->ping_timeo+1)*HZ/10);
                        if (try < max_tries)
                                try = max_tries - 1;
                        continue;
                }
-               if (r < SS_SUCCESS) {
-                       r = _drbd_request_state(mdev, mask, val,
+               if (rv < SS_SUCCESS) {
+                       rv = _drbd_request_state(mdev, mask, val,
                                                CS_VERBOSE + CS_WAIT_COMPLETE);
-                       if (r < SS_SUCCESS)
+                       if (rv < SS_SUCCESS)
                                goto fail;
                }
                break;
        }
 
-       if (r < SS_SUCCESS)
+       if (rv < SS_SUCCESS)
                goto fail;
 
        if (forced)
@@ -384,7 +403,7 @@ int drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force)
        wait_event(mdev->misc_wait, atomic_read(&mdev->ap_pending_cnt) == 0);
 
        if (new_role == R_SECONDARY) {
-               set_disk_ro(mdev->vdisk, TRUE);
+               set_disk_ro(mdev->vdisk, true);
                if (get_ldev(mdev)) {
                        mdev->ldev->md.uuid[UI_CURRENT] &= ~(u64)1;
                        put_ldev(mdev);
@@ -394,7 +413,7 @@ int drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force)
                        mdev->net_conf->want_lose = 0;
                        put_net_conf(mdev);
                }
-               set_disk_ro(mdev->vdisk, FALSE);
+               set_disk_ro(mdev->vdisk, false);
                if (get_ldev(mdev)) {
                        if (((mdev->state.conn < C_CONNECTED ||
                               mdev->state.pdsk <= D_FAILED)
@@ -406,10 +425,8 @@ int drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force)
                }
        }
 
-       if ((new_role == R_SECONDARY) && get_ldev(mdev)) {
-               drbd_al_to_on_disk_bm(mdev);
-               put_ldev(mdev);
-       }
+       /* Writeout of the bitmap areas covered by the activity log to
+        * stable storage is already done in the after-state-change work. */
 
        if (mdev->state.conn >= C_WF_REPORT_PARAMS) {
                /* if this was forced, we should consider sync */
@@ -423,7 +440,7 @@ int drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force)
        kobject_uevent(&disk_to_dev(mdev->vdisk)->kobj, KOBJ_CHANGE);
  fail:
        mutex_unlock(&mdev->state_mutex);
-       return r;
+       return rv;
 }
 
 static struct drbd_conf *ensure_mdev(int minor, int create)
@@ -528,17 +545,19 @@ static void drbd_md_set_sector_offsets(struct drbd_conf *mdev,
        }
 }
 
+/* input size is expected to be in KB */
 char *ppsize(char *buf, unsigned long long size)
 {
-       /* Needs 9 bytes at max. */
+       /* Needs 9 bytes at most, including the trailing NUL:
+        * -1ULL ==> "16384 EB" */
        static char units[] = { 'K', 'M', 'G', 'T', 'P', 'E' };
        int base = 0;
-       while (size >= 10000) {
+       while (size >= 10000 && base < sizeof(units)-1) {
                /* shift + round */
                size = (size >> 10) + !!(size & (1<<9));
                base++;
        }
-       sprintf(buf, "%lu %cB", (long)size, units[base]);
+       sprintf(buf, "%u %cB", (unsigned)size, units[base]);
 
        return buf;
 }
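
A quick sanity check of the shift-and-round loop above (input in KB as noted; illustrative values, not part of the patch):

	char buf[10];

	ppsize(buf, 4);       /* below 10000, printed as-is -> "4 KB"    */
	ppsize(buf, 1048576); /* 1 GiB in KB, one shift     -> "1024 MB" */
	ppsize(buf, -1ULL);   /* five shifts, capped at 'E' -> "16384 EB",
	                       * 8 characters plus NUL: the 9-byte worst case */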
@@ -577,7 +596,7 @@ void drbd_resume_io(struct drbd_conf *mdev)
  * Returns 0 on success, negative return values indicate errors.
  * You should call drbd_md_sync() after calling this function.
  */
-enum determine_dev_size drbd_determin_dev_size(struct drbd_conf *mdev, enum dds_flags flags) __must_hold(local)
+enum determine_dev_size drbd_determine_dev_size(struct drbd_conf *mdev, enum dds_flags flags) __must_hold(local)
 {
        sector_t prev_first_sect, prev_size; /* previous meta location */
        sector_t la_size;
@@ -642,11 +661,19 @@ enum determine_dev_size drbd_determin_dev_size(struct drbd_conf *mdev, enum dds_
                || prev_size       != mdev->ldev->md.md_size_sect;
 
        if (la_size_changed || md_moved) {
+               int err;
+
                drbd_al_shrink(mdev); /* All extents inactive. */
                dev_info(DEV, "Writing the whole bitmap, %s\n",
                         la_size_changed && md_moved ? "size changed and md moved" :
                         la_size_changed ? "size changed" : "md moved");
-               rv = drbd_bitmap_io(mdev, &drbd_bm_write, "size changed"); /* does drbd_resume_io() ! */
+               /* next line implicitly does drbd_suspend_io()+drbd_resume_io() */
+               err = drbd_bitmap_io(mdev, &drbd_bm_write,
+                               "size changed", BM_LOCKED_MASK);
+               if (err) {
+                       rv = dev_size_error;
+                       goto out;
+               }
                drbd_md_mark_dirty(mdev);
        }
 
@@ -765,31 +792,78 @@ static int drbd_check_al_size(struct drbd_conf *mdev)
        return 0;
 }
 
-void drbd_setup_queue_param(struct drbd_conf *mdev, unsigned int max_seg_s) __must_hold(local)
+static void drbd_setup_queue_param(struct drbd_conf *mdev, unsigned int max_bio_size)
 {
        struct request_queue * const q = mdev->rq_queue;
-       struct request_queue * const b = mdev->ldev->backing_bdev->bd_disk->queue;
-       int max_segments = mdev->ldev->dc.max_bio_bvecs;
+       int max_hw_sectors = max_bio_size >> 9;
+       int max_segments = 0;
 
-       max_seg_s = min(queue_max_sectors(b) * queue_logical_block_size(b), max_seg_s);
+       if (get_ldev_if_state(mdev, D_ATTACHING)) {
+               struct request_queue * const b = mdev->ldev->backing_bdev->bd_disk->queue;
+
+               max_hw_sectors = min(queue_max_hw_sectors(b), max_bio_size >> 9);
+               max_segments = mdev->ldev->dc.max_bio_bvecs;
+               put_ldev(mdev);
+       }
 
-       blk_queue_max_hw_sectors(q, max_seg_s >> 9);
-       blk_queue_max_segments(q, max_segments ? max_segments : BLK_MAX_SEGMENTS);
-       blk_queue_max_segment_size(q, max_seg_s);
        blk_queue_logical_block_size(q, 512);
-       blk_queue_segment_boundary(q, PAGE_SIZE-1);
-       blk_stack_limits(&q->limits, &b->limits, 0);
+       blk_queue_max_hw_sectors(q, max_hw_sectors);
+       /* This is the workaround for "bio would need to, but cannot, be split" */
+       blk_queue_max_segments(q, max_segments ? max_segments : BLK_MAX_SEGMENTS);
+       blk_queue_segment_boundary(q, PAGE_CACHE_SIZE-1);
+
+       if (get_ldev_if_state(mdev, D_ATTACHING)) {
+               struct request_queue * const b = mdev->ldev->backing_bdev->bd_disk->queue;
 
-       dev_info(DEV, "max_segment_size ( = BIO size ) = %u\n", queue_max_segment_size(q));
+               blk_queue_stack_limits(q, b);
 
-       if (q->backing_dev_info.ra_pages != b->backing_dev_info.ra_pages) {
-               dev_info(DEV, "Adjusting my ra_pages to backing device's (%lu -> %lu)\n",
-                    q->backing_dev_info.ra_pages,
-                    b->backing_dev_info.ra_pages);
-               q->backing_dev_info.ra_pages = b->backing_dev_info.ra_pages;
+               if (q->backing_dev_info.ra_pages != b->backing_dev_info.ra_pages) {
+                       dev_info(DEV, "Adjusting my ra_pages to backing device's (%lu -> %lu)\n",
+                                q->backing_dev_info.ra_pages,
+                                b->backing_dev_info.ra_pages);
+                       q->backing_dev_info.ra_pages = b->backing_dev_info.ra_pages;
+               }
+               put_ldev(mdev);
        }
 }
 
+void drbd_reconsider_max_bio_size(struct drbd_conf *mdev)
+{
+       int now, new, local, peer;
+
+       now = queue_max_hw_sectors(mdev->rq_queue) << 9;
+       local = mdev->local_max_bio_size; /* last known value, if any; from volatile memory */
+       peer = mdev->peer_max_bio_size; /* last known value, if any; from meta data */
+
+       if (get_ldev_if_state(mdev, D_ATTACHING)) {
+               local = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
+               mdev->local_max_bio_size = local;
+               put_ldev(mdev);
+       }
+
+       /* We may ignore the peer's limits if the peer is modern enough:
+          from 8.3.8 onwards it can use multiple BIOs for a single
+          peer_request. */
+       if (mdev->state.conn >= C_CONNECTED) {
+               if (mdev->agreed_pro_version < 94)
+                       peer = mdev->peer_max_bio_size;
+               else if (mdev->agreed_pro_version == 94)
+                       peer = DRBD_MAX_SIZE_H80_PACKET;
+               else /* drbd 8.3.8 onwards */
+                       peer = DRBD_MAX_BIO_SIZE;
+       }
+
+       new = min_t(int, local, peer);
+
+       if (mdev->state.role == R_PRIMARY && new < now)
+               dev_err(DEV, "ASSERT FAILED new < now; (%d < %d)\n", new, now);
+
+       if (new != now)
+               dev_info(DEV, "max BIO size = %u\n", new);
+
+       drbd_setup_queue_param(mdev, new);
+}
+
 /* serialize deconfig (worker exiting, doing cleanup)
  * and reconfig (drbdsetup disk, drbdsetup net)
  *
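
The negotiation in drbd_reconsider_max_bio_size() reduces to new = min(local, peer), with the peer limit chosen by the agreed protocol version; the two separate get_ldev_if_state()/put_ldev() sections in drbd_setup_queue_param() exist because the backing device may detach at any time. A minimal stand-alone model of the version logic, with hypothetical stand-ins for the real DRBD_MAX_SIZE_H80_PACKET and DRBD_MAX_BIO_SIZE constants:

	#include <stdio.h>

	#define H80_PACKET_LIMIT (32 * 1024)  /* assumed: old-style header packet limit */
	#define MODERN_LIMIT    (128 * 1024)  /* assumed: post-8.3.8 default */

	static int negotiated_max_bio_size(int local, int proto, int last_known_peer)
	{
		int peer = last_known_peer;   /* proto < 94: trust the last known value */

		if (proto == 94)
			peer = H80_PACKET_LIMIT;
		else if (proto > 94)          /* drbd 8.3.8 onwards */
			peer = MODERN_LIMIT;

		return local < peer ? local : peer;
	}

	int main(void)
	{
		/* local backing device allows 512 KiB, peer speaks protocol 94 */
		printf("%d\n", negotiated_max_bio_size(512 * 1024, 94, 0)); /* 32768 */
		return 0;
	}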
@@ -850,7 +924,7 @@ static void drbd_suspend_al(struct drbd_conf *mdev)
 static int drbd_nl_disk_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp,
                             struct drbd_nl_cfg_reply *reply)
 {
-       enum drbd_ret_codes retcode;
+       enum drbd_ret_code retcode;
        enum determine_dev_size dd;
        sector_t max_possible_sectors;
        sector_t min_md_device_sectors;
@@ -858,8 +932,7 @@ static int drbd_nl_disk_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp
        struct block_device *bdev;
        struct lru_cache *resync_lru = NULL;
        union drbd_state ns, os;
-       unsigned int max_seg_s;
-       int rv;
+       enum drbd_state_rv rv;
        int cp_discovered = 0;
        int logical_block_size;
 
@@ -1005,9 +1078,10 @@ static int drbd_nl_disk_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp
        /* and for any other previously queued work */
        drbd_flush_workqueue(mdev);
 
-       retcode = _drbd_request_state(mdev, NS(disk, D_ATTACHING), CS_VERBOSE);
+       rv = _drbd_request_state(mdev, NS(disk, D_ATTACHING), CS_VERBOSE);
+       retcode = rv;  /* FIXME: Type mismatch. */
        drbd_resume_io(mdev);
-       if (retcode < SS_SUCCESS)
+       if (rv < SS_SUCCESS)
                goto fail;
 
        if (!get_ldev_if_state(mdev, D_ATTACHING))
@@ -1109,20 +1183,7 @@ static int drbd_nl_disk_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp
        mdev->read_cnt = 0;
        mdev->writ_cnt = 0;
 
-       max_seg_s = DRBD_MAX_SEGMENT_SIZE;
-       if (mdev->state.conn == C_CONNECTED) {
-               /* We are Primary, Connected, and now attach a new local
-                * backing store. We must not increase the user visible maximum
-                * bio size on this device to something the peer may not be
-                * able to handle. */
-               if (mdev->agreed_pro_version < 94)
-                       max_seg_s = queue_max_segment_size(mdev->rq_queue);
-               else if (mdev->agreed_pro_version == 94)
-                       max_seg_s = DRBD_MAX_SIZE_H80_PACKET;
-               /* else: drbd 8.3.9 and later, stay with default */
-       }
-
-       drbd_setup_queue_param(mdev, max_seg_s);
+       drbd_reconsider_max_bio_size(mdev);
 
        /* If I am currently not R_PRIMARY,
         * but meta data primary indicator is set,
@@ -1144,7 +1205,7 @@ static int drbd_nl_disk_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp
            !drbd_md_test_flag(mdev->ldev, MDF_CONNECTED_IND))
                set_bit(USE_DEGR_WFC_T, &mdev->flags);
 
-       dd = drbd_determin_dev_size(mdev, 0);
+       dd = drbd_determine_dev_size(mdev, 0);
        if (dd == dev_size_error) {
                retcode = ERR_NOMEM_BITMAP;
                goto force_diskless_dec;
@@ -1154,12 +1215,14 @@ static int drbd_nl_disk_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp
        if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
                dev_info(DEV, "Assuming that all blocks are out of sync "
                     "(aka FullSync)\n");
-               if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from attaching")) {
+               if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
+                       "set_n_write from attaching", BM_LOCKED_MASK)) {
                        retcode = ERR_IO_MD_DISK;
                        goto force_diskless_dec;
                }
        } else {
-               if (drbd_bitmap_io(mdev, &drbd_bm_read, "read from attaching") < 0) {
+               if (drbd_bitmap_io(mdev, &drbd_bm_read,
+                       "read from attaching", BM_LOCKED_MASK) < 0) {
                        retcode = ERR_IO_MD_DISK;
                        goto force_diskless_dec;
                }
@@ -1167,7 +1230,11 @@ static int drbd_nl_disk_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp
 
        if (cp_discovered) {
                drbd_al_apply_to_bm(mdev);
-               drbd_al_to_on_disk_bm(mdev);
+               if (drbd_bitmap_io(mdev, &drbd_bm_write,
+                       "crashed primary apply AL", BM_LOCKED_MASK)) {
+                       retcode = ERR_IO_MD_DISK;
+                       goto force_diskless_dec;
+               }
        }
 
        if (_drbd_bm_total_weight(mdev) == drbd_bm_bits(mdev))
@@ -1267,11 +1334,19 @@ static int drbd_nl_disk_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp
 static int drbd_nl_detach(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp,
                          struct drbd_nl_cfg_reply *reply)
 {
+       enum drbd_ret_code retcode;
+       int ret;
        drbd_suspend_io(mdev); /* so no-one is stuck in drbd_al_begin_io */
-       reply->ret_code = drbd_request_state(mdev, NS(disk, D_DISKLESS));
-       if (mdev->state.disk == D_DISKLESS)
-               wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
+       retcode = drbd_request_state(mdev, NS(disk, D_FAILED));
+       /* D_FAILED will transition to D_DISKLESS. */
+       ret = wait_event_interruptible(mdev->misc_wait,
+                       mdev->state.disk != D_FAILED);
        drbd_resume_io(mdev);
+       if ((int)retcode == (int)SS_IS_DISKLESS)
+               retcode = SS_NOTHING_TO_DO;
+       if (ret)
+               retcode = ERR_INTR;
+       reply->ret_code = retcode;
        return 0;
 }
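
The detach rework leans on the standard wait_event_interruptible() contract, sketched here as a reminder of why a non-zero ret maps to ERR_INTR:

	/* wait_event_interruptible(wq, cond):
	 *   returns 0            once cond evaluates true
	 *   returns -ERESTARTSYS if a signal is delivered first
	 * Requesting D_FAILED rather than D_DISKLESS directly lets the
	 * state machine run its own failure cleanup; the wait above then
	 * observes the D_FAILED -> D_DISKLESS transition. */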
 
@@ -1279,7 +1354,7 @@ static int drbd_nl_net_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp,
                            struct drbd_nl_cfg_reply *reply)
 {
        int i, ns;
-       enum drbd_ret_codes retcode;
+       enum drbd_ret_code retcode;
        struct net_conf *new_conf = NULL;
        struct crypto_hash *tfm = NULL;
        struct crypto_hash *integrity_w_tfm = NULL;
@@ -1324,6 +1399,8 @@ static int drbd_nl_net_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp,
        new_conf->wire_protocol    = DRBD_PROT_C;
        new_conf->ping_timeo       = DRBD_PING_TIMEO_DEF;
        new_conf->rr_conflict      = DRBD_RR_CONFLICT_DEF;
+       new_conf->on_congestion    = DRBD_ON_CONGESTION_DEF;
+       new_conf->cong_extents     = DRBD_CONG_EXTENTS_DEF;
 
        if (!net_conf_from_tags(mdev, nlp->tag_list, new_conf)) {
                retcode = ERR_MANDATORY_TAG;
@@ -1345,6 +1422,11 @@ static int drbd_nl_net_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp,
                }
        }
 
+       if (new_conf->on_congestion != OC_BLOCK && new_conf->wire_protocol != DRBD_PROT_A) {
+               retcode = ERR_CONG_NOT_PROTO_A;
+               goto fail;
+       }
+
        if (mdev->state.role == R_PRIMARY && new_conf->want_lose) {
                retcode = ERR_DISCARD;
                goto fail;
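
For reference, the new on_congestion/cong_extents defaults correspond to the congestion-policy settings introduced alongside this change in the DRBD 8.3 series; a hedged drbd.conf sketch (option names per the DRBD 8.3 documentation, values purely illustrative):

	resource r0 {
		protocol A;    # anything but "block" requires protocol A,
		               # otherwise the ERR_CONG_NOT_PROTO_A check fires
		net {
			on-congestion      pull-ahead;
			congestion-fill    2G;
			congestion-extents 2000;
		}
		# disk/on/address sections omitted
	}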
@@ -1525,6 +1607,21 @@ static int drbd_nl_disconnect(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nl
                              struct drbd_nl_cfg_reply *reply)
 {
        int retcode;
+       struct disconnect dc;
+
+       memset(&dc, 0, sizeof(struct disconnect));
+       if (!disconnect_from_tags(mdev, nlp->tag_list, &dc)) {
+               retcode = ERR_MANDATORY_TAG;
+               goto fail;
+       }
+
+       if (dc.force) {
+               spin_lock_irq(&mdev->req_lock);
+               if (mdev->state.conn >= C_WF_CONNECTION)
+                       _drbd_set_state(_NS(mdev, conn, C_DISCONNECTING), CS_HARD, NULL);
+               spin_unlock_irq(&mdev->req_lock);
+               goto done;
+       }
 
        retcode = _drbd_request_state(mdev, NS(conn, C_DISCONNECTING), CS_ORDERED);
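
The force path above skips the orderly C_DISCONNECTING negotiation and applies the state change with CS_HARD. With a matching userland (an assumption; something must set dc.force via disconnect_from_tags()), this corresponds to an invocation along the lines of:

	drbdsetup /dev/drbd0 disconnect --force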
 
@@ -1622,7 +1719,7 @@ static int drbd_nl_resize(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp,
 
        mdev->ldev->dc.disk_size = (sector_t)rs.resize_size;
        ddsf = (rs.resize_force ? DDSF_FORCED : 0) | (rs.no_resync ? DDSF_NO_RESYNC : 0);
-       dd = drbd_determin_dev_size(mdev, ddsf);
+       dd = drbd_determine_dev_size(mdev, ddsf);
        drbd_md_sync(mdev);
        put_ldev(mdev);
        if (dd == dev_size_error) {
@@ -1842,6 +1939,10 @@ static int drbd_nl_invalidate(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nl
 {
        int retcode;
 
+       /* If there is still bitmap IO pending, probably because a previous
+        * resync just finished, wait for it before requesting a new resync. */
+       wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
+
        retcode = _drbd_request_state(mdev, NS(conn, C_STARTING_SYNC_T), CS_ORDERED);
 
        if (retcode < SS_SUCCESS && retcode != SS_NEED_CONNECTION)
@@ -1877,6 +1978,10 @@ static int drbd_nl_invalidate_peer(struct drbd_conf *mdev, struct drbd_nl_cfg_re
 {
        int retcode;
 
+       /* If there is still bitmap IO pending, probably because a previous
+        * resync just finished, wait for it before requesting a new resync. */
+       wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
+
        retcode = _drbd_request_state(mdev, NS(conn, C_STARTING_SYNC_S), CS_ORDERED);
 
        if (retcode < SS_SUCCESS) {
@@ -1885,9 +1990,9 @@ static int drbd_nl_invalidate_peer(struct drbd_conf *mdev, struct drbd_nl_cfg_re
                           into a full resync. */
                        retcode = drbd_request_state(mdev, NS(pdsk, D_INCONSISTENT));
                        if (retcode >= SS_SUCCESS) {
-                               /* open coded drbd_bitmap_io() */
                                if (drbd_bitmap_io(mdev, &drbd_bmio_set_susp_al,
-                                                  "set_n_write from invalidate_peer"))
+                                       "set_n_write from invalidate_peer",
+                                       BM_LOCKED_SET_ALLOWED))
                                        retcode = ERR_IO_MD_DISK;
                        }
                } else
@@ -1914,9 +2019,17 @@ static int drbd_nl_resume_sync(struct drbd_conf *mdev, struct drbd_nl_cfg_req *n
                               struct drbd_nl_cfg_reply *reply)
 {
        int retcode = NO_ERROR;
+       union drbd_state s;
 
-       if (drbd_request_state(mdev, NS(user_isp, 0)) == SS_NOTHING_TO_DO)
-               retcode = ERR_PAUSE_IS_CLEAR;
+       if (drbd_request_state(mdev, NS(user_isp, 0)) == SS_NOTHING_TO_DO) {
+               s = mdev->state;
+               if (s.conn == C_PAUSED_SYNC_S || s.conn == C_PAUSED_SYNC_T) {
+                       retcode = s.aftr_isp ? ERR_PIC_AFTER_DEP :
+                                 s.peer_isp ? ERR_PIC_PEER_DEP : ERR_PAUSE_IS_CLEAR;
+               } else {
+                       retcode = ERR_PAUSE_IS_CLEAR;
+               }
+       }
 
        reply->ret_code = retcode;
        return 0;
@@ -2054,6 +2167,11 @@ static int drbd_nl_start_ov(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp,
                reply->ret_code = ERR_MANDATORY_TAG;
                return 0;
        }
+
+       /* If there is still bitmap IO pending, e.g. from a previous resync or
+        * verify that just finished, wait for it before requesting a new resync. */
+       wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
+
        /* w_make_ov_request expects position to be aligned */
        mdev->ov_start_sector = args.start_sector & ~BM_SECT_PER_BIT;
        reply->ret_code = drbd_request_state(mdev,NS(conn,C_VERIFY_S));
@@ -2097,7 +2215,8 @@ static int drbd_nl_new_c_uuid(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nl
        drbd_uuid_new_current(mdev); /* New current, previous to UI_BITMAP */
 
        if (args.clear_bm) {
-               err = drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write, "clear_n_write from new_c_uuid");
+               err = drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
+                       "clear_n_write from new_c_uuid", BM_LOCKED_MASK);
                if (err) {
                        dev_err(DEV, "Writing bitmap failed with %d\n",err);
                        retcode = ERR_IO_MD_DISK;
@@ -2105,6 +2224,7 @@ static int drbd_nl_new_c_uuid(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nl
                if (skip_initial_sync) {
                        drbd_send_uuids_skip_initial_sync(mdev);
                        _drbd_uuid_set(mdev, UI_BITMAP, 0);
+                       drbd_print_uuids(mdev, "cleared bitmap UUID");
                        spin_lock_irq(&mdev->req_lock);
                        _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
                                        CS_VERBOSE, NULL);
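
This clear_bm branch is the kernel side of skipping the initial full sync; per the DRBD 8.3 user's guide it is typically driven from userland as:

	drbdadm -- --clear-bitmap new-current-uuid <resource>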
@@ -2189,7 +2309,8 @@ static void drbd_connector_callback(struct cn_msg *req, struct netlink_skb_parms
                goto fail;
        }
 
-       if (nlp->packet_type >= P_nl_after_last_packet) {
+       if (nlp->packet_type >= P_nl_after_last_packet ||
+           nlp->packet_type == P_return_code_only) {
                retcode = ERR_PACKET_NR;
                goto fail;
        }
@@ -2205,7 +2326,7 @@ static void drbd_connector_callback(struct cn_msg *req, struct netlink_skb_parms
        reply_size += cm->reply_body_size;
 
        /* allocation not in the IO path, cqueue thread context */
-       cn_reply = kmalloc(reply_size, GFP_KERNEL);
+       cn_reply = kzalloc(reply_size, GFP_KERNEL);
        if (!cn_reply) {
                retcode = ERR_NOMEM;
                goto fail;
@@ -2213,7 +2334,7 @@ static void drbd_connector_callback(struct cn_msg *req, struct netlink_skb_parms
        reply = (struct drbd_nl_cfg_reply *) cn_reply->data;
 
        reply->packet_type =
-               cm->reply_body_size ? nlp->packet_type : P_nl_after_last_packet;
+               cm->reply_body_size ? nlp->packet_type : P_return_code_only;
        reply->minor = nlp->drbd_minor;
        reply->ret_code = NO_ERROR; /* Might be modified by cm->function. */
        /* reply->tag_list; might be modified by cm->function. */
@@ -2376,7 +2497,7 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
        /* receiver thread context, which is not in the writeout path (of this node),
         * but may be in the writeout path of the _other_ node.
         * GFP_NOIO to avoid potential "distributed deadlock". */
-       cn_reply = kmalloc(
+       cn_reply = kzalloc(
                sizeof(struct cn_msg)+
                sizeof(struct drbd_nl_cfg_reply)+
                sizeof(struct dump_ee_tag_len_struct)+
@@ -2398,10 +2519,11 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
        tl = tl_add_int(tl, T_ee_sector, &e->sector);
        tl = tl_add_int(tl, T_ee_block_id, &e->block_id);
 
+       /* dump the first 32k */
+       len = min_t(unsigned, e->size, 32 << 10);
        put_unaligned(T_ee_data, tl++);
-       put_unaligned(e->size, tl++);
+       put_unaligned(len, tl++);
 
-       len = e->size;
        page = e->pages;
        page_chain_for_each(page) {
                void *d = kmap_atomic(page, KM_USER0);
@@ -2410,6 +2532,8 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
                kunmap_atomic(d, KM_USER0);
                tl = (unsigned short*)((char*)tl + l);
                len -= l;
+               if (len == 0)
+                       break;
        }
        put_unaligned(TT_END, tl++); /* Close the tag list */
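
Worked example of the new cap: for a 128 KiB request, len = min_t(unsigned, 131072, 32 << 10) = 32768, so the page_chain loop copies eight 4 KiB pages and the added break exits the walk once len reaches 0; the broadcast payload stays bounded no matter how large e->size is.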
 
@@ -2508,6 +2632,7 @@ void drbd_nl_send_reply(struct cn_msg *req, int ret_code)
                (struct drbd_nl_cfg_reply *)cn_reply->data;
        int rr;
 
+       memset(buffer, 0, sizeof(buffer));
        cn_reply->id = req->id;
 
        cn_reply->seq = req->seq;
@@ -2515,6 +2640,7 @@ void drbd_nl_send_reply(struct cn_msg *req, int ret_code)
        cn_reply->len = sizeof(struct drbd_nl_cfg_reply);
        cn_reply->flags = 0;
 
+       reply->packet_type = P_return_code_only;
        reply->minor = ((struct drbd_nl_cfg_req *)req->data)->drbd_minor;
        reply->ret_code = ret_code;