/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/smp_lock.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_req.h"

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
/* more endio handlers:
   atodb_endio in drbd_actlog.c
   drbd_bm_async_io_complete in drbd_bitmap.c

 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 */
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that. */
rwlock_t global_state_lock;
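
/*
 * A minimal sketch of the locking pattern the comment above describes.  The
 * write side is what resume_next_sg()/suspend_other_sg() further down actually
 * do; the read side is assumed to be taken by the per-device state change path:
 *
 *	read_lock_irqsave(&global_state_lock, flags);
 *	... single-device state transition, other devices may change concurrently ...
 *	read_unlock_irqrestore(&global_state_lock, flags);
 *
 *	write_lock_irq(&global_state_lock);
 *	... walk the sync-after dependency chain, all device states are stable ...
 *	write_unlock_irq(&global_state_lock);
 */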
/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_md_io *md_io;

	md_io = (struct drbd_md_io *)bio->bi_private;
	md_io->error = error;

	complete(&md_io->event);
}
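
/*
 * Sketch of the submitting side this handler pairs with (an assumption about
 * how drbd_md_sync_page_io() drives it, not a verbatim copy): the caller keeps
 * a struct drbd_md_io on its stack, points bi_private at it, and sleeps on the
 * completion until the callback above stores the result and fires it.
 *
 *	struct drbd_md_io md_io;
 *
 *	init_completion(&md_io.event);
 *	bio->bi_private = &md_io;
 *	bio->bi_end_io  = drbd_md_io_complete;
 *	submit_bio(rw, bio);
 *	wait_for_completion(&md_io.event);
 *	if (md_io.error)
 *		... handle the I/O error ...
 */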
/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;

	D_ASSERT(e->block_id != ID_VACANT);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->read_cnt += e->size >> 9;
	if (list_empty(&mdev->read_ee))
		wake_up(&mdev->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	drbd_queue_work(&mdev->data.work, &e->w);
static int is_failed_barrier(int ee_flags)
{
	return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
			== (EE_IS_BARRIER|EE_WAS_ERROR);
}
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage. */
static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;
	int do_al_complete_io;

	/* if this is a failed barrier request, disable use of barriers,
	 * and schedule for resubmission */
	if (is_failed_barrier(e->flags)) {
		drbd_bump_write_ordering(mdev, WO_bdev_flush);
		spin_lock_irqsave(&mdev->req_lock, flags);
		list_del(&e->w.list);
		e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
		e->w.cb = w_e_reissue;
		/* put_ldev actually happens below, once we come here again. */
		spin_unlock_irqrestore(&mdev->req_lock, flags);
		drbd_queue_work(&mdev->data.work, &e->w);

	D_ASSERT(e->block_id != ID_VACANT);

	/* after we moved e to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	e_sector = e->sector;
	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
	is_syncer_req = is_syncer_block_id(e->block_id);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->writ_cnt += e->size >> 9;
	list_del(&e->w.list); /* has been on active_ee or sync_ee */
	list_add_tail(&e->w.list, &mdev->done_ee);

	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
	 * neither did we wake possibly waiting conflicting requests.
	 * done from "drbd_process_done_ee" within the appropriate w.cb
	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */

	do_wake = is_syncer_req
		? list_empty(&mdev->sync_ee)
		: list_empty(&mdev->active_ee);

	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

		drbd_rs_complete_io(mdev, e_sector);

		wake_up(&mdev->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(mdev, e_sector);
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_endio_sec(struct bio *bio, int error)
{
	struct drbd_epoch_entry *e = bio->bi_private;
	struct drbd_conf *mdev = e->mdev;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;

		dev_warn(DEV, "%s: error=%d s=%llus\n",
				is_write ? "write" : "read", error,
				(unsigned long long)e->sector);
	if (!error && !uptodate) {
		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
				is_write ? "write" : "read",
				(unsigned long long)e->sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */

		set_bit(__EE_WAS_ERROR, &e->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&e->pending_bios)) {
			drbd_endio_write_sec_final(e);
			drbd_endio_read_sec_final(e);
/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_endio_pri(struct bio *bio, int error)
{
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->mdev;
	struct bio_and_error m;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
				bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? write_completed_with_error
			: (bio_rw(bio) == READ)
			  ? read_completed_with_error
			  : read_ahead_completed_with_error;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	spin_lock_irqsave(&mdev->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

		complete_master_bio(mdev, &m);
int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* We should not detach for read io-error,
	 * but try to WRITE the P_DATA_REPLY to the failed location,
	 * to give the disk the chance to relocate that block */

	spin_lock_irq(&mdev->req_lock);
	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
		_req_mod(req, read_retry_remote_canceled);
		spin_unlock_irq(&mdev->req_lock);
	spin_unlock_irq(&mdev->req_lock);

	return w_send_read_req(mdev, w, 0);

int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	ERR_IF(cancel) return 1;
	dev_err(DEV, "resync inactive, but callback triggered??\n");
	return 1; /* Simply ignore this! */
void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = e->pages;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);

	/* and now the last, possibly only partially used page */
	len = e->size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}

void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	__bio_for_each_segment(bvec, bio, i, 0) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}
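
/*
 * Typical use of the two digest helpers above, mirroring what w_e_send_csum(),
 * w_e_end_csum_rs_req() and w_e_end_ov_req() below do:
 *
 *	digest_size = crypto_hash_digestsize(mdev->csums_tfm);
 *	digest = kmalloc(digest_size, GFP_NOIO);
 *	if (digest) {
 *		drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
 *		eq = !memcmp(digest, di->digest, digest_size);
 *		kfree(digest);
 *	}
 *
 * or ship the digest to the peer with drbd_send_drequest_csum() instead of
 * comparing it locally.
 */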
static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);

	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);

			inc_rs_pending(mdev);
			ok = drbd_send_drequest_csum(mdev,
			dev_err(DEV, "kmalloc() of digest failed.\n");

	drbd_free_ee(mdev, e);

		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
{
	struct drbd_epoch_entry *e;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);

	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->req_lock);

	e->w.cb = w_e_send_csum;
	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)

	drbd_free_ee(mdev, e);
void resync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;

	spin_lock_irqsave(&mdev->req_lock, flags);

	if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
		if (mdev->state.conn == C_VERIFY_S)
			mdev->resync_work.cb = w_make_ov_request;
			mdev->resync_work.cb = w_make_resync_request;

		mdev->resync_work.cb = w_resync_inactive;

	spin_unlock_irqrestore(&mdev->req_lock, flags);

	/* harmless race: list_empty outside data.work.q_lock */
	if (list_empty(&mdev->resync_work.list) && queue)
		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
int w_make_resync_request(struct drbd_conf *mdev,
		struct drbd_work *w, int cancel)
{
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	int max_segment_size;
	int number, i, size, pe, mx;
	int align, queued, sndbuf;

	if (unlikely(cancel))

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");

	if (mdev->state.conn != C_SYNC_TARGET)
		dev_err(DEV, "%s in w_make_resync_request\n",
			drbd_conn_str(mdev->state.conn));

	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->resync, a
		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
		   continuing resync with a broken disk makes no sense at
		   all */
		dev_err(DEV, "Disk broke down during resync!\n");
		mdev->resync_work.cb = w_resync_inactive;

	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
	 * if it should be necessary */
	max_segment_size = mdev->agreed_pro_version < 94 ?
		queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;

	number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE / 1024) * HZ);
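	/*
	 * Worked example for the request budget above (assuming SLEEP_TIME is
	 * HZ/10, i.e. the timer fires every 100 ms, and BM_BLOCK_SIZE is 4 KiB
	 * per bitmap bit): with sync_conf.rate = 10240 (KiB/s),
	 *	number = (HZ/10) * 10240 / ((4096/1024) * HZ) = 10240 / 40 = 256
	 * requests per wakeup, i.e. 256 * 4 KiB = 1 MiB per 100 ms = 10 MiB/s.
	 */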
	pe = atomic_read(&mdev->rs_pending_cnt);

	mutex_lock(&mdev->data.mutex);
	if (mdev->data.socket)
		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
	mutex_unlock(&mdev->data.mutex);

	/* For resync rates >160MB/sec, allow more pending RS requests */

	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
	if ((pe + number) > mx) {

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests when half of the send buffer is filled */
		mutex_lock(&mdev->data.mutex);
		if (mdev->data.socket) {
			queued = mdev->data.socket->sk->sk_wmem_queued;
			sndbuf = mdev->data.socket->sk->sk_sndbuf;
		mutex_unlock(&mdev->data.mutex);
		if (queued > sndbuf / 2)
		size = BM_BLOCK_SIZE;
		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);

			mdev->bm_resync_fo = drbd_bm_bits(mdev);
			mdev->resync_work.cb = w_resync_inactive;

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->bm_resync_fo = bit;
		mdev->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
			drbd_rs_complete_io(mdev, sector);

#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
			if (size + BM_BLOCK_SIZE > max_segment_size)

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(mdev, bit+1) != 1)

			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)

		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			mdev->bm_resync_fo = bit + 1;

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
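		/*
		 * Tail-adjustment example (sector and capacity are in 512-byte
		 * sectors, size in bytes): with capacity = 1000, sector = 996
		 * and size = 4096, sector + (size>>9) = 1004 > 1000, so size is
		 * clamped to (1000 - 996) << 9 = 2048 bytes.
		 */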
		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
			switch (read_for_csum(mdev, sector, size)) {
			case 0: /* Disk failure */
			case 2: /* Allocation failed */
				drbd_rs_complete_io(mdev, sector);
				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
			/* case 1: everything ok */

			inc_rs_pending(mdev);
			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
					       sector, size, ID_SYNCER)) {
				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(mdev);

	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		mdev->resync_work.cb = w_resync_inactive;

	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);

	if (unlikely(cancel))

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");

	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
	if (atomic_read(&mdev->rs_pending_cnt) > number)

	number -= atomic_read(&mdev->rs_pending_cnt);

	sector = mdev->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity) {
			mdev->resync_work.cb = w_resync_inactive;

		size = BM_BLOCK_SIZE;

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->ov_position = sector;

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(mdev);
		if (!drbd_send_ov_request(mdev, sector, size)) {
			dec_rs_pending(mdev);

		sector += BM_SECT_PER_BIT;

	mdev->ov_position = sector;

	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	drbd_resync_finished(mdev);

static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	drbd_resync_finished(mdev);

int drbd_resync_finished(struct drbd_conf *mdev)
{
	unsigned long db, dt, dbdt;
	union drbd_state os, ns;
	char *khelper_cmd = NULL;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, the entries in the
	 * resync LRU would otherwise be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible right now, most probably because
		 * there are P_RS_DATA_REPLY packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * are not finished by now), retry in 100ms. */

		__set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
			w->cb = w_resync_finished;
			drbd_queue_work(&mdev->data.work, w);

		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");

	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	dbdt = Bit2KB(db/dt);
	mdev->rs_paused /= HZ;
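	/*
	 * Example of the statistics above (assuming Bit2KB() converts 4 KiB
	 * bitmap bits to KiB, i.e. multiplies by 4): if db = 262144 bits were
	 * cleared during dt = 100 seconds of active (non-paused) resync, then
	 * dbdt = Bit2KB(262144/100) = Bit2KB(2621) = 10484 KiB/s is reported
	 * as the average resync speed.
	 */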
	spin_lock_irq(&mdev->req_lock);

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)

	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
	     "Online verify " : "Resync",
	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(mdev);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
			khelper_cmd = "out-of-sync";

		D_ASSERT((n_oos - mdev->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

	if (mdev->csums_tfm && mdev->rs_total) {
		const unsigned long s = mdev->rs_same_csum;
		const unsigned long t = mdev->rs_total;
			(t < 100000) ? ((s*100)/t) : (s/(t/100));
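		/*
		 * Why the two branches above: s and t are both counts of 4 KiB
		 * bits, and the exact (s*100)/t is only used for small bitmaps,
		 * presumably so s*100 cannot overflow an unsigned long on
		 * 32-bit hosts with huge devices.  For large t the coarser
		 * s/(t/100) is used instead, e.g. s = 437211, t = 1000000
		 * gives 437211/10000 = 43 (%).
		 */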
		dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
		     "transferred %luK total %luK\n",
		     Bit2KB(mdev->rs_same_csum),
		     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
		     Bit2KB(mdev->rs_total));

	if (mdev->rs_failed) {
		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");

			drbd_uuid_set_bm(mdev, 0UL);

			/* Now the two UUID sets are equal, update what we
			 * know of the peer. */
			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];

	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
	spin_unlock_irq(&mdev->req_lock);

	mdev->ov_start_sector = 0;

	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");

		drbd_khelper(mdev, khelper_cmd);
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
	if (drbd_ee_has_active_page(e)) {
		/* This might happen if sendpage() has not finished */
		spin_lock_irq(&mdev->req_lock);
		list_add_tail(&e->w.list, &mdev->net_ee);
		spin_unlock_irq(&mdev->req_lock);
		drbd_free_ee(mdev, e);
/**
 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @mdev:	DRBD device.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);

	move_to_net_ee_or_free(mdev, e);

		dev_err(DEV, "drbd_send_block() failed\n");
/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @mdev:	DRBD device.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, e->sector);

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(mdev);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
			if (__ratelimit(&drbd_ratelimit_state))
				dev_err(DEV, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");

		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);

		/* update resync data with failure */
		drbd_rs_failed_io(mdev, e->sector, e->size);

	move_to_net_ee_or_free(mdev, e);

		dev_err(DEV, "drbd_send_block() failed\n");
int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);

		drbd_rs_complete_io(mdev, e->sector);

	di = (struct digest_info *)(unsigned long)e->block_id;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (mdev->csums_tfm) {
			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
			D_ASSERT(digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
			eq = !memcmp(digest, di->digest, digest_size);

			drbd_set_in_sync(mdev, e->sector, e->size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
			inc_rs_pending(mdev);
			e->block_id = ID_SYNCER;
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");

	move_to_net_ee_or_free(mdev, e);

		dev_err(DEV, "drbd_send_block/ack() failed\n");
int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);

	if (unlikely(cancel))

	if (unlikely((e->flags & EE_WAS_ERROR) != 0))

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	/* FIXME if this allocation fails, online verify will not terminate! */
	digest = kmalloc(digest_size, GFP_NOIO);
		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
					     digest, digest_size, P_OV_REPLY);
			dec_rs_pending(mdev);

	drbd_free_ee(mdev, e);
void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
}
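
/*
 * Range-merging example for drbd_ov_oos_found(): if the previous out-of-sync
 * range was ov_last_oos_start = 2048 with ov_last_oos_size = 8 sectors, a new
 * 4 KiB mismatch at sector 2056 is contiguous (2048 + 8 == 2056), so only the
 * size grows to 16 sectors; a mismatch at sector 4096 would instead start a
 * new range of 8 sectors.
 */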
int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	drbd_rs_complete_io(mdev, e->sector);

	di = (struct digest_info *)(unsigned long)e->block_id;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");

		drbd_ov_oos_found(mdev, e->sector, e->size);

	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	drbd_free_ee(mdev, e);

	if (--mdev->ov_left == 0) {
		drbd_resync_finished(mdev);
int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);

int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct p_barrier *p = &mdev->data.sbuf.barrier;

	/* really avoid racing with tl_clear.  w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
	spin_unlock_irq(&mdev->req_lock);

	if (!drbd_get_data_sock(mdev))
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch. */
	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
				(struct p_header *)p, sizeof(*p), 0);
	drbd_put_data_sock(mdev);

int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev:	DRBD device.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);

	ok = drbd_send_dblock(mdev, req);
	req_mod(req, ok ? handed_over_to_network : send_failed);

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev:	DRBD device.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);

	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
				(unsigned long)req);

		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
		 * so this is probably redundant */
		if (mdev->state.conn >= C_CONNECTED)
			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
	req_mod(req, ok ? handed_over_to_network : send_failed);
int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	if (bio_data_dir(req->master_bio) == WRITE)
		drbd_al_begin_io(mdev, req->sector);
	/* Calling drbd_al_begin_io() out of the worker might deadlock
	   theoretically. In practice it cannot deadlock, since this is
	   only used when unfreezing IOs. All the extents of the requests
	   that made it into the TL are already active */

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
	generic_make_request(req->private_bio);

static int _drbd_may_sync_now(struct drbd_conf *mdev)
{
	struct drbd_conf *odev = mdev;

		if (odev->sync_conf.after == -1)
		odev = minor_to_mdev(odev->sync_conf.after);
		ERR_IF(!odev) return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
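
/*
 * Example of the dependency chain walked above (a sketch: sync_conf.after
 * holds the minor number of the device this one is configured to resync
 * after, or -1 for none): with drbd2->sync_conf.after == 1 and
 * drbd1->sync_conf.after == 0, drbd2 may only resync while neither drbd1 nor
 * drbd0 is between C_SYNC_SOURCE and C_PAUSED_SYNC_T or paused via one of the
 * *_isp flags; drbd0, with after == -1, terminates the chain.
 */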
/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);

/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO);

void resume_next_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
}
static int sync_after_error(struct drbd_conf *mdev, int o_minor)
{
	struct drbd_conf *odev;

	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
			return ERR_SYNC_AFTER_CYCLE;

		/* dependency chain ends here, no cycles. */
		if (odev->sync_conf.after == -1)

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->sync_conf.after);

int drbd_alter_sa(struct drbd_conf *mdev, int na)
{
	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->sync_conf.after = na;
		changes  = _drbd_pause_after(mdev);
		changes |= _drbd_resume_next(mdev);

	write_unlock_irq(&global_state_lock);

static void ping_peer(struct drbd_conf *mdev)
{
	clear_bit(GOT_PING_ACK, &mdev->flags);
	wait_event(mdev->misc_wait,
		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
/**
 * drbd_start_resync() - Start the resync process
 * @mdev:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
{
	union drbd_state ns;

	if (mdev->state.conn >= C_SYNC_SOURCE) {
		dev_err(DEV, "Resync already running!\n");

	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
	drbd_rs_cancel_all(mdev);

	if (side == C_SYNC_TARGET) {
		/* Since application IO was locked out during C_WF_BITMAP_T and
		   C_WF_SYNC_UUID we are still unmodified.  Before going to
		   C_SYNC_TARGET we ask the before-resync-target handler whether
		   we may make the data inconsistent. */
		r = drbd_khelper(mdev, "before-resync-target");
		r = (r >> 8) & 0xff;
			dev_info(DEV, "before-resync-target handler returned %d, "
				 "dropping connection.\n", r);
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));

	drbd_state_lock(mdev);

	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
		drbd_state_unlock(mdev);

	if (side == C_SYNC_TARGET) {
		mdev->bm_resync_fo = 0;
	} else /* side == C_SYNC_SOURCE */ {
		get_random_bytes(&uuid, sizeof(u64));
		drbd_uuid_set(mdev, UI_BITMAP, uuid);
		drbd_send_sync_uuid(mdev, uuid);

		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);

	write_lock_irq(&global_state_lock);

	ns.aftr_isp = !_drbd_may_sync_now(mdev);

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		mdev->rs_mark_left = drbd_bm_total_weight(mdev);
		mdev->rs_failed    = 0;
		mdev->rs_paused    = 0;
		mdev->rs_mark_time = jiffies;
		mdev->rs_same_csum = 0;
		_drbd_pause_after(mdev);

	write_unlock_irq(&global_state_lock);

	if (r == SS_SUCCESS) {
		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) mdev->rs_total);

		if (mdev->rs_total == 0) {
			/* Peer still reachable? Beware of failing before-resync-target handlers! */
			ping_peer(mdev);
			drbd_resync_finished(mdev);

		/* ns.conn may already be != mdev->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&mdev->resync_timer, jiffies);

	drbd_state_unlock(mdev);
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct drbd_work *w = NULL;
	LIST_HEAD(work_list);

	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));

	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);

		if (down_trylock(&mdev->data.work.s)) {
			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_uncork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);

			intr = down_interruptible(&mdev->data.work.s);

			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_cork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);
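			/*
			 * Why the uncork/cork dance above: when the worker has
			 * nothing queued it is about to sleep, so the socket is
			 * uncorked to flush out any small packets batched so
			 * far; once new work arrives and we resume sending, the
			 * socket is corked again so consecutive small packets
			 * can be coalesced into fewer segments (unless no_cork
			 * is configured).
			 */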
			D_ASSERT(intr == -EINTR);
			flush_signals(current);
			ERR_IF (get_t_state(thi) == Running)

		if (get_t_state(thi) != Running)
		/* With this break, we have done a down() but not consumed
		   the entry from the list. The cleanup code takes care of
		   this. */

		spin_lock_irq(&mdev->data.work.q_lock);
		ERR_IF(list_empty(&mdev->data.work.q)) {
			/* something terribly wrong in our logic.
			 * we were able to down() the semaphore,
			 * but the list is empty... doh.
			 *
			 * what is the best thing to do now?
			 * try again from scratch, restarting the receiver,
			 * asender, whatnot? could break even more ugly,
			 * e.g. when we are primary, but no good local data.
			 *
			 * I'll try to get away just starting over this loop.
			 */
			spin_unlock_irq(&mdev->data.work.q_lock);

		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
		list_del_init(&w->list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
			/* dev_warn(DEV, "worker: a callback failed! \n"); */
			if (mdev->state.conn >= C_CONNECTED)
				drbd_force_state(mdev,
						NS(conn, C_NETWORK_FAILURE));

	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));

	spin_lock_irq(&mdev->data.work.q_lock);
	while (!list_empty(&mdev->data.work.q)) {
		list_splice_init(&mdev->data.work.q, &work_list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		while (!list_empty(&work_list)) {
			w = list_entry(work_list.next, struct drbd_work, list);
			list_del_init(&w->list);
			i++; /* dead debugging code */

		spin_lock_irq(&mdev->data.work.q_lock);

	sema_init(&mdev->data.work.s, 0);
	/* DANGEROUS race: if someone did queue his work within the spinlock,
	 * but up()ed outside the spinlock, we could get an up() on the
	 * semaphore without a corresponding list entry.
	 */
	spin_unlock_irq(&mdev->data.work.q_lock);

	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
	/* _drbd_set_state only uses stop_nowait.
	 * wait here for the Exiting receiver. */
	drbd_thread_stop(&mdev->receiver);
	drbd_mdev_cleanup(mdev);

	dev_info(DEV, "worker terminated\n");

	clear_bit(DEVICE_DYING, &mdev->flags);
	clear_bit(CONFIG_PENDING, &mdev->flags);
	wake_up(&mdev->state_wait);