drbd: Starting with protocol 96 we can allow app-IO while receiving the bitmap
authorPhilipp Reisner <philipp.reisner@linbit.com>
Wed, 10 Nov 2010 11:08:37 +0000 (12:08 +0100)
committerPhilipp Reisner <philipp.reisner@linbit.com>
Thu, 10 Mar 2011 10:35:06 +0000 (11:35 +0100)
* C_STARTING_SYNC_S, C_STARTING_SYNC_T In these states the bitmap gets
  written to disk. Locking out of app-IO is done by using the
  drbd_queue_bitmap_io() and drbd_bitmap_io() functions these days.
  It is no longer necessary to lock out app-IO based on the connection
  state.
  App-IO that may come in after the BITMAP_IO flag got cleared before the
  state transition to C_SYNC_(SOURCE|TARGET) does not get mirrored, sets
  a bit in the local bitmap, that is already set, therefore changes nothing.

* C_WF_BITMAP_S In this state we send updates (P_OUT_OF_SYNC packets).
  With that we make sure they have the same number of bits when going
  into the C_SYNC_(SOURCE|TARGET) connection state.

* C_UNCONNECTED: The receiver starts, no need to lock out IO.

* C_DISCONNECTING: in drbd_disconnect() we had a wait_event()
  to wait until ap_bio_cnt reaches 0. Removed that.

* C_TIMEOUT, C_BROKEN_PIPE, C_NETWORK_FAILURE
  C_PROTOCOL_ERROR, C_TEAR_DOWN: Same as C_DISCONNECTING

* C_WF_REPORT_PARAMS: IO still possible since that is still
  like C_WF_CONNECTION.

And we do not need to send barriers in C_WF_BITMAP_S connection state.

Allow concurrent accesses to the bitmap when receiving the bitmap.
Everything gets ORed anyways.

A drbd_free_tl_hash() is in after_state_chg_work(). At that point
all the work items of the last connections must have been processed.

Introduced a call to drbd_free_tl_hash() into drbd_free_mdev()
for paranoia reasons.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
drivers/block/drbd/drbd_int.h
drivers/block/drbd/drbd_main.c
drivers/block/drbd/drbd_receiver.c
drivers/block/drbd/drbd_req.c

index 9a94460..38bbaba 100644 (file)
@@ -2213,8 +2213,9 @@ static inline int drbd_get_max_buffers(struct drbd_conf *mdev)
        return mxb;
 }
 
