Merge branch 'for-2.6.37/drivers' of git://git.kernel.dk/linux-2.6-block
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index e4b5611..c5dfe64 100644
@@ -78,6 +78,7 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
 static void md_sync_timer_fn(unsigned long data);
 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
+static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
 
 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
              "Lars Ellenberg <lars@linbit.com>");
@@ -200,7 +201,7 @@ static int tl_init(struct drbd_conf *mdev)
        INIT_LIST_HEAD(&b->w.list);
        b->next = NULL;
        b->br_number = 4711;
-       b->n_req = 0;
+       b->n_writes = 0;
        b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
 
        mdev->oldest_tle = b;
@@ -241,7 +242,7 @@ void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
        INIT_LIST_HEAD(&new->w.list);
        new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
        new->next = NULL;
-       new->n_req = 0;
+       new->n_writes = 0;
 
        newest_before = mdev->newest_tle;
        /* never send a barrier number == 0, because that is special-cased
@@ -285,9 +286,9 @@ void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
                        barrier_nr, b->br_number);
                goto bail;
        }
-       if (b->n_req != set_size) {
-               dev_err(DEV, "BAD! BarrierAck #%u received with n_req=%u, expected n_req=%u!\n",
-                       barrier_nr, set_size, b->n_req);
+       if (b->n_writes != set_size) {
+               dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
+                       barrier_nr, set_size, b->n_writes);
                goto bail;
        }
 
@@ -334,6 +335,82 @@ bail:
        drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
 }
 
+/**
+ * _tl_restart() - Walks the transfer log and applies an action to all requests
+ * @mdev:      DRBD device.
+ * @what:      The action/event to perform with all request objects
+ *
+ * @what might be one of connection_lost_while_pending, resend, fail_frozen_disk_io,
+ * restart_frozen_disk_io.
+ */
+static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
+{
+       struct drbd_tl_epoch *b, *tmp, **pn;
+       struct list_head *le, *tle, carry_reads;
+       struct drbd_request *req;
+       int rv, n_writes, n_reads;
+
+       b = mdev->oldest_tle;
+       pn = &mdev->oldest_tle;
+       while (b) {
+               n_writes = 0;
+               n_reads = 0;
+               INIT_LIST_HEAD(&carry_reads);
+               list_for_each_safe(le, tle, &b->requests) {
+                       req = list_entry(le, struct drbd_request, tl_requests);
+                       rv = _req_mod(req, what);
+
+                       n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
+                       n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
+               }
+               tmp = b->next;
+
+               if (n_writes) {
+                       if (what == resend) {
+                               b->n_writes = n_writes;
+                               if (b->w.cb == NULL) {
+                                       b->w.cb = w_send_barrier;
+                                       inc_ap_pending(mdev);
+                                       set_bit(CREATE_BARRIER, &mdev->flags);
+                               }
+
+                               drbd_queue_work(&mdev->data.work, &b->w);
+                       }
+                       pn = &b->next;
+               } else {
+                       if (n_reads)
+                               list_add(&carry_reads, &b->requests);
+                       /* there could still be requests on that ring list,
+                        * in case local io is still pending */
+                       list_del(&b->requests);
+
+                       /* dec_ap_pending corresponding to queue_barrier.
+                        * the newest barrier may not have been queued yet,
+                        * in which case w.cb is still NULL. */
+                       if (b->w.cb != NULL)
+                               dec_ap_pending(mdev);
+
+                       if (b == mdev->newest_tle) {
+                               /* recycle, but reinit! */
+                               D_ASSERT(tmp == NULL);
+                               INIT_LIST_HEAD(&b->requests);
+                               list_splice(&carry_reads, &b->requests);
+                               INIT_LIST_HEAD(&b->w.list);
+                               b->w.cb = NULL;
+                               b->br_number = net_random();
+                               b->n_writes = 0;
+
+                               *pn = b;
+                               break;
+                       }
+                       *pn = tmp;
+                       kfree(b);
+               }
+               b = tmp;
+               list_splice(&carry_reads, &b->requests);
+       }
+}
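
The counting pass above works because _req_mod() reports, in its return
value, one flag bit per request kind; mask-and-shift turns each flag into a
0/1 increment, so a single walk keeps separate read and write tallies
without branching. A minimal userspace sketch of the idiom (the MR_* values
here are illustrative, not the kernel's definitions):

#include <stdio.h>

#define MR_WRITE_SHIFT 0
#define MR_WRITE       (1 << MR_WRITE_SHIFT)
#define MR_READ_SHIFT  1
#define MR_READ        (1 << MR_READ_SHIFT)

/* stand-in for _req_mod(): report what kind of request was handled */
static int handle_request(int is_write)
{
        return is_write ? MR_WRITE : MR_READ;
}

int main(void)
{
        int kinds[] = { 1, 0, 1, 1, 0 };        /* three writes, two reads */
        int n_writes = 0, n_reads = 0;

        for (unsigned i = 0; i < sizeof(kinds) / sizeof(kinds[0]); i++) {
                int rv = handle_request(kinds[i]);

                n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
                n_reads  += (rv & MR_READ)  >> MR_READ_SHIFT;
        }
        printf("writes=%d reads=%d\n", n_writes, n_reads); /* writes=3 reads=2 */
        return 0;
}
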
+
 
 /**
  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
@@ -345,48 +422,12 @@ bail:
  */
 void tl_clear(struct drbd_conf *mdev)
 {
-       struct drbd_tl_epoch *b, *tmp;
        struct list_head *le, *tle;
        struct drbd_request *r;
-       int new_initial_bnr = net_random();
 
        spin_lock_irq(&mdev->req_lock);
 
-       b = mdev->oldest_tle;
-       while (b) {
-               list_for_each_safe(le, tle, &b->requests) {
-                       r = list_entry(le, struct drbd_request, tl_requests);
-                       /* It would be nice to complete outside of spinlock.
-                        * But this is easier for now. */
-                       _req_mod(r, connection_lost_while_pending);
-               }
-               tmp = b->next;
-
-               /* there could still be requests on that ring list,
-                * in case local io is still pending */
-               list_del(&b->requests);
-
-               /* dec_ap_pending corresponding to queue_barrier.
-                * the newest barrier may not have been queued yet,
-                * in which case w.cb is still NULL. */
-               if (b->w.cb != NULL)
-                       dec_ap_pending(mdev);
-
-               if (b == mdev->newest_tle) {
-                       /* recycle, but reinit! */
-                       D_ASSERT(tmp == NULL);
-                       INIT_LIST_HEAD(&b->requests);
-                       INIT_LIST_HEAD(&b->w.list);
-                       b->w.cb = NULL;
-                       b->br_number = new_initial_bnr;
-                       b->n_req = 0;
-
-                       mdev->oldest_tle = b;
-                       break;
-               }
-               kfree(b);
-               b = tmp;
-       }
+       _tl_restart(mdev, connection_lost_while_pending);
 
        /* we expect this list to be empty. */
        D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
@@ -402,6 +443,15 @@ void tl_clear(struct drbd_conf *mdev)
        /* ensure bit indicating barrier is required is clear */
        clear_bit(CREATE_BARRIER, &mdev->flags);
 
+       memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
+
+       spin_unlock_irq(&mdev->req_lock);
+}
+
+void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
+{
+       spin_lock_irq(&mdev->req_lock);
+       _tl_restart(mdev, what);
        spin_unlock_irq(&mdev->req_lock);
 }
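
The new tl_restart()/_tl_restart() pair follows the kernel convention that
a leading underscore marks the variant expecting the lock to be held, while
the public wrapper takes and drops the lock around a single call. The
pairing in userspace form, with a pthread mutex standing in for req_lock
(names here are hypothetical):

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t req_lock = PTHREAD_MUTEX_INITIALIZER;
static int restarted;

/* _restart(): caller must hold req_lock */
static void _restart(int what)
{
        restarted += what;              /* touch lock-protected state */
}

/* restart(): public variant, handles the locking itself */
static void restart(int what)
{
        pthread_mutex_lock(&req_lock);
        _restart(what);
        pthread_mutex_unlock(&req_lock);
}

int main(void)
{
        restart(1);
        printf("restarted=%d\n", restarted);
        return 0;
}

This keeps the locked body callable from contexts that already hold the
lock (as after_state_ch() does below) without recursive locking.
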
 
@@ -456,7 +506,7 @@ static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
 static int is_valid_state_transition(struct drbd_conf *,
                                     union drbd_state, union drbd_state);
 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
-                                      union drbd_state ns, int *warn_sync_abort);
+                                      union drbd_state ns, const char **warn_sync_abort);
 int drbd_send_state_req(struct drbd_conf *,
                        union drbd_state, union drbd_state);
 
@@ -606,7 +656,7 @@ static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
            drbd_role_str(ns.peer),
            drbd_disk_str(ns.disk),
            drbd_disk_str(ns.pdsk),
-           ns.susp ? 's' : 'r',
+           is_susp(ns) ? 's' : 'r',
            ns.aftr_isp ? 'a' : '-',
            ns.peer_isp ? 'p' : '-',
            ns.user_isp ? 'u' : '-'
@@ -764,7 +814,7 @@ static int is_valid_state_transition(struct drbd_conf *mdev,
  * to D_UNKNOWN. This rule and many more along those lines are in this function.
  */
 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
-                                      union drbd_state ns, int *warn_sync_abort)
+                                      union drbd_state ns, const char **warn_sync_abort)
 {
        enum drbd_fencing_p fp;
 
@@ -779,9 +829,10 @@ static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state
            os.conn <= C_DISCONNECTING)
                ns.conn = os.conn;
 
-       /* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow */
+       /* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
+        * Attempts to enter a Sync* state are rejected elsewhere. */
        if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
-           ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING)
+           ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_TEAR_DOWN)
                ns.conn = os.conn;
 
        /* After C_DISCONNECTING only C_STANDALONE may follow */
@@ -799,14 +850,13 @@ static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state
        if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
                ns.aftr_isp = 0;
 
-       if (ns.conn <= C_DISCONNECTING && ns.disk == D_DISKLESS)
-               ns.pdsk = D_UNKNOWN;
-
        /* Abort resync if a disk fails/detaches */
        if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
            (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
                if (warn_sync_abort)
-                       *warn_sync_abort = 1;
+                       *warn_sync_abort =
+                               os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
+                               "Online-verify" : "Resync";
                ns.conn = C_CONNECTED;
        }
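
Turning warn_sync_abort from an int flag into a const char ** lets
sanitize_state() hand back the reason ("Online-verify" vs. "Resync") and
the caller print it verbatim in one format string. The out-parameter shape
in isolation (enum values hypothetical):

#include <stdio.h>

enum conn { VERIFYING, RESYNCING, CONNECTED };

/* NULL means "nothing to warn about"; otherwise the string is the
 * subject of the warning the caller will log */
static enum conn sanitize(enum conn os, const char **warn)
{
        if (warn && os != CONNECTED)
                *warn = (os == VERIFYING) ? "Online-verify" : "Resync";
        return CONNECTED;
}

int main(void)
{
        const char *warn = NULL;

        sanitize(VERIFYING, &warn);
        if (warn)
                printf("%s aborted.\n", warn);  /* Online-verify aborted. */
        return 0;
}
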
 
@@ -877,7 +927,12 @@ static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state
        if (fp == FP_STONITH &&
            (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
            !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
-               ns.susp = 1;
+               ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
+
+       if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
+           (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
+           !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
+               ns.susp_nod = 1; /* Suspend IO while no up-to-date data is accessible */
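
Both suspend rules are edge-triggered: each tests that its predicate holds
for the new state and did not hold for the old one, so susp_fen/susp_nod
are raised once per transition rather than on every pass through
sanitize_state(). The shape of the test, with a made-up predicate:

#include <stdbool.h>
#include <stdio.h>

struct state { int primary; int disk_ok; };

static bool no_data(struct state s)
{
        return s.primary && !s.disk_ok;
}

int main(void)
{
        struct state os = { .primary = 1, .disk_ok = 1 };
        struct state ns = { .primary = 1, .disk_ok = 0 };

        /* fire only on the edge: newly true, previously false.
         * Re-evaluating the same bad state later does not fire again. */
        if (no_data(ns) && !no_data(os))
                printf("suspending IO\n");
        return 0;
}
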
 
        if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
                if (ns.conn == C_SYNC_SOURCE)
@@ -913,6 +968,12 @@ static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
        }
 }
 
+static void drbd_resume_al(struct drbd_conf *mdev)
+{
+       if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
+               dev_info(DEV, "Resumed AL updates\n");
+}
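
drbd_resume_al() is safe to call from several paths because
test_and_clear_bit() both clears the flag and reports whether it was set:
the "Resumed AL updates" line is logged exactly once per suspension. The
same shape with a C11 atomic exchange in place of the kernel bitop:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_int al_suspended = 1;

/* returns true only for the caller that actually cleared the flag */
static bool test_and_clear(atomic_int *flag)
{
        return atomic_exchange(flag, 0) != 0;
}

static void resume_al(void)
{
        if (test_and_clear(&al_suspended))
                printf("Resumed AL updates\n");
}

int main(void)
{
        resume_al();            /* prints once */
        resume_al();            /* silent: already resumed */
        return 0;
}
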
+
 /**
  * __drbd_set_state() - Set a new DRBD state
  * @mdev:      DRBD device.
@@ -928,7 +989,7 @@ int __drbd_set_state(struct drbd_conf *mdev,
 {
        union drbd_state os;
        int rv = SS_SUCCESS;
-       int warn_sync_abort = 0;
+       const char *warn_sync_abort = NULL;
        struct after_state_chg_work *ascw;
 
        os = mdev->state;
@@ -947,14 +1008,8 @@ int __drbd_set_state(struct drbd_conf *mdev,
                        /* If the old state was illegal as well, then let
                           this happen...*/
 
-                       if (is_valid_state(mdev, os) == rv) {
-                               dev_err(DEV, "Considering state change from bad state. "
-                                   "Error would be: '%s'\n",
-                                   drbd_set_st_err_str(rv));
-                               print_st(mdev, "old", os);
-                               print_st(mdev, "new", ns);
+                       if (is_valid_state(mdev, os) == rv)
                                rv = is_valid_state_transition(mdev, ns, os);
-                       }
                } else
                        rv = is_valid_state_transition(mdev, ns, os);
        }
@@ -966,7 +1021,7 @@ int __drbd_set_state(struct drbd_conf *mdev,
        }
 
        if (warn_sync_abort)
-               dev_warn(DEV, "Resync aborted.\n");
+               dev_warn(DEV, "%s aborted.\n", warn_sync_abort);
 
        {
                char *pbp, pb[300];
@@ -977,7 +1032,10 @@ int __drbd_set_state(struct drbd_conf *mdev,
                PSC(conn);
                PSC(disk);
                PSC(pdsk);
-               PSC(susp);
+               if (is_susp(ns) != is_susp(os))
+                       pbp += sprintf(pbp, "susp( %s -> %s ) ",
+                                      drbd_susp_str(is_susp(os)),
+                                      drbd_susp_str(is_susp(ns)));
                PSC(aftr_isp);
                PSC(peer_isp);
                PSC(user_isp);
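
The susp() branch above extends the surrounding PSC() idiom: sprintf
returns the number of characters it wrote, so pbp is a cursor that advances
across the fixed pb[300] buffer as each changed field is appended. In
isolation:

#include <stdio.h>

int main(void)
{
        char pb[300];
        char *pbp = pb;

        /* each call appends after the previous one */
        pbp += sprintf(pbp, "conn( %s -> %s ) ", "Connected", "TearDown");
        pbp += sprintf(pbp, "susp( %s -> %s ) ", "0", "1");

        printf("%s\n", pb);
        return 0;
}

The buffer is no more bounds-checked here than in the kernel code; it works
because the set of appended fragments is small by construction.
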
@@ -1002,12 +1060,6 @@ int __drbd_set_state(struct drbd_conf *mdev,
        wake_up(&mdev->misc_wait);
        wake_up(&mdev->state_wait);
 
-       /*   post-state-change actions   */
-       if (os.conn >= C_SYNC_SOURCE   && ns.conn <= C_CONNECTED) {
-               set_bit(STOP_SYNC_TIMER, &mdev->flags);
-               mod_timer(&mdev->resync_timer, jiffies);
-       }
-
        /* aborted verify run. log the last position */
        if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
            ns.conn < C_CONNECTED) {
@@ -1020,41 +1072,42 @@ int __drbd_set_state(struct drbd_conf *mdev,
        if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
            (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
                dev_info(DEV, "Syncer continues.\n");
-               mdev->rs_paused += (long)jiffies-(long)mdev->rs_mark_time;
-               if (ns.conn == C_SYNC_TARGET) {
-                       if (!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))
-                               mod_timer(&mdev->resync_timer, jiffies);
-                       /* This if (!test_bit) is only needed for the case
-                          that a device that has ceased to used its timer,
-                          i.e. it is already in drbd_resync_finished() gets
-                          paused and resumed. */
-               }
+               mdev->rs_paused += (long)jiffies
+                                 -(long)mdev->rs_mark_time[mdev->rs_last_mark];
+               if (ns.conn == C_SYNC_TARGET)
+                       mod_timer(&mdev->resync_timer, jiffies);
        }
 
        if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
            (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
                dev_info(DEV, "Resync suspended\n");
-               mdev->rs_mark_time = jiffies;
-               if (ns.conn == C_PAUSED_SYNC_T)
-                       set_bit(STOP_SYNC_TIMER, &mdev->flags);
+               mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
        }
 
        if (os.conn == C_CONNECTED &&
            (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
+               unsigned long now = jiffies;
+               int i;
+
                mdev->ov_position = 0;
-               mdev->rs_total =
-               mdev->rs_mark_left = drbd_bm_bits(mdev);
+               mdev->rs_total = drbd_bm_bits(mdev);
                if (mdev->agreed_pro_version >= 90)
                        set_ov_position(mdev, ns.conn);
                else
                        mdev->ov_start_sector = 0;
                mdev->ov_left = mdev->rs_total
                              - BM_SECT_TO_BIT(mdev->ov_position);
-               mdev->rs_start     =
-               mdev->rs_mark_time = jiffies;
+               mdev->rs_start = now;
+               mdev->rs_last_events = 0;
+               mdev->rs_last_sect_ev = 0;
                mdev->ov_last_oos_size = 0;
                mdev->ov_last_oos_start = 0;
 
+               for (i = 0; i < DRBD_SYNC_MARKS; i++) {
+                       mdev->rs_mark_left[i] = mdev->rs_total;
+                       mdev->rs_mark_time[i] = now;
+               }
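
The DRBD_SYNC_MARKS loop seeds every slot of a small ring of
(work-left, timestamp) samples, from which the syncer can later derive a
smoothed rate over a sliding window. A self-contained sketch of such a mark
ring; the window arithmetic is illustrative, not DRBD's exact formula:

#include <stdio.h>

#define SYNC_MARKS 8

struct marks {
        unsigned long left[SYNC_MARKS]; /* work remaining at mark time */
        unsigned long time[SYNC_MARKS]; /* timestamp of each mark */
        int last;                       /* index of most recent mark */
};

/* seed every slot with "now / everything left", as the verify start
 * path above does, so early estimates never read stale slots */
static void marks_init(struct marks *m, unsigned long total, unsigned long now)
{
        for (int i = 0; i < SYNC_MARKS; i++) {
                m->left[i] = total;
                m->time[i] = now;
        }
        m->last = 0;
}

static void marks_update(struct marks *m, unsigned long left, unsigned long now)
{
        m->last = (m->last + 1) % SYNC_MARKS;
        m->left[m->last] = left;
        m->time[m->last] = now;
}

/* rate over the whole window: the oldest mark sits one slot past the
 * newest in the ring */
static double marks_rate(const struct marks *m)
{
        int oldest = (m->last + 1) % SYNC_MARKS;
        unsigned long dt = m->time[m->last] - m->time[oldest];

        return dt ? (double)(m->left[oldest] - m->left[m->last]) / dt : 0.0;
}

int main(void)
{
        struct marks m;

        marks_init(&m, 1000, 0);
        marks_update(&m, 900, 10);
        marks_update(&m, 700, 20);
        printf("rate: %.1f units/tick\n", marks_rate(&m));      /* 15.0 */
        return 0;
}
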
+
                if (ns.conn == C_VERIFY_S) {
                        dev_info(DEV, "Starting Online Verify from sector %llu\n",
                                        (unsigned long long)mdev->ov_position);
@@ -1107,6 +1160,10 @@ int __drbd_set_state(struct drbd_conf *mdev,
            ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
                drbd_thread_restart_nowait(&mdev->receiver);
 
+       /* Resume AL writing if we get a connection */
+       if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
+               drbd_resume_al(mdev);
+
        ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
        if (ascw) {
                ascw->os = os;
@@ -1165,6 +1222,8 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
                           union drbd_state ns, enum chg_state_flags flags)
 {
        enum drbd_fencing_p fp;
+       enum drbd_req_event what = nothing;
+       union drbd_state nsm = (union drbd_state){ .i = -1 };
 
        if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
                clear_bit(CRASHED_PRIMARY, &mdev->flags);
@@ -1188,17 +1247,49 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
        /* Here we have the actions that are performed after a
           state change. This function might sleep */
 
-       if (fp == FP_STONITH && ns.susp) {
-               /* case1: The outdate peer handler is successful:
-                * case2: The connection was established again: */
-               if ((os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) ||
-                   (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)) {
+       nsm.i = -1;
+       if (ns.susp_nod) {
+               if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
+                       if (ns.conn == C_CONNECTED)
+                               what = resend, nsm.susp_nod = 0;
+                       else /* ns.conn > C_CONNECTED */
+                               dev_err(DEV, "Unexpected resync going on!\n");
+               }
+
+               if (os.disk == D_ATTACHING && ns.disk > D_ATTACHING)
+                       what = restart_frozen_disk_io, nsm.susp_nod = 0;
+
+       }
+
+       if (ns.susp_fen) {
+               /* case1: The outdate peer handler is successful: */
+               if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
                        tl_clear(mdev);
+                       if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
+                               drbd_uuid_new_current(mdev);
+                               clear_bit(NEW_CUR_UUID, &mdev->flags);
+                               drbd_md_sync(mdev);
+                       }
                        spin_lock_irq(&mdev->req_lock);
-                       _drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
+                       _drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
                        spin_unlock_irq(&mdev->req_lock);
                }
+               /* case2: The connection was established again: */
+               if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
+                       clear_bit(NEW_CUR_UUID, &mdev->flags);
+                       what = resend;
+                       nsm.susp_fen = 0;
+               }
+       }
+
+       if (what != nothing) {
+               spin_lock_irq(&mdev->req_lock);
+               _tl_restart(mdev, what);
+               nsm.i &= mdev->state.i;
+               _drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
+               spin_unlock_irq(&mdev->req_lock);
        }
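
The nsm bookkeeping works as a clear-mask: it starts as all ones
(.i = -1), each handler zeroes just the susp bit it wants dropped, and the
final nsm.i &= mdev->state.i folds the mask into whatever the live state
has become under the lock. The trick in miniature, with a hypothetical
two-flag state union:

#include <stdint.h>
#include <stdio.h>

union state {
        struct {
                uint32_t susp_nod : 1;
                uint32_t susp_fen : 1;
                uint32_t other    : 30;
        };
        uint32_t i;
};

int main(void)
{
        union state now  = { .i = 0 };
        union state mask = { .i = (uint32_t)-1 };       /* keep everything */

        now.susp_nod = 1;
        now.susp_fen = 1;
        now.other = 0x155;

        mask.susp_fen = 0;      /* a handler requests clearing susp_fen */

        /* AND with the live state: requested bits drop, all others
         * pass through unchanged even if they changed meanwhile */
        mask.i &= now.i;

        printf("nod=%u fen=%u other=%#x\n",
               (unsigned)mask.susp_nod, (unsigned)mask.susp_fen,
               (unsigned)mask.other);
        /* -> nod=1 fen=0 other=0x155 */
        return 0;
}
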
+
        /* Do not change the order of the if above and the two below... */
        if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
                drbd_send_uuids(mdev);
@@ -1217,16 +1308,22 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
                if (get_ldev(mdev)) {
                        if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
                            mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
-                               drbd_uuid_new_current(mdev);
-                               drbd_send_uuids(mdev);
+                               if (is_susp(mdev->state)) {
+                                       set_bit(NEW_CUR_UUID, &mdev->flags);
+                               } else {
+                                       drbd_uuid_new_current(mdev);
+                                       drbd_send_uuids(mdev);
+                               }
                        }
                        put_ldev(mdev);
                }
        }
 
        if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
-               if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0)
+               if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0) {
                        drbd_uuid_new_current(mdev);
+                       drbd_send_uuids(mdev);
+               }
 
                /* D_DISKLESS Peer becomes secondary */
                if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
@@ -1268,42 +1365,51 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
            os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
                drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
 
+       /* first half of local IO error handling */
        if (os.disk > D_FAILED && ns.disk == D_FAILED) {
-               enum drbd_io_error_p eh;
+               enum drbd_io_error_p eh = EP_PASS_ON;
+
+               if (drbd_send_state(mdev))
+                       dev_warn(DEV, "Notified peer that my disk is broken.\n");
+               else
+                       dev_err(DEV, "Sending state for drbd_io_error() failed\n");
+
+               drbd_rs_cancel_all(mdev);
 
-               eh = EP_PASS_ON;
                if (get_ldev_if_state(mdev, D_FAILED)) {
                        eh = mdev->ldev->dc.on_io_error;
                        put_ldev(mdev);
                }
+               if (eh == EP_CALL_HELPER)
+                       drbd_khelper(mdev, "local-io-error");
+       }
 
-               drbd_rs_cancel_all(mdev);
-               /* since get_ldev() only works as long as disk>=D_INCONSISTENT,
-                  and it is D_DISKLESS here, local_cnt can only go down, it can
-                  not increase... It will reach zero */
-               wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
+
+       /* second half of local IO error handling,
+        * after local_cnt references have reached zero: */
+       if (os.disk == D_FAILED && ns.disk == D_DISKLESS) {
                mdev->rs_total = 0;
                mdev->rs_failed = 0;
                atomic_set(&mdev->rs_pending_cnt, 0);
-
-               spin_lock_irq(&mdev->req_lock);
-               _drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
-               spin_unlock_irq(&mdev->req_lock);
-
-               if (eh == EP_CALL_HELPER)
-                       drbd_khelper(mdev, "local-io-error");
        }
 
        if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
+               /* We must still be diskless;
+                * re-attach has to be serialized with this! */
+               if (mdev->state.disk != D_DISKLESS)
+                       dev_err(DEV,
+                               "ASSERT FAILED: disk is %s while going diskless\n",
+                               drbd_disk_str(mdev->state.disk));
+
+               /* we cannot assert local_cnt == 0 here, as get_ldev_if_state
+                * will inc/dec it frequently. Since we became D_DISKLESS, no
+                * one has touched the protected members anymore, though, so we
+                * are safe to free them here. */
+               if (drbd_send_state(mdev))
+                       dev_warn(DEV, "Notified peer that I detached my disk.\n");
+               else
+                       dev_err(DEV, "Sending state for detach failed\n");
 
-               if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
-                       if (drbd_send_state(mdev))
-                               dev_warn(DEV, "Notified peer that my disk is broken.\n");
-                       else
-                               dev_err(DEV, "Sending state in drbd_io_error() failed\n");
-               }
-
-               wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
                lc_destroy(mdev->resync);
                mdev->resync = NULL;
                lc_destroy(mdev->act_log);
@@ -1312,8 +1418,10 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
                        drbd_free_bc(mdev->ldev);
                        mdev->ldev = NULL;);
 
-               if (mdev->md_io_tmpp)
+               if (mdev->md_io_tmpp) {
                        __free_page(mdev->md_io_tmpp);
+                       mdev->md_io_tmpp = NULL;
+               }
        }
 
        /* Disks got bigger while they were detached */
@@ -1329,6 +1437,15 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
            (os.user_isp && !ns.user_isp))
                resume_next_sg(mdev);
 
+       /* Sync target done with resync. Explicitly notify peer, even though
+        * it should (at least for non-empty resyncs) already know this itself. */
+       if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
+               drbd_send_state(mdev);
+
+       /* free tl_hash if we got thawed and are C_STANDALONE */
+       if (ns.conn == C_STANDALONE && !is_susp(ns) && mdev->tl_hash)
+               drbd_free_tl_hash(mdev);
+
        /* Upon network connection, we need to start the receiver */
        if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
                drbd_thread_start(&mdev->receiver);
@@ -1555,7 +1672,7 @@ void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
 
 /* the appropriate socket mutex must be held already */
 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
-                         enum drbd_packets cmd, struct p_header *h,
+                         enum drbd_packets cmd, struct p_header80 *h,
                          size_t size, unsigned msg_flags)
 {
        int sent, ok;
@@ -1565,7 +1682,7 @@ int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
 
        h->magic   = BE_DRBD_MAGIC;
        h->command = cpu_to_be16(cmd);
-       h->length  = cpu_to_be16(size-sizeof(struct p_header));
+       h->length  = cpu_to_be16(size-sizeof(struct p_header80));
 
        sent = drbd_send(mdev, sock, h, size, msg_flags);
 
@@ -1580,7 +1697,7 @@ int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
  * when we hold the appropriate socket mutex.
  */
 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
-                 enum drbd_packets cmd, struct p_header *h, size_t size)
+                 enum drbd_packets cmd, struct p_header80 *h, size_t size)
 {
        int ok = 0;
        struct socket *sock;
@@ -1608,7 +1725,7 @@ int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
                   size_t size)
 {
-       struct p_header h;
+       struct p_header80 h;
        int ok;
 
        h.magic   = BE_DRBD_MAGIC;
@@ -1630,7 +1747,7 @@ int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
 
 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
 {
-       struct p_rs_param_89 *p;
+       struct p_rs_param_95 *p;
        struct socket *sock;
        int size, rv;
        const int apv = mdev->agreed_pro_version;
@@ -1638,7 +1755,8 @@ int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
        size = apv <= 87 ? sizeof(struct p_rs_param)
                : apv == 88 ? sizeof(struct p_rs_param)
                        + strlen(mdev->sync_conf.verify_alg) + 1
-               : /* 89 */    sizeof(struct p_rs_param_89);
+               : apv <= 94 ? sizeof(struct p_rs_param_89)
+               : /* apv >= 95 */ sizeof(struct p_rs_param_95);
 
        /* used from admin command context and receiver/worker context.
         * to avoid kmalloc, grab the socket right here,
@@ -1649,12 +1767,16 @@ int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
        if (likely(sock != NULL)) {
                enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
 
-               p = &mdev->data.sbuf.rs_param_89;
+               p = &mdev->data.sbuf.rs_param_95;
 
                /* initialize verify_alg and csums_alg */
                memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
 
                p->rate = cpu_to_be32(sc->rate);
+               p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
+               p->c_delay_target = cpu_to_be32(sc->c_delay_target);
+               p->c_fill_target = cpu_to_be32(sc->c_fill_target);
+               p->c_max_rate = cpu_to_be32(sc->c_max_rate);
 
                if (apv >= 88)
                        strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
@@ -1710,7 +1832,7 @@ int drbd_send_protocol(struct drbd_conf *mdev)
                strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
 
        rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
-                          (struct p_header *)p, size);
+                          (struct p_header80 *)p, size);
        kfree(p);
        return rv;
 }
@@ -1736,7 +1858,7 @@ int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
        put_ldev(mdev);
 
        return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
-                            (struct p_header *)&p, sizeof(p));
+                            (struct p_header80 *)&p, sizeof(p));
 }
 
 int drbd_send_uuids(struct drbd_conf *mdev)
@@ -1757,7 +1879,7 @@ int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
        p.uuid = cpu_to_be64(val);
 
        return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
-                            (struct p_header *)&p, sizeof(p));
+                            (struct p_header80 *)&p, sizeof(p));
 }
 
 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
@@ -1787,7 +1909,7 @@ int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags fl
        p.dds_flags = cpu_to_be16(flags);
 
        ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
-                          (struct p_header *)&p, sizeof(p));
+                          (struct p_header80 *)&p, sizeof(p));
        return ok;
 }
 
@@ -1812,7 +1934,7 @@ int drbd_send_state(struct drbd_conf *mdev)
 
        if (likely(sock != NULL)) {
                ok = _drbd_send_cmd(mdev, sock, P_STATE,
-                                   (struct p_header *)&p, sizeof(p), 0);
+                                   (struct p_header80 *)&p, sizeof(p), 0);
        }
 
        mutex_unlock(&mdev->data.mutex);
@@ -1830,7 +1952,7 @@ int drbd_send_state_req(struct drbd_conf *mdev,
        p.val     = cpu_to_be32(val.i);
 
        return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
-                            (struct p_header *)&p, sizeof(p));
+                            (struct p_header80 *)&p, sizeof(p));
 }
 
 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
@@ -1840,7 +1962,7 @@ int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
        p.retcode    = cpu_to_be32(retcode);
 
        return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
-                            (struct p_header *)&p, sizeof(p));
+                            (struct p_header80 *)&p, sizeof(p));
 }
 
 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
@@ -1939,7 +2061,7 @@ int fill_bitmap_rle_bits(struct drbd_conf *mdev,
 
 enum { OK, FAILED, DONE }
 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
-       struct p_header *h, struct bm_xfer_ctx *c)
+       struct p_header80 *h, struct bm_xfer_ctx *c)
 {
        struct p_compressed_bm *p = (void*)h;
        unsigned long num_words;
@@ -1969,12 +2091,12 @@ send_bitmap_rle_or_plain(struct drbd_conf *mdev,
                if (len)
                        drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
                ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
-                                  h, sizeof(struct p_header) + len, 0);
+                                  h, sizeof(struct p_header80) + len, 0);
                c->word_offset += num_words;
                c->bit_offset = c->word_offset * BITS_PER_LONG;
 
                c->packets[1]++;
-               c->bytes[1] += sizeof(struct p_header) + len;
+               c->bytes[1] += sizeof(struct p_header80) + len;
 
                if (c->bit_offset > c->bm_bits)
                        c->bit_offset = c->bm_bits;
@@ -1990,14 +2112,14 @@ send_bitmap_rle_or_plain(struct drbd_conf *mdev,
 int _drbd_send_bitmap(struct drbd_conf *mdev)
 {
        struct bm_xfer_ctx c;
-       struct p_header *p;
+       struct p_header80 *p;
        int ret;
 
        ERR_IF(!mdev->bitmap) return FALSE;
 
        /* maybe we should use some per thread scratch page,
         * and allocate that during initial device creation? */
-       p = (struct p_header *) __get_free_page(GFP_NOIO);
+       p = (struct p_header80 *) __get_free_page(GFP_NOIO);
        if (!p) {
                dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
                return FALSE;
@@ -2055,7 +2177,7 @@ int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
        if (mdev->state.conn < C_CONNECTED)
                return FALSE;
        ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
-                       (struct p_header *)&p, sizeof(p));
+                       (struct p_header80 *)&p, sizeof(p));
        return ok;
 }
 
@@ -2083,17 +2205,18 @@ static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
        if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
                return FALSE;
        ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
-                               (struct p_header *)&p, sizeof(p));
+                               (struct p_header80 *)&p, sizeof(p));
        return ok;
 }
 
+/* dp->sector and dp->block_id are already/still in network byte order;
+ * data_size is the payload size according to dp->head,
+ * and may need to be corrected for the digest size. */
 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
-                    struct p_data *dp)
+                    struct p_data *dp, int data_size)
 {
-       const int header_size = sizeof(struct p_data)
-                             - sizeof(struct p_header);
-       int data_size  = ((struct p_header *)dp)->length - header_size;
-
+       data_size -= (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
+               crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
        return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
                              dp->block_id);
 }
@@ -2141,7 +2264,7 @@ int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
        p.blksize  = cpu_to_be32(size);
 
        ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
-                               (struct p_header *)&p, sizeof(p));
+                               (struct p_header80 *)&p, sizeof(p));
        return ok;
 }
 
@@ -2159,7 +2282,7 @@ int drbd_send_drequest_csum(struct drbd_conf *mdev,
 
        p.head.magic   = BE_DRBD_MAGIC;
        p.head.command = cpu_to_be16(cmd);
-       p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header) + digest_size);
+       p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + digest_size);
 
        mutex_lock(&mdev->data.mutex);
 
@@ -2181,7 +2304,7 @@ int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
        p.blksize  = cpu_to_be32(size);
 
        ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
-                          (struct p_header *)&p, sizeof(p));
+                          (struct p_header80 *)&p, sizeof(p));
        return ok;
 }
 
@@ -2333,6 +2456,18 @@ static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
        return 1;
 }
 
+static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
+{
+       if (mdev->agreed_pro_version >= 95)
+               return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
+                       (bi_rw & REQ_UNPLUG ? DP_UNPLUG : 0) |
+                       (bi_rw & REQ_FUA ? DP_FUA : 0) |
+                       (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
+                       (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
+       else
+               return bi_rw & (REQ_SYNC | REQ_UNPLUG) ? DP_RW_SYNC : 0;
+}
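
bio_flags_to_wire() only emits the fine-grained DP_* flags to peers that
can decode them (protocol 95 and later); older peers get everything
collapsed into the legacy SYNC hint. A sketch of that version gate, with
invented flag values rather than the kernel's REQ_ and DP_ constants:

#include <stdio.h>

#define RQ_SYNC   (1u << 0)     /* illustrative in-memory flags */
#define RQ_FUA    (1u << 1)
#define WIRE_SYNC (1u << 0)     /* illustrative wire flags */
#define WIRE_FUA  (1u << 2)

static unsigned flags_to_wire(int peer_version, unsigned rq)
{
        if (peer_version >= 95)
                return (rq & RQ_SYNC ? WIRE_SYNC : 0) |
                       (rq & RQ_FUA  ? WIRE_FUA  : 0);
        /* old peers only understand the SYNC hint */
        return rq & RQ_SYNC ? WIRE_SYNC : 0;
}

int main(void)
{
        printf("v95: %#x  v89: %#x\n",
               flags_to_wire(95, RQ_SYNC | RQ_FUA),
               flags_to_wire(89, RQ_SYNC | RQ_FUA));
        return 0;
}
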
+
 /* Used to send write requests
  * R_PRIMARY -> Peer   (P_DATA)
  */
@@ -2350,30 +2485,25 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
        dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
                crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
 
-       p.head.magic   = BE_DRBD_MAGIC;
-       p.head.command = cpu_to_be16(P_DATA);
-       p.head.length  =
-               cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + req->size);
+       if (req->size <= DRBD_MAX_SIZE_H80_PACKET) {
+               p.head.h80.magic   = BE_DRBD_MAGIC;
+               p.head.h80.command = cpu_to_be16(P_DATA);
+               p.head.h80.length  =
+                       cpu_to_be16(sizeof(p) - sizeof(union p_header) + dgs + req->size);
+       } else {
+               p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
+               p.head.h95.command = cpu_to_be16(P_DATA);
+               p.head.h95.length  =
+                       cpu_to_be32(sizeof(p) - sizeof(union p_header) + dgs + req->size);
+       }
 
        p.sector   = cpu_to_be64(req->sector);
        p.block_id = (unsigned long)req;
        p.seq_num  = cpu_to_be32(req->seq_num =
                                 atomic_add_return(1, &mdev->packet_seq));
-       dp_flags = 0;
 
-       /* NOTE: no need to check if barriers supported here as we would
-        *       not pass the test in make_request_common in that case
-        */
-       if (req->master_bio->bi_rw & REQ_HARDBARRIER) {
-               dev_err(DEV, "ASSERT FAILED would have set DP_HARDBARRIER\n");
-               /* dp_flags |= DP_HARDBARRIER; */
-       }
-       if (req->master_bio->bi_rw & REQ_SYNC)
-               dp_flags |= DP_RW_SYNC;
-       /* for now handle SYNCIO and UNPLUG
-        * as if they still were one and the same flag */
-       if (req->master_bio->bi_rw & REQ_UNPLUG)
-               dp_flags |= DP_RW_SYNC;
+       dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
+
        if (mdev->state.conn >= C_SYNC_SOURCE &&
            mdev->state.conn <= C_PAUSED_SYNC_T)
                dp_flags |= DP_MAY_SET_IN_SYNC;
@@ -2414,10 +2544,17 @@ int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
        dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
                crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
 
-       p.head.magic   = BE_DRBD_MAGIC;
-       p.head.command = cpu_to_be16(cmd);
-       p.head.length  =
-               cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + e->size);
+       if (e->size <= DRBD_MAX_SIZE_H80_PACKET) {
+               p.head.h80.magic   = BE_DRBD_MAGIC;
+               p.head.h80.command = cpu_to_be16(cmd);
+               p.head.h80.length  =
+                       cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
+       } else {
+               p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
+               p.head.h95.command = cpu_to_be16(cmd);
+               p.head.h95.length  =
+                       cpu_to_be32(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
+       }
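
Both senders now pick the packet header by payload size: a length that fits
the 16-bit field uses the compact h80 layout, anything larger the h95
layout with a 32-bit length, and distinct magics tell the receiver which
one follows. A standalone sketch; magic numbers, layouts, and the threshold
are illustrative:

#include <arpa/inet.h>  /* htons/htonl */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define MAGIC_H80 0x83740586u
#define MAGIC_H95 0x835a
#define MAX_H80   ((1u << 16) - 1)

struct h80 { uint32_t magic; uint16_t command, length; };
struct h95 { uint16_t magic, command; uint32_t length; };

/* pick the smallest header whose length field can hold "size" */
static size_t fill_header(void *buf, uint16_t cmd, uint32_t size)
{
        if (size <= MAX_H80) {
                struct h80 h = { htonl(MAGIC_H80), htons(cmd), htons(size) };
                memcpy(buf, &h, sizeof(h));
                return sizeof(h);
        } else {
                struct h95 h = { htons(MAGIC_H95), htons(cmd), htonl(size) };
                memcpy(buf, &h, sizeof(h));
                return sizeof(h);
        }
}

int main(void)
{
        unsigned char buf[16];

        printf("small: %zu-byte header\n", fill_header(buf, 0, 4096));
        printf("large: %zu-byte header\n", fill_header(buf, 0, 1u << 20));
        return 0;
}
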
 
        p.sector   = cpu_to_be64(e->sector);
        p.block_id = e->block_id;
@@ -2430,8 +2567,7 @@ int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
        if (!drbd_get_data_sock(mdev))
                return 0;
 
-       ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p,
-                                       sizeof(p), dgs ? MSG_MORE : 0);
+       ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
        if (ok && dgs) {
                dgb = mdev->int_dig_out;
                drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
@@ -2606,7 +2742,13 @@ static void drbd_set_defaults(struct drbd_conf *mdev)
                /* .verify_alg = */     {}, 0,
                /* .cpu_mask = */       {}, 0,
                /* .csums_alg = */      {}, 0,
-               /* .use_rle = */        0
+               /* .use_rle = */        0,
+               /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
+               /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
+               /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
+               /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
+               /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
+               /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
        };
 
        /* Have to use that way, because the layout differs between
@@ -2617,7 +2759,9 @@ static void drbd_set_defaults(struct drbd_conf *mdev)
                  .conn = C_STANDALONE,
                  .disk = D_DISKLESS,
                  .pdsk = D_UNKNOWN,
-                 .susp = 0
+                 .susp = 0,
+                 .susp_nod = 0,
+                 .susp_fen = 0
                } };
 }
 
@@ -2641,6 +2785,9 @@ void drbd_init_set_defaults(struct drbd_conf *mdev)
        atomic_set(&mdev->net_cnt, 0);
        atomic_set(&mdev->packet_seq, 0);
        atomic_set(&mdev->pp_in_use, 0);
+       atomic_set(&mdev->pp_in_use_by_net, 0);
+       atomic_set(&mdev->rs_sect_in, 0);
+       atomic_set(&mdev->rs_sect_ev, 0);
 
        mutex_init(&mdev->md_io_mutex);
        mutex_init(&mdev->data.mutex);
@@ -2667,11 +2814,13 @@ void drbd_init_set_defaults(struct drbd_conf *mdev)
        INIT_LIST_HEAD(&mdev->meta.work.q);
        INIT_LIST_HEAD(&mdev->resync_work.list);
        INIT_LIST_HEAD(&mdev->unplug_work.list);
+       INIT_LIST_HEAD(&mdev->go_diskless.list);
        INIT_LIST_HEAD(&mdev->md_sync_work.list);
        INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
 
        mdev->resync_work.cb  = w_resync_inactive;
        mdev->unplug_work.cb  = w_send_write_hint;
+       mdev->go_diskless.cb  = w_go_diskless;
        mdev->md_sync_work.cb = w_md_sync;
        mdev->bm_io_work.w.cb = w_bitmap_io;
        init_timer(&mdev->resync_timer);
@@ -2683,6 +2832,7 @@ void drbd_init_set_defaults(struct drbd_conf *mdev)
 
        init_waitqueue_head(&mdev->misc_wait);
        init_waitqueue_head(&mdev->state_wait);
+       init_waitqueue_head(&mdev->net_cnt_wait);
        init_waitqueue_head(&mdev->ee_wait);
        init_waitqueue_head(&mdev->al_wait);
        init_waitqueue_head(&mdev->seq_wait);
@@ -2698,6 +2848,7 @@ void drbd_init_set_defaults(struct drbd_conf *mdev)
 
 void drbd_mdev_cleanup(struct drbd_conf *mdev)
 {
+       int i;
        if (mdev->receiver.t_state != None)
                dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
                                mdev->receiver.t_state);
@@ -2714,9 +2865,13 @@ void drbd_mdev_cleanup(struct drbd_conf *mdev)
        mdev->p_size       =
        mdev->rs_start     =
        mdev->rs_total     =
-       mdev->rs_failed    =
-       mdev->rs_mark_left =
-       mdev->rs_mark_time = 0;
+       mdev->rs_failed    = 0;
+       mdev->rs_last_events = 0;
+       mdev->rs_last_sect_ev = 0;
+       for (i = 0; i < DRBD_SYNC_MARKS; i++) {
+               mdev->rs_mark_left[i] = 0;
+               mdev->rs_mark_time[i] = 0;
+       }
        D_ASSERT(mdev->net_conf == NULL);
 
        drbd_set_my_capacity(mdev, 0);
@@ -2727,6 +2882,7 @@ void drbd_mdev_cleanup(struct drbd_conf *mdev)
        }
 
        drbd_free_resources(mdev);
+       clear_bit(AL_SUSPENDED, &mdev->flags);
 
        /*
         * currently we drbd_init_ee only on module load, so
@@ -2742,6 +2898,7 @@ void drbd_mdev_cleanup(struct drbd_conf *mdev)
        D_ASSERT(list_empty(&mdev->meta.work.q));
        D_ASSERT(list_empty(&mdev->resync_work.list));
        D_ASSERT(list_empty(&mdev->unplug_work.list));
+       D_ASSERT(list_empty(&mdev->go_diskless.list));
 
 }
 
@@ -3281,9 +3438,10 @@ void drbd_md_sync(struct drbd_conf *mdev)
        sector_t sector;
        int i;
 
+       del_timer(&mdev->md_sync_timer);
+       /* timer may be rearmed by drbd_md_mark_dirty() now. */
        if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
                return;
-       del_timer(&mdev->md_sync_timer);
 
        /* We use here D_FAILED and not D_ATTACHING because we try to write
         * metadata even if we detach due to a disk failure! */
@@ -3311,12 +3469,9 @@ void drbd_md_sync(struct drbd_conf *mdev)
        D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
        sector = mdev->ldev->md.md_offset;
 
-       if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
-               clear_bit(MD_DIRTY, &mdev->flags);
-       } else {
+       if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
                /* this was a try anyways ... */
                dev_err(DEV, "meta data update failed!\n");
-
                drbd_chk_io_error(mdev, 1, TRUE);
        }
 
@@ -3403,6 +3558,28 @@ int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
        return rv;
 }
 
+static void debug_drbd_uuid(struct drbd_conf *mdev, enum drbd_uuid_index index)
+{
+       static char *uuid_str[UI_EXTENDED_SIZE] = {
+               [UI_CURRENT] = "CURRENT",
+               [UI_BITMAP] = "BITMAP",
+               [UI_HISTORY_START] = "HISTORY_START",
+               [UI_HISTORY_END] = "HISTORY_END",
+               [UI_SIZE] = "SIZE",
+               [UI_FLAGS] = "FLAGS",
+       };
+
+       if (index >= UI_EXTENDED_SIZE) {
+               dev_warn(DEV, " uuid_index >= UI_EXTENDED_SIZE\n");
+               return;
+       }
+
+       dynamic_dev_dbg(DEV, " uuid[%s] now %016llX\n",
+                uuid_str[index],
+                (unsigned long long)mdev->ldev->md.uuid[index]);
+}
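
debug_drbd_uuid() maps the index through a designated-initializer string
table, which stays correct if the enum is ever reordered and leaves any
unnamed slot NULL. The pattern in miniature, with a shortened hypothetical
enum (the NULL check is an extra guard beyond the range check above):

#include <stdio.h>

enum ui { UI_CURRENT, UI_BITMAP, UI_SIZE, UI_MAX };

static const char *ui_str[UI_MAX] = {
        [UI_CURRENT] = "CURRENT",
        [UI_BITMAP]  = "BITMAP",
        [UI_SIZE]    = "SIZE",
};

static void print_ui(enum ui idx, unsigned long long val)
{
        if (idx >= UI_MAX || !ui_str[idx]) {
                fprintf(stderr, "bad uuid index %d\n", (int)idx);
                return;
        }
        printf("uuid[%s] now %016llX\n", ui_str[idx], val);
}

int main(void)
{
        print_ui(UI_BITMAP, 0x1234abcdULL);
        return 0;
}
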
+
+
 /**
  * drbd_md_mark_dirty() - Mark meta data super block as dirty
  * @mdev:      DRBD device.
@@ -3411,19 +3588,31 @@ int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
  * the meta-data super block. This function sets MD_DIRTY, and starts a
  * timer that ensures that within five seconds you have to call drbd_md_sync().
  */
+#ifdef DEBUG
+void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
+{
+       if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
+               mod_timer(&mdev->md_sync_timer, jiffies + HZ);
+               mdev->last_md_mark_dirty.line = line;
+               mdev->last_md_mark_dirty.func = func;
+       }
+}
+#else
 void drbd_md_mark_dirty(struct drbd_conf *mdev)
 {
-       set_bit(MD_DIRTY, &mdev->flags);
-       mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
+       if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
+               mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
 }
-
+#endif
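
Both variants arm the flush timer only on the clean-to-dirty edge of
MD_DIRTY, so frequent dirtying is a cheap no-op; the DEBUG build also
records the call site, which w_md_sync() prints when the timer actually
expires. A userspace sketch of that debounce-plus-provenance idea (the
timer is stubbed as a printf):

#include <stdbool.h>
#include <stdio.h>

static bool md_dirty;
static struct { const char *func; unsigned line; } last_mark;

static void mark_dirty_(unsigned line, const char *func)
{
        /* only the clean->dirty edge arms the timer and records
         * who caused it; later calls are no-ops until a sync */
        if (!md_dirty) {
                md_dirty = true;
                printf("timer armed\n");
                last_mark.line = line;
                last_mark.func = func;
        }
}
#define mark_dirty() mark_dirty_(__LINE__, __func__)

static void writer(void)
{
        mark_dirty();           /* arms the timer, records this site */
        mark_dirty();           /* already dirty: silent */
}

int main(void)
{
        writer();
        printf("last mark: %s:%u\n", last_mark.func, last_mark.line);
        return 0;
}
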
 
 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
 {
        int i;
 
-       for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
+       for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++) {
                mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
+               debug_drbd_uuid(mdev, i+1);
+       }
 }
 
 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
@@ -3438,6 +3627,7 @@ void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
        }
 
        mdev->ldev->md.uuid[idx] = val;
+       debug_drbd_uuid(mdev, idx);
        drbd_md_mark_dirty(mdev);
 }
 
@@ -3447,6 +3637,7 @@ void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
        if (mdev->ldev->md.uuid[idx]) {
                drbd_uuid_move_history(mdev);
                mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
+               debug_drbd_uuid(mdev, UI_HISTORY_START);
        }
        _drbd_uuid_set(mdev, idx, val);
 }
@@ -3465,6 +3656,7 @@ void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
        dev_info(DEV, "Creating new current UUID\n");
        D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
        mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
+       debug_drbd_uuid(mdev, UI_BITMAP);
 
        get_random_bytes(&val, sizeof(u64));
        _drbd_uuid_set(mdev, UI_CURRENT, val);
@@ -3479,6 +3671,8 @@ void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
                drbd_uuid_move_history(mdev);
                mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
                mdev->ldev->md.uuid[UI_BITMAP] = 0;
+               debug_drbd_uuid(mdev, UI_HISTORY_START);
+               debug_drbd_uuid(mdev, UI_BITMAP);
        } else {
                if (mdev->ldev->md.uuid[UI_BITMAP])
                        dev_warn(DEV, "bm UUID already set");
@@ -3486,6 +3680,7 @@ void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
                mdev->ldev->md.uuid[UI_BITMAP] = val;
                mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
 
+               debug_drbd_uuid(mdev, UI_BITMAP);
        }
        drbd_md_mark_dirty(mdev);
 }
@@ -3528,6 +3723,7 @@ int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
 {
        int rv = -EIO;
 
+       drbd_resume_al(mdev);
        if (get_ldev_if_state(mdev, D_ATTACHING)) {
                drbd_bm_clear_all(mdev);
                rv = drbd_bm_write(mdev);
@@ -3560,6 +3756,32 @@ static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
        return 1;
 }
 
+static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
+{
+       D_ASSERT(mdev->state.disk == D_FAILED);
+       /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
+        * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
+        * the protected members anymore, though, so in the after_state_ch work
+        * it will be safe to free them. */
+       drbd_force_state(mdev, NS(disk, D_DISKLESS));
+       /* We need to wait for return of references checked out while we still
+        * have been D_FAILED, though (drbd_md_sync, bitmap io). */
+       wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
+
+       clear_bit(GO_DISKLESS, &mdev->flags);
+       return 1;
+}
+
+void drbd_go_diskless(struct drbd_conf *mdev)
+{
+       D_ASSERT(mdev->state.disk == D_FAILED);
+       if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
+               drbd_queue_work(&mdev->data.work, &mdev->go_diskless);
+       /* don't drbd_queue_work_front,
+        * we need to serialize with the after_state_ch work
+        * of the -> D_FAILED transition. */
+}
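
drbd_go_diskless() uses test_and_set_bit() as a queue-once guard: however
many contexts notice the failed disk, the work item is put on the queue a
single time until the bit is cleared again in w_go_diskless(). Sketched
with a C11 atomic_flag:

#include <stdatomic.h>
#include <stdio.h>

static atomic_flag go_diskless_queued = ATOMIC_FLAG_INIT;
static int queued_items;

static void queue_work(void)
{
        queued_items++;         /* stand-in for drbd_queue_work() */
}

/* atomic test-and-set: only the first caller queues the work */
static void go_diskless(void)
{
        if (!atomic_flag_test_and_set(&go_diskless_queued))
                queue_work();
}

int main(void)
{
        go_diskless();
        go_diskless();          /* duplicate report: ignored */
        printf("queued %d time(s)\n", queued_items);    /* 1 */
        return 0;
}
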
+
 /**
  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
  * @mdev:      DRBD device.
@@ -3656,8 +3878,11 @@ static void md_sync_timer_fn(unsigned long data)
 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
 {
        dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
+#ifdef DEBUG
+       dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
+               mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
+#endif
        drbd_md_sync(mdev);
-
        return 1;
 }