-static inline int drbd_state_is_stable(union drbd_state s)
+static inline int drbd_state_is_stable(struct drbd_conf *mdev)
 {
+       union drbd_state s = mdev->state;
 
        /* DO NOT add a default clause, we want the compiler to warn us
         * for any newly introduced state we may have forgotten to add here */
@@ -2233,11 +2234,7 @@ static inline int drbd_state_is_stable(union drbd_state s)
        case C_PAUSED_SYNC_T:
        case C_AHEAD:
        case C_BEHIND:
-               /* maybe stable, look at the disk state */
-               break;
-
-       /* no new io accepted during tansitional states
-        * like handshake or teardown */
+               /* transitional states, IO allowed */
        case C_DISCONNECTING:
        case C_UNCONNECTED:
        case C_TIMEOUT:
@@ -2248,7 +2245,15 @@ static inline int drbd_state_is_stable(union drbd_state s)
        case C_WF_REPORT_PARAMS:
        case C_STARTING_SYNC_S:
        case C_STARTING_SYNC_T:
+               break;
+
+               /* Allow IO in BM exchange states with new protocols */
        case C_WF_BITMAP_S:
+               if (mdev->agreed_pro_version < 96)
+                       return 0;
+               break;
+
+               /* no new io accepted in these states */
        case C_WF_BITMAP_T:
        case C_WF_SYNC_UUID:
        case C_MASK:
@@ -2297,7 +2302,7 @@ static inline int __inc_ap_bio_cond(struct drbd_conf *mdev)
         * to start during "stable" states. */
 
        /* no new io accepted when attaching or detaching the disk */
-       if (!drbd_state_is_stable(mdev->state))
+       if (!drbd_state_is_stable(mdev))
                return 0;
 
        /* since some older kernels don't have atomic_add_unless,
index 74a6d55..14afbd4 100644 (file)
@@ -3334,6 +3334,7 @@ void drbd_free_mdev(struct drbd_conf *mdev)
        put_disk(mdev->vdisk);
        blk_cleanup_queue(mdev->rq_queue);
        free_cpumask_var(mdev->cpu_mask);
+       drbd_free_tl_hash(mdev);
        kfree(mdev);
 }
 
index 0630a2e..f4aba9f 100644 (file)
@@ -3468,9 +3468,7 @@ static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigne
        int ok = FALSE;
        struct p_header80 *h = &mdev->data.rbuf.header.h80;
 
-       wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
-
-       drbd_bm_lock(mdev, "receive bitmap");
+       /* drbd_bm_lock(mdev, "receive bitmap"); By intention no bm_lock */
 
        /* maybe we should use some per thread scratch page,
         * and allocate that during initial device creation? */
@@ -3542,7 +3540,7 @@ static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigne
 
        ok = TRUE;
  out:
-       drbd_bm_unlock(mdev);
+       /* drbd_bm_unlock(mdev); by intention no lock */
        if (ok && mdev->state.conn == C_WF_BITMAP_S)
                drbd_start_resync(mdev, C_SYNC_SOURCE);
        free_page((unsigned long) buffer);
@@ -3804,13 +3802,6 @@ static void drbd_disconnect(struct drbd_conf *mdev)
        if (os.conn == C_DISCONNECTING) {
                wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
 
-               if (!is_susp(mdev->state)) {
-                       /* we must not free the tl_hash
-                        * while application io is still on the fly */
-                       wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
-                       drbd_free_tl_hash(mdev);
-               }
-
                crypto_free_hash(mdev->cram_hmac_tfm);
                mdev->cram_hmac_tfm = NULL;
 
index eff0fbf..4cb8247 100644 (file)
@@ -142,7 +142,7 @@ static void _about_to_complete_local_write(struct drbd_conf *mdev,
 
        /* before we can signal completion to the upper layers,
         * we may need to close the current epoch */
-       if (mdev->state.conn >= C_CONNECTED && mdev->state.conn < C_AHEAD &&
+       if (mdev->state.conn >= C_WF_BITMAP_T && mdev->state.conn < C_AHEAD &&
            req->epoch == mdev->newest_tle->br_number)
                queue_barrier(mdev);
 
@@ -757,6 +757,23 @@ static int drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int s
        return 0 == drbd_bm_count_bits(mdev, sbnr, ebnr);
 }
 
+static int drbd_should_do_remote(struct drbd_conf *mdev)
+{
+       union drbd_state s = mdev->state;
+
+       return s.pdsk == D_UP_TO_DATE ||
+               (s.pdsk >= D_INCONSISTENT &&
+                s.conn >= C_WF_BITMAP_T &&
+                s.conn < C_AHEAD);
+}
+static int drbd_should_send_oos(struct drbd_conf *mdev)
+{
+       union drbd_state s = mdev->state;
+
+       return s.pdsk >= D_INCONSISTENT &&
+               (s.conn == C_AHEAD || s.conn == C_WF_BITMAP_S);
+}
+
 static int drbd_make_request_common(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
 {
        const int rw = bio_rw(bio);
@@ -828,12 +845,9 @@ static int drbd_make_request_common(struct drbd_conf *mdev, struct bio *bio, uns
                drbd_al_begin_io(mdev, sector);
        }
 
-       remote = remote && (mdev->state.pdsk == D_UP_TO_DATE ||
-                           (mdev->state.pdsk >= D_INCONSISTENT &&
-                            mdev->state.conn >= C_CONNECTED &&
-                            mdev->state.conn < C_AHEAD));
-       send_oos = (rw == WRITE && mdev->state.conn == C_AHEAD &&
-                   mdev->state.pdsk >= D_INCONSISTENT);
+       remote = remote && drbd_should_do_remote(mdev);
+       send_oos = rw == WRITE && drbd_should_send_oos(mdev);
+       D_ASSERT(!(remote && send_oos));
 
        if (!(local || remote) && !is_susp(mdev->state)) {
                if (__ratelimit(&drbd_ratelimit_state))
@@ -873,12 +887,9 @@ allocate_barrier:
        }
 
        if (remote || send_oos) {
-               remote = (mdev->state.pdsk == D_UP_TO_DATE ||
-                           (mdev->state.pdsk >= D_INCONSISTENT &&
-                            mdev->state.conn >= C_CONNECTED &&
-                            mdev->state.conn < C_AHEAD));
-               send_oos = (rw == WRITE && mdev->state.conn == C_AHEAD &&
-                           mdev->state.pdsk >= D_INCONSISTENT);
+               remote = drbd_should_do_remote(mdev);
+               send_oos = rw == WRITE && drbd_should_send_oos(mdev);
+               D_ASSERT(!(remote && send_oos));
 
                if (!(remote || send_oos))
                        dev_warn(DEV, "lost connection while grabbing the req_lock!\n");