drbd: New sync_param packet, that includes the parameters of the new controller
[pandora-kernel.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41
/* Forward declaration: resync_timer_fn() installs this callback before
 * its definition appears further down in this file. */
static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
43
44
45
46 /* defined here:
47    drbd_md_io_complete
48    drbd_endio_sec
49    drbd_endio_pri
50
51  * more endio handlers:
52    atodb_endio in drbd_actlog.c
53    drbd_bm_async_io_complete in drbd_bitmap.c
54
55  * For all these callbacks, note the following:
56  * The callbacks will be called in irq context by the IDE drivers,
57  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
58  * Try to get the locking right :)
59  *
60  */
61
62
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;
68
69 /* used for synchronous meta data and bitmap IO
70  * submitted by drbd_md_sync_page_io()
71  */
72 void drbd_md_io_complete(struct bio *bio, int error)
73 {
74         struct drbd_md_io *md_io;
75
76         md_io = (struct drbd_md_io *)bio->bi_private;
77         md_io->error = error;
78
79         complete(&md_io->event);
80 }
81
82 /* reads on behalf of the partner,
83  * "submitted" by the receiver
84  */
85 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
86 {
87         unsigned long flags = 0;
88         struct drbd_conf *mdev = e->mdev;
89
90         D_ASSERT(e->block_id != ID_VACANT);
91
92         spin_lock_irqsave(&mdev->req_lock, flags);
93         mdev->read_cnt += e->size >> 9;
94         list_del(&e->w.list);
95         if (list_empty(&mdev->read_ee))
96                 wake_up(&mdev->ee_wait);
97         if (test_bit(__EE_WAS_ERROR, &e->flags))
98                 __drbd_chk_io_error(mdev, FALSE);
99         spin_unlock_irqrestore(&mdev->req_lock, flags);
100
101         drbd_queue_work(&mdev->data.work, &e->w);
102         put_ldev(mdev);
103 }
104
105 static int is_failed_barrier(int ee_flags)
106 {
107         return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
108                         == (EE_IS_BARRIER|EE_WAS_ERROR);
109 }
110
111 /* writes on behalf of the partner, or resync writes,
112  * "submitted" by the receiver, final stage.  */
113 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
114 {
115         unsigned long flags = 0;
116         struct drbd_conf *mdev = e->mdev;
117         sector_t e_sector;
118         int do_wake;
119         int is_syncer_req;
120         int do_al_complete_io;
121
122         /* if this is a failed barrier request, disable use of barriers,
123          * and schedule for resubmission */
124         if (is_failed_barrier(e->flags)) {
125                 drbd_bump_write_ordering(mdev, WO_bdev_flush);
126                 spin_lock_irqsave(&mdev->req_lock, flags);
127                 list_del(&e->w.list);
128                 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
129                 e->w.cb = w_e_reissue;
130                 /* put_ldev actually happens below, once we come here again. */
131                 __release(local);
132                 spin_unlock_irqrestore(&mdev->req_lock, flags);
133                 drbd_queue_work(&mdev->data.work, &e->w);
134                 return;
135         }
136
137         D_ASSERT(e->block_id != ID_VACANT);
138
139         /* after we moved e to done_ee,
140          * we may no longer access it,
141          * it may be freed/reused already!
142          * (as soon as we release the req_lock) */
143         e_sector = e->sector;
144         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
145         is_syncer_req = is_syncer_block_id(e->block_id);
146
147         spin_lock_irqsave(&mdev->req_lock, flags);
148         mdev->writ_cnt += e->size >> 9;
149         list_del(&e->w.list); /* has been on active_ee or sync_ee */
150         list_add_tail(&e->w.list, &mdev->done_ee);
151
152         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
153          * neither did we wake possibly waiting conflicting requests.
154          * done from "drbd_process_done_ee" within the appropriate w.cb
155          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
156
157         do_wake = is_syncer_req
158                 ? list_empty(&mdev->sync_ee)
159                 : list_empty(&mdev->active_ee);
160
161         if (test_bit(__EE_WAS_ERROR, &e->flags))
162                 __drbd_chk_io_error(mdev, FALSE);
163         spin_unlock_irqrestore(&mdev->req_lock, flags);
164
165         if (is_syncer_req)
166                 drbd_rs_complete_io(mdev, e_sector);
167
168         if (do_wake)
169                 wake_up(&mdev->ee_wait);
170
171         if (do_al_complete_io)
172                 drbd_al_complete_io(mdev, e_sector);
173
174         wake_asender(mdev);
175         put_ldev(mdev);
176 }
177
178 /* writes on behalf of the partner, or resync writes,
179  * "submitted" by the receiver.
180  */
181 void drbd_endio_sec(struct bio *bio, int error)
182 {
183         struct drbd_epoch_entry *e = bio->bi_private;
184         struct drbd_conf *mdev = e->mdev;
185         int uptodate = bio_flagged(bio, BIO_UPTODATE);
186         int is_write = bio_data_dir(bio) == WRITE;
187
188         if (error)
189                 dev_warn(DEV, "%s: error=%d s=%llus\n",
190                                 is_write ? "write" : "read", error,
191                                 (unsigned long long)e->sector);
192         if (!error && !uptodate) {
193                 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
194                                 is_write ? "write" : "read",
195                                 (unsigned long long)e->sector);
196                 /* strange behavior of some lower level drivers...
197                  * fail the request by clearing the uptodate flag,
198                  * but do not return any error?! */
199                 error = -EIO;
200         }
201
202         if (error)
203                 set_bit(__EE_WAS_ERROR, &e->flags);
204
205         bio_put(bio); /* no need for the bio anymore */
206         if (atomic_dec_and_test(&e->pending_bios)) {
207                 if (is_write)
208                         drbd_endio_write_sec_final(e);
209                 else
210                         drbd_endio_read_sec_final(e);
211         }
212 }
213
214 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
215  */
216 void drbd_endio_pri(struct bio *bio, int error)
217 {
218         unsigned long flags;
219         struct drbd_request *req = bio->bi_private;
220         struct drbd_conf *mdev = req->mdev;
221         struct bio_and_error m;
222         enum drbd_req_event what;
223         int uptodate = bio_flagged(bio, BIO_UPTODATE);
224
225         if (!error && !uptodate) {
226                 dev_warn(DEV, "p %s: setting error to -EIO\n",
227                          bio_data_dir(bio) == WRITE ? "write" : "read");
228                 /* strange behavior of some lower level drivers...
229                  * fail the request by clearing the uptodate flag,
230                  * but do not return any error?! */
231                 error = -EIO;
232         }
233
234         /* to avoid recursion in __req_mod */
235         if (unlikely(error)) {
236                 what = (bio_data_dir(bio) == WRITE)
237                         ? write_completed_with_error
238                         : (bio_rw(bio) == READ)
239                           ? read_completed_with_error
240                           : read_ahead_completed_with_error;
241         } else
242                 what = completed_ok;
243
244         bio_put(req->private_bio);
245         req->private_bio = ERR_PTR(error);
246
247         spin_lock_irqsave(&mdev->req_lock, flags);
248         __req_mod(req, what, &m);
249         spin_unlock_irqrestore(&mdev->req_lock, flags);
250
251         if (m.bio)
252                 complete_master_bio(mdev, &m);
253 }
254
255 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
256 {
257         struct drbd_request *req = container_of(w, struct drbd_request, w);
258
259         /* We should not detach for read io-error,
260          * but try to WRITE the P_DATA_REPLY to the failed location,
261          * to give the disk the chance to relocate that block */
262
263         spin_lock_irq(&mdev->req_lock);
264         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
265                 _req_mod(req, read_retry_remote_canceled);
266                 spin_unlock_irq(&mdev->req_lock);
267                 return 1;
268         }
269         spin_unlock_irq(&mdev->req_lock);
270
271         return w_send_read_req(mdev, w, 0);
272 }
273
274 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
275 {
276         ERR_IF(cancel) return 1;
277         dev_err(DEV, "resync inactive, but callback triggered??\n");
278         return 1; /* Simply ignore this! */
279 }
280
281 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
282 {
283         struct hash_desc desc;
284         struct scatterlist sg;
285         struct page *page = e->pages;
286         struct page *tmp;
287         unsigned len;
288
289         desc.tfm = tfm;
290         desc.flags = 0;
291
292         sg_init_table(&sg, 1);
293         crypto_hash_init(&desc);
294
295         while ((tmp = page_chain_next(page))) {
296                 /* all but the last page will be fully used */
297                 sg_set_page(&sg, page, PAGE_SIZE, 0);
298                 crypto_hash_update(&desc, &sg, sg.length);
299                 page = tmp;
300         }
301         /* and now the last, possibly only partially used page */
302         len = e->size & (PAGE_SIZE - 1);
303         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
304         crypto_hash_update(&desc, &sg, sg.length);
305         crypto_hash_final(&desc, digest);
306 }
307
308 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
309 {
310         struct hash_desc desc;
311         struct scatterlist sg;
312         struct bio_vec *bvec;
313         int i;
314
315         desc.tfm = tfm;
316         desc.flags = 0;
317
318         sg_init_table(&sg, 1);
319         crypto_hash_init(&desc);
320
321         __bio_for_each_segment(bvec, bio, i, 0) {
322                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
323                 crypto_hash_update(&desc, &sg, sg.length);
324         }
325         crypto_hash_final(&desc, digest);
326 }
327
328 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
329 {
330         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
331         int digest_size;
332         void *digest;
333         int ok;
334
335         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
336
337         if (unlikely(cancel)) {
338                 drbd_free_ee(mdev, e);
339                 return 1;
340         }
341
342         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
343                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
344                 digest = kmalloc(digest_size, GFP_NOIO);
345                 if (digest) {
346                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
347
348                         inc_rs_pending(mdev);
349                         ok = drbd_send_drequest_csum(mdev,
350                                                      e->sector,
351                                                      e->size,
352                                                      digest,
353                                                      digest_size,
354                                                      P_CSUM_RS_REQUEST);
355                         kfree(digest);
356                 } else {
357                         dev_err(DEV, "kmalloc() of digest failed.\n");
358                         ok = 0;
359                 }
360         } else
361                 ok = 1;
362
363         drbd_free_ee(mdev, e);
364
365         if (unlikely(!ok))
366                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
367         return ok;
368 }
369
370 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
371
372 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
373 {
374         struct drbd_epoch_entry *e;
375
376         if (!get_ldev(mdev))
377                 return 0;
378
379         /* GFP_TRY, because if there is no memory available right now, this may
380          * be rescheduled for later. It is "only" background resync, after all. */
381         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
382         if (!e)
383                 goto fail;
384
385         spin_lock_irq(&mdev->req_lock);
386         list_add(&e->w.list, &mdev->read_ee);
387         spin_unlock_irq(&mdev->req_lock);
388
389         e->w.cb = w_e_send_csum;
390         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
391                 return 1;
392
393         drbd_free_ee(mdev, e);
394 fail:
395         put_ldev(mdev);
396         return 2;
397 }
398
399 void resync_timer_fn(unsigned long data)
400 {
401         unsigned long flags;
402         struct drbd_conf *mdev = (struct drbd_conf *) data;
403         int queue;
404
405         spin_lock_irqsave(&mdev->req_lock, flags);
406
407         if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
408                 queue = 1;
409                 if (mdev->state.conn == C_VERIFY_S)
410                         mdev->resync_work.cb = w_make_ov_request;
411                 else
412                         mdev->resync_work.cb = w_make_resync_request;
413         } else {
414                 queue = 0;
415                 mdev->resync_work.cb = w_resync_inactive;
416         }
417
418         spin_unlock_irqrestore(&mdev->req_lock, flags);
419
420         /* harmless race: list_empty outside data.work.q_lock */
421         if (list_empty(&mdev->resync_work.list) && queue)
422                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
423 }
424
425 int w_make_resync_request(struct drbd_conf *mdev,
426                 struct drbd_work *w, int cancel)
427 {
428         unsigned long bit;
429         sector_t sector;
430         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
431         int max_segment_size;
432         int number, i, size, pe, mx;
433         int align, queued, sndbuf;
434
435         if (unlikely(cancel))
436                 return 1;
437
438         if (unlikely(mdev->state.conn < C_CONNECTED)) {
439                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
440                 return 0;
441         }
442
443         if (mdev->state.conn != C_SYNC_TARGET)
444                 dev_err(DEV, "%s in w_make_resync_request\n",
445                         drbd_conn_str(mdev->state.conn));
446
447         if (!get_ldev(mdev)) {
448                 /* Since we only need to access mdev->rsync a
449                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
450                    to continue resync with a broken disk makes no sense at
451                    all */
452                 dev_err(DEV, "Disk broke down during resync!\n");
453                 mdev->resync_work.cb = w_resync_inactive;
454                 return 1;
455         }
456
457         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
458          * if it should be necessary */
459         max_segment_size = mdev->agreed_pro_version < 94 ?
460                 queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
461
462         number = SLEEP_TIME * mdev->sync_conf.rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
463         pe = atomic_read(&mdev->rs_pending_cnt);
464
465         mutex_lock(&mdev->data.mutex);
466         if (mdev->data.socket)
467                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
468         else
469                 mx = 1;
470         mutex_unlock(&mdev->data.mutex);
471
472         /* For resync rates >160MB/sec, allow more pending RS requests */
473         if (number > mx)
474                 mx = number;
475
476         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
477         if ((pe + number) > mx) {
478                 number = mx - pe;
479         }
480
481         for (i = 0; i < number; i++) {
482                 /* Stop generating RS requests, when half of the send buffer is filled */
483                 mutex_lock(&mdev->data.mutex);
484                 if (mdev->data.socket) {
485                         queued = mdev->data.socket->sk->sk_wmem_queued;
486                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
487                 } else {
488                         queued = 1;
489                         sndbuf = 0;
490                 }
491                 mutex_unlock(&mdev->data.mutex);
492                 if (queued > sndbuf / 2)
493                         goto requeue;
494
495 next_sector:
496                 size = BM_BLOCK_SIZE;
497                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
498
499                 if (bit == -1UL) {
500                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
501                         mdev->resync_work.cb = w_resync_inactive;
502                         put_ldev(mdev);
503                         return 1;
504                 }
505
506                 sector = BM_BIT_TO_SECT(bit);
507
508                 if (drbd_try_rs_begin_io(mdev, sector)) {
509                         mdev->bm_resync_fo = bit;
510                         goto requeue;
511                 }
512                 mdev->bm_resync_fo = bit + 1;
513
514                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
515                         drbd_rs_complete_io(mdev, sector);
516                         goto next_sector;
517                 }
518
519 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
520                 /* try to find some adjacent bits.
521                  * we stop if we have already the maximum req size.
522                  *
523                  * Additionally always align bigger requests, in order to
524                  * be prepared for all stripe sizes of software RAIDs.
525                  */
526                 align = 1;
527                 for (;;) {
528                         if (size + BM_BLOCK_SIZE > max_segment_size)
529                                 break;
530
531                         /* Be always aligned */
532                         if (sector & ((1<<(align+3))-1))
533                                 break;
534
535                         /* do not cross extent boundaries */
536                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
537                                 break;
538                         /* now, is it actually dirty, after all?
539                          * caution, drbd_bm_test_bit is tri-state for some
540                          * obscure reason; ( b == 0 ) would get the out-of-band
541                          * only accidentally right because of the "oddly sized"
542                          * adjustment below */
543                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
544                                 break;
545                         bit++;
546                         size += BM_BLOCK_SIZE;
547                         if ((BM_BLOCK_SIZE << align) <= size)
548                                 align++;
549                         i++;
550                 }
551                 /* if we merged some,
552                  * reset the offset to start the next drbd_bm_find_next from */
553                 if (size > BM_BLOCK_SIZE)
554                         mdev->bm_resync_fo = bit + 1;
555 #endif
556
557                 /* adjust very last sectors, in case we are oddly sized */
558                 if (sector + (size>>9) > capacity)
559                         size = (capacity-sector)<<9;
560                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
561                         switch (read_for_csum(mdev, sector, size)) {
562                         case 0: /* Disk failure*/
563                                 put_ldev(mdev);
564                                 return 0;
565                         case 2: /* Allocation failed */
566                                 drbd_rs_complete_io(mdev, sector);
567                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
568                                 goto requeue;
569                         /* case 1: everything ok */
570                         }
571                 } else {
572                         inc_rs_pending(mdev);
573                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
574                                                sector, size, ID_SYNCER)) {
575                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
576                                 dec_rs_pending(mdev);
577                                 put_ldev(mdev);
578                                 return 0;
579                         }
580                 }
581         }
582
583         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
584                 /* last syncer _request_ was sent,
585                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
586                  * next sync group will resume), as soon as we receive the last
587                  * resync data block, and the last bit is cleared.
588                  * until then resync "work" is "inactive" ...
589                  */
590                 mdev->resync_work.cb = w_resync_inactive;
591                 put_ldev(mdev);
592                 return 1;
593         }
594
595  requeue:
596         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
597         put_ldev(mdev);
598         return 1;
599 }
600
601 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
602 {
603         int number, i, size;
604         sector_t sector;
605         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
606
607         if (unlikely(cancel))
608                 return 1;
609
610         if (unlikely(mdev->state.conn < C_CONNECTED)) {
611                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
612                 return 0;
613         }
614
615         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
616         if (atomic_read(&mdev->rs_pending_cnt) > number)
617                 goto requeue;
618
619         number -= atomic_read(&mdev->rs_pending_cnt);
620
621         sector = mdev->ov_position;
622         for (i = 0; i < number; i++) {
623                 if (sector >= capacity) {
624                         mdev->resync_work.cb = w_resync_inactive;
625                         return 1;
626                 }
627
628                 size = BM_BLOCK_SIZE;
629
630                 if (drbd_try_rs_begin_io(mdev, sector)) {
631                         mdev->ov_position = sector;
632                         goto requeue;
633                 }
634
635                 if (sector + (size>>9) > capacity)
636                         size = (capacity-sector)<<9;
637
638                 inc_rs_pending(mdev);
639                 if (!drbd_send_ov_request(mdev, sector, size)) {
640                         dec_rs_pending(mdev);
641                         return 0;
642                 }
643                 sector += BM_SECT_PER_BIT;
644         }
645         mdev->ov_position = sector;
646
647  requeue:
648         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
649         return 1;
650 }
651
652
/* Worker callback run when online verify is done: frees the work
 * object @w, prints the out-of-sync summary via ov_oos_print() and
 * finishes via drbd_resync_finished().  @cancel is ignored.
 * Always returns 1.
 */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
        /* w is not touched again after this */
        kfree(w);

        ov_oos_print(mdev);
        drbd_resync_finished(mdev);

        return 1;
}
661
/* Worker callback used by drbd_resync_finished() to retry itself later:
 * frees the kmalloc()ed work object @w and calls drbd_resync_finished()
 * again.  @cancel is ignored.  Always returns 1.
 */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
        /* w is not touched again after this */
        kfree(w);
        drbd_resync_finished(mdev);

        return 1;
}
670
/**
 * drbd_resync_finished() - Wrap up a finished resync or online verify run
 * @mdev:       DRBD device.
 *
 * Empties the resync LRU (retrying later through w_resync_finished() if
 * that is not possible yet), logs statistics about the run, computes the
 * new disk states and UUIDs, moves the connection state to C_CONNECTED,
 * resets the resync counters, and finally calls the user space helper
 * selected along the way, if any.
 *
 * Always returns 1.
 */
int drbd_resync_finished(struct drbd_conf *mdev)
{
        unsigned long db, dt, dbdt;
        unsigned long n_oos;
        union drbd_state os, ns;
        struct drbd_work *w;
        char *khelper_cmd = NULL;

        /* Remove all elements from the resync LRU. Since future actions
         * might set bits in the (main) bitmap, then the entries in the
         * resync LRU would be wrong. */
        if (drbd_rs_del_all(mdev)) {
                /* In case this is not possible now, most probably because
                 * there are P_RS_DATA_REPLY Packets lingering on the worker's
                 * queue (or even the read operations for those packets
                 * is not finished by now).   Retry in 100ms. */

                drbd_kick_lo(mdev);
                __set_current_state(TASK_INTERRUPTIBLE);
                schedule_timeout(HZ / 10);
                w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
                if (w) {
                        /* w is freed by w_resync_finished() */
                        w->cb = w_resync_finished;
                        drbd_queue_work(&mdev->data.work, w);
                        return 1;
                }
                /* no memory for the retry work item: carry on regardless */
                dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
        }

        /* dt: run time in seconds, not counting paused time; at least 1,
         * because it is used as divisor right below */
        dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
        if (dt <= 0)
                dt = 1;
        db = mdev->rs_total;
        dbdt = Bit2KB(db/dt);
        /* from here on rs_paused is in seconds (it is reset to 0 at out:) */
        mdev->rs_paused /= HZ;

        if (!get_ldev(mdev))
                goto out;

        spin_lock_irq(&mdev->req_lock);
        os = mdev->state;

        /* This protects us against multiple calls (that can happen in the presence
           of application IO), and against connectivity loss just before we arrive here. */
        if (os.conn <= C_CONNECTED)
                goto out_unlock;

        ns = os;
        ns.conn = C_CONNECTED;

        dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
             (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
             "Online verify " : "Resync",
             dt + mdev->rs_paused, mdev->rs_paused, dbdt);

        /* number of bits still set in the bitmap */
        n_oos = drbd_bm_total_weight(mdev);

        if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
                if (n_oos) {
                        dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
                              n_oos, Bit2KB(1));
                        khelper_cmd = "out-of-sync";
                }
        } else {
                /* after a resync only failed blocks may still be set */
                D_ASSERT((n_oos - mdev->rs_failed) == 0);

                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
                        khelper_cmd = "after-resync-target";

                if (mdev->csums_tfm && mdev->rs_total) {
                        const unsigned long s = mdev->rs_same_csum;
                        const unsigned long t = mdev->rs_total;
                        /* percentage of s relative to t; for large t the
                         * division is done first, presumably to keep s*100
                         * from overflowing */
                        const int ratio =
                                (t == 0)     ? 0 :
                        (t < 100000) ? ((s*100)/t) : (s/(t/100));
                        dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
                             "transferred %luK total %luK\n",
                             ratio,
                             Bit2KB(mdev->rs_same_csum),
                             Bit2KB(mdev->rs_total - mdev->rs_same_csum),
                             Bit2KB(mdev->rs_total));
                }
        }

        if (mdev->rs_failed) {
                dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);

                /* with failed blocks, the side that was sync target
                 * stays D_INCONSISTENT */
                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
                        ns.disk = D_INCONSISTENT;
                        ns.pdsk = D_UP_TO_DATE;
                } else {
                        ns.disk = D_UP_TO_DATE;
                        ns.pdsk = D_INCONSISTENT;
                }
        } else {
                ns.disk = D_UP_TO_DATE;
                ns.pdsk = D_UP_TO_DATE;

                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
                        if (mdev->p_uuid) {
                                /* as sync target, take over the peer's UUIDs */
                                int i;
                                for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
                                        _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
                                drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
                                _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
                        } else {
                                dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
                        }
                }

                drbd_uuid_set_bm(mdev, 0UL);

                if (mdev->p_uuid) {
                        /* Now the two UUID sets are equal, update what we
                         * know of the peer. */
                        int i;
                        for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
                                mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
                }
        }

        _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
out_unlock:
        spin_unlock_irq(&mdev->req_lock);
        put_ldev(mdev);
out:
        /* reset the per-run counters */
        mdev->rs_total  = 0;
        mdev->rs_failed = 0;
        mdev->rs_paused = 0;
        mdev->ov_start_sector = 0;

        if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
                dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
                drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
        }

        /* called only after the req_lock has been released */
        if (khelper_cmd)
                drbd_khelper(mdev, khelper_cmd);

        return 1;
}
812
813 /* helper */
814 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
815 {
816         if (drbd_ee_has_active_page(e)) {
817                 /* This might happen if sendpage() has not finished */
818                 spin_lock_irq(&mdev->req_lock);
819                 list_add_tail(&e->w.list, &mdev->net_ee);
820                 spin_unlock_irq(&mdev->req_lock);
821         } else
822                 drbd_free_ee(mdev, e);
823 }
824
825 /**
826  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
827  * @mdev:       DRBD device.
828  * @w:          work object.
829  * @cancel:     The connection will be closed anyways
830  */
831 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
832 {
833         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
834         int ok;
835
836         if (unlikely(cancel)) {
837                 drbd_free_ee(mdev, e);
838                 dec_unacked(mdev);
839                 return 1;
840         }
841
842         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
843                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
844         } else {
845                 if (__ratelimit(&drbd_ratelimit_state))
846                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
847                             (unsigned long long)e->sector);
848
849                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
850         }
851
852         dec_unacked(mdev);
853
854         move_to_net_ee_or_free(mdev, e);
855
856         if (unlikely(!ok))
857                 dev_err(DEV, "drbd_send_block() failed\n");
858         return ok;
859 }
860
861 /**
862  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
863  * @mdev:       DRBD device.
864  * @w:          work object.
865  * @cancel:     The connection will be closed anyways
866  */
867 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
868 {
869         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
870         int ok;
871
872         if (unlikely(cancel)) {
873                 drbd_free_ee(mdev, e);
874                 dec_unacked(mdev);
875                 return 1;
876         }
877
878         if (get_ldev_if_state(mdev, D_FAILED)) {
879                 drbd_rs_complete_io(mdev, e->sector);
880                 put_ldev(mdev);
881         }
882
883         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
884                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
885                         inc_rs_pending(mdev);
886                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
887                 } else {
888                         if (__ratelimit(&drbd_ratelimit_state))
889                                 dev_err(DEV, "Not sending RSDataReply, "
890                                     "partner DISKLESS!\n");
891                         ok = 1;
892                 }
893         } else {
894                 if (__ratelimit(&drbd_ratelimit_state))
895                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
896                             (unsigned long long)e->sector);
897
898                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
899
900                 /* update resync data with failure */
901                 drbd_rs_failed_io(mdev, e->sector, e->size);
902         }
903
904         dec_unacked(mdev);
905
906         move_to_net_ee_or_free(mdev, e);
907
908         if (unlikely(!ok))
909                 dev_err(DEV, "drbd_send_block() failed\n");
910         return ok;
911 }
912
913 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
914 {
915         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
916         struct digest_info *di;
917         int digest_size;
918         void *digest = NULL;
919         int ok, eq = 0;
920
921         if (unlikely(cancel)) {
922                 drbd_free_ee(mdev, e);
923                 dec_unacked(mdev);
924                 return 1;
925         }
926
927         drbd_rs_complete_io(mdev, e->sector);
928
929         di = (struct digest_info *)(unsigned long)e->block_id;
930
931         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
932                 /* quick hack to try to avoid a race against reconfiguration.
933                  * a real fix would be much more involved,
934                  * introducing more locking mechanisms */
935                 if (mdev->csums_tfm) {
936                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
937                         D_ASSERT(digest_size == di->digest_size);
938                         digest = kmalloc(digest_size, GFP_NOIO);
939                 }
940                 if (digest) {
941                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
942                         eq = !memcmp(digest, di->digest, digest_size);
943                         kfree(digest);
944                 }
945
946                 if (eq) {
947                         drbd_set_in_sync(mdev, e->sector, e->size);
948                         /* rs_same_csums unit is BM_BLOCK_SIZE */
949                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
950                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
951                 } else {
952                         inc_rs_pending(mdev);
953                         e->block_id = ID_SYNCER;
954                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
955                 }
956         } else {
957                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
958                 if (__ratelimit(&drbd_ratelimit_state))
959                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
960         }
961
962         dec_unacked(mdev);
963
964         kfree(di);
965
966         move_to_net_ee_or_free(mdev, e);
967
968         if (unlikely(!ok))
969                 dev_err(DEV, "drbd_send_block/ack() failed\n");
970         return ok;
971 }
972
973 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
974 {
975         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
976         int digest_size;
977         void *digest;
978         int ok = 1;
979
980         if (unlikely(cancel))
981                 goto out;
982
983         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
984                 goto out;
985
986         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
987         /* FIXME if this allocation fails, online verify will not terminate! */
988         digest = kmalloc(digest_size, GFP_NOIO);
989         if (digest) {
990                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
991                 inc_rs_pending(mdev);
992                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
993                                              digest, digest_size, P_OV_REPLY);
994                 if (!ok)
995                         dec_rs_pending(mdev);
996                 kfree(digest);
997         }
998
999 out:
1000         drbd_free_ee(mdev, e);
1001
1002         dec_unacked(mdev);
1003
1004         return ok;
1005 }
1006
1007 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1008 {
1009         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1010                 mdev->ov_last_oos_size += size>>9;
1011         } else {
1012                 mdev->ov_last_oos_start = sector;
1013                 mdev->ov_last_oos_size = size>>9;
1014         }
1015         drbd_set_out_of_sync(mdev, sector, size);
1016         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1017 }
1018
1019 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1020 {
1021         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1022         struct digest_info *di;
1023         int digest_size;
1024         void *digest;
1025         int ok, eq = 0;
1026
1027         if (unlikely(cancel)) {
1028                 drbd_free_ee(mdev, e);
1029                 dec_unacked(mdev);
1030                 return 1;
1031         }
1032
1033         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1034          * the resync lru has been cleaned up already */
1035         drbd_rs_complete_io(mdev, e->sector);
1036
1037         di = (struct digest_info *)(unsigned long)e->block_id;
1038
1039         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1040                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1041                 digest = kmalloc(digest_size, GFP_NOIO);
1042                 if (digest) {
1043                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1044
1045                         D_ASSERT(digest_size == di->digest_size);
1046                         eq = !memcmp(digest, di->digest, digest_size);
1047                         kfree(digest);
1048                 }
1049         } else {
1050                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1051                 if (__ratelimit(&drbd_ratelimit_state))
1052                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1053         }
1054
1055         dec_unacked(mdev);
1056
1057         kfree(di);
1058
1059         if (!eq)
1060                 drbd_ov_oos_found(mdev, e->sector, e->size);
1061         else
1062                 ov_oos_print(mdev);
1063
1064         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1065                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1066
1067         drbd_free_ee(mdev, e);
1068
1069         if (--mdev->ov_left == 0) {
1070                 ov_oos_print(mdev);
1071                 drbd_resync_finished(mdev);
1072         }
1073
1074         return ok;
1075 }
1076
1077 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1078 {
1079         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1080         complete(&b->done);
1081         return 1;
1082 }
1083
1084 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1085 {
1086         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1087         struct p_barrier *p = &mdev->data.sbuf.barrier;
1088         int ok = 1;
1089
1090         /* really avoid racing with tl_clear.  w.cb may have been referenced
1091          * just before it was reassigned and re-queued, so double check that.
1092          * actually, this race was harmless, since we only try to send the
1093          * barrier packet here, and otherwise do nothing with the object.
1094          * but compare with the head of w_clear_epoch */
1095         spin_lock_irq(&mdev->req_lock);
1096         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1097                 cancel = 1;
1098         spin_unlock_irq(&mdev->req_lock);
1099         if (cancel)
1100                 return 1;
1101
1102         if (!drbd_get_data_sock(mdev))
1103                 return 0;
1104         p->barrier = b->br_number;
1105         /* inc_ap_pending was done where this was queued.
1106          * dec_ap_pending will be done in got_BarrierAck
1107          * or (on connection loss) in w_clear_epoch.  */
1108         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1109                                 (struct p_header *)p, sizeof(*p), 0);
1110         drbd_put_data_sock(mdev);
1111
1112         return ok;
1113 }
1114
1115 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1116 {
1117         if (cancel)
1118                 return 1;
1119         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1120 }
1121
1122 /**
1123  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1124  * @mdev:       DRBD device.
1125  * @w:          work object.
1126  * @cancel:     The connection will be closed anyways
1127  */
1128 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1129 {
1130         struct drbd_request *req = container_of(w, struct drbd_request, w);
1131         int ok;
1132
1133         if (unlikely(cancel)) {
1134                 req_mod(req, send_canceled);
1135                 return 1;
1136         }
1137
1138         ok = drbd_send_dblock(mdev, req);
1139         req_mod(req, ok ? handed_over_to_network : send_failed);
1140
1141         return ok;
1142 }
1143
1144 /**
1145  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1146  * @mdev:       DRBD device.
1147  * @w:          work object.
1148  * @cancel:     The connection will be closed anyways
1149  */
1150 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1151 {
1152         struct drbd_request *req = container_of(w, struct drbd_request, w);
1153         int ok;
1154
1155         if (unlikely(cancel)) {
1156                 req_mod(req, send_canceled);
1157                 return 1;
1158         }
1159
1160         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1161                                 (unsigned long)req);
1162
1163         if (!ok) {
1164                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1165                  * so this is probably redundant */
1166                 if (mdev->state.conn >= C_CONNECTED)
1167                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1168         }
1169         req_mod(req, ok ? handed_over_to_network : send_failed);
1170
1171         return ok;
1172 }
1173
1174 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1175 {
1176         struct drbd_request *req = container_of(w, struct drbd_request, w);
1177
1178         if (bio_data_dir(req->master_bio) == WRITE)
1179                 drbd_al_begin_io(mdev, req->sector);
1180         /* Calling drbd_al_begin_io() out of the worker might deadlocks
1181            theoretically. Practically it can not deadlock, since this is
1182            only used when unfreezing IOs. All the extents of the requests
1183            that made it into the TL are already active */
1184
1185         drbd_req_make_private_bio(req, req->master_bio);
1186         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1187         generic_make_request(req->private_bio);
1188
1189         return 1;
1190 }
1191
1192 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1193 {
1194         struct drbd_conf *odev = mdev;
1195
1196         while (1) {
1197                 if (odev->sync_conf.after == -1)
1198                         return 1;
1199                 odev = minor_to_mdev(odev->sync_conf.after);
1200                 ERR_IF(!odev) return 1;
1201                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1202                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1203                     odev->state.aftr_isp || odev->state.peer_isp ||
1204                     odev->state.user_isp)
1205                         return 0;
1206         }
1207 }
1208
1209 /**
1210  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1211  * @mdev:       DRBD device.
1212  *
1213  * Called from process context only (admin command and after_state_ch).
1214  */
1215 static int _drbd_pause_after(struct drbd_conf *mdev)
1216 {
1217         struct drbd_conf *odev;
1218         int i, rv = 0;
1219
1220         for (i = 0; i < minor_count; i++) {
1221                 odev = minor_to_mdev(i);
1222                 if (!odev)
1223                         continue;
1224                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1225                         continue;
1226                 if (!_drbd_may_sync_now(odev))
1227                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1228                                != SS_NOTHING_TO_DO);
1229         }
1230
1231         return rv;
1232 }
1233
1234 /**
1235  * _drbd_resume_next() - Resume resync on all devices that may resync now
1236  * @mdev:       DRBD device.
1237  *
1238  * Called from process context only (admin command and worker).
1239  */
1240 static int _drbd_resume_next(struct drbd_conf *mdev)
1241 {
1242         struct drbd_conf *odev;
1243         int i, rv = 0;
1244
1245         for (i = 0; i < minor_count; i++) {
1246                 odev = minor_to_mdev(i);
1247                 if (!odev)
1248                         continue;
1249                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1250                         continue;
1251                 if (odev->state.aftr_isp) {
1252                         if (_drbd_may_sync_now(odev))
1253                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1254                                                         CS_HARD, NULL)
1255                                        != SS_NOTHING_TO_DO) ;
1256                 }
1257         }
1258         return rv;
1259 }
1260
1261 void resume_next_sg(struct drbd_conf *mdev)
1262 {
1263         write_lock_irq(&global_state_lock);
1264         _drbd_resume_next(mdev);
1265         write_unlock_irq(&global_state_lock);
1266 }
1267
1268 void suspend_other_sg(struct drbd_conf *mdev)
1269 {
1270         write_lock_irq(&global_state_lock);
1271         _drbd_pause_after(mdev);
1272         write_unlock_irq(&global_state_lock);
1273 }
1274
1275 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1276 {
1277         struct drbd_conf *odev;
1278
1279         if (o_minor == -1)
1280                 return NO_ERROR;
1281         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1282                 return ERR_SYNC_AFTER;
1283
1284         /* check for loops */
1285         odev = minor_to_mdev(o_minor);
1286         while (1) {
1287                 if (odev == mdev)
1288                         return ERR_SYNC_AFTER_CYCLE;
1289
1290                 /* dependency chain ends here, no cycles. */
1291                 if (odev->sync_conf.after == -1)
1292                         return NO_ERROR;
1293
1294                 /* follow the dependency chain */
1295                 odev = minor_to_mdev(odev->sync_conf.after);
1296         }
1297 }
1298
1299 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1300 {
1301         int changes;
1302         int retcode;
1303
1304         write_lock_irq(&global_state_lock);
1305         retcode = sync_after_error(mdev, na);
1306         if (retcode == NO_ERROR) {
1307                 mdev->sync_conf.after = na;
1308                 do {
1309                         changes  = _drbd_pause_after(mdev);
1310                         changes |= _drbd_resume_next(mdev);
1311                 } while (changes);
1312         }
1313         write_unlock_irq(&global_state_lock);
1314         return retcode;
1315 }
1316
1317 static void ping_peer(struct drbd_conf *mdev)
1318 {
1319         clear_bit(GOT_PING_ACK, &mdev->flags);
1320         request_ping(mdev);
1321         wait_event(mdev->misc_wait,
1322                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1323 }
1324
1325 /**
1326  * drbd_start_resync() - Start the resync process
1327  * @mdev:       DRBD device.
1328  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1329  *
1330  * This function might bring you directly into one of the
1331  * C_PAUSED_SYNC_* states.
1332  */
1333 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1334 {
1335         union drbd_state ns;
1336         int r;
1337
1338         if (mdev->state.conn >= C_SYNC_SOURCE) {
1339                 dev_err(DEV, "Resync already running!\n");
1340                 return;
1341         }
1342
1343         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1344         drbd_rs_cancel_all(mdev);
1345
1346         if (side == C_SYNC_TARGET) {
1347                 /* Since application IO was locked out during C_WF_BITMAP_T and
1348                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1349                    we check that we might make the data inconsistent. */
1350                 r = drbd_khelper(mdev, "before-resync-target");
1351                 r = (r >> 8) & 0xff;
1352                 if (r > 0) {
1353                         dev_info(DEV, "before-resync-target handler returned %d, "
1354                              "dropping connection.\n", r);
1355                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1356                         return;
1357                 }
1358         }
1359
1360         drbd_state_lock(mdev);
1361
1362         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1363                 drbd_state_unlock(mdev);
1364                 return;
1365         }
1366
1367         if (side == C_SYNC_TARGET) {
1368                 mdev->bm_resync_fo = 0;
1369         } else /* side == C_SYNC_SOURCE */ {
1370                 u64 uuid;
1371
1372                 get_random_bytes(&uuid, sizeof(u64));
1373                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1374                 drbd_send_sync_uuid(mdev, uuid);
1375
1376                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1377         }
1378
1379         write_lock_irq(&global_state_lock);
1380         ns = mdev->state;
1381
1382         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1383
1384         ns.conn = side;
1385
1386         if (side == C_SYNC_TARGET)
1387                 ns.disk = D_INCONSISTENT;
1388         else /* side == C_SYNC_SOURCE */
1389                 ns.pdsk = D_INCONSISTENT;
1390
1391         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1392         ns = mdev->state;
1393
1394         if (ns.conn < C_CONNECTED)
1395                 r = SS_UNKNOWN_ERROR;
1396
1397         if (r == SS_SUCCESS) {
1398                 mdev->rs_total     =
1399                 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1400                 mdev->rs_failed    = 0;
1401                 mdev->rs_paused    = 0;
1402                 mdev->rs_start     =
1403                 mdev->rs_mark_time = jiffies;
1404                 mdev->rs_same_csum = 0;
1405                 _drbd_pause_after(mdev);
1406         }
1407         write_unlock_irq(&global_state_lock);
1408         put_ldev(mdev);
1409
1410         if (r == SS_SUCCESS) {
1411                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1412                      drbd_conn_str(ns.conn),
1413                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1414                      (unsigned long) mdev->rs_total);
1415
1416                 if (mdev->rs_total == 0) {
1417                         /* Peer still reachable? Beware of failing before-resync-target handlers! */
1418                         ping_peer(mdev);
1419                         drbd_resync_finished(mdev);
1420                 }
1421
1422                 /* ns.conn may already be != mdev->state.conn,
1423                  * we may have been paused in between, or become paused until
1424                  * the timer triggers.
1425                  * No matter, that is handled in resync_timer_fn() */
1426                 if (ns.conn == C_SYNC_TARGET)
1427                         mod_timer(&mdev->resync_timer, jiffies);
1428
1429                 drbd_md_sync(mdev);
1430         }
1431         drbd_state_unlock(mdev);
1432 }
1433
1434 int drbd_worker(struct drbd_thread *thi)
1435 {
1436         struct drbd_conf *mdev = thi->mdev;
1437         struct drbd_work *w = NULL;
1438         LIST_HEAD(work_list);
1439         int intr = 0, i;
1440
1441         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1442
1443         while (get_t_state(thi) == Running) {
1444                 drbd_thread_current_set_cpu(mdev);
1445
1446                 if (down_trylock(&mdev->data.work.s)) {
1447                         mutex_lock(&mdev->data.mutex);
1448                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1449                                 drbd_tcp_uncork(mdev->data.socket);
1450                         mutex_unlock(&mdev->data.mutex);
1451
1452                         intr = down_interruptible(&mdev->data.work.s);
1453
1454                         mutex_lock(&mdev->data.mutex);
1455                         if (mdev->data.socket  && !mdev->net_conf->no_cork)
1456                                 drbd_tcp_cork(mdev->data.socket);
1457                         mutex_unlock(&mdev->data.mutex);
1458                 }
1459
1460                 if (intr) {
1461                         D_ASSERT(intr == -EINTR);
1462                         flush_signals(current);
1463                         ERR_IF (get_t_state(thi) == Running)
1464                                 continue;
1465                         break;
1466                 }
1467
1468                 if (get_t_state(thi) != Running)
1469                         break;
1470                 /* With this break, we have done a down() but not consumed
1471                    the entry from the list. The cleanup code takes care of
1472                    this...   */
1473
1474                 w = NULL;
1475                 spin_lock_irq(&mdev->data.work.q_lock);
1476                 ERR_IF(list_empty(&mdev->data.work.q)) {
1477                         /* something terribly wrong in our logic.
1478                          * we were able to down() the semaphore,
1479                          * but the list is empty... doh.
1480                          *
1481                          * what is the best thing to do now?
1482                          * try again from scratch, restarting the receiver,
1483                          * asender, whatnot? could break even more ugly,
1484                          * e.g. when we are primary, but no good local data.
1485                          *
1486                          * I'll try to get away just starting over this loop.
1487                          */
1488                         spin_unlock_irq(&mdev->data.work.q_lock);
1489                         continue;
1490                 }
1491                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1492                 list_del_init(&w->list);
1493                 spin_unlock_irq(&mdev->data.work.q_lock);
1494
1495                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1496                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1497                         if (mdev->state.conn >= C_CONNECTED)
1498                                 drbd_force_state(mdev,
1499                                                 NS(conn, C_NETWORK_FAILURE));
1500                 }
1501         }
1502         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1503         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1504
1505         spin_lock_irq(&mdev->data.work.q_lock);
1506         i = 0;
1507         while (!list_empty(&mdev->data.work.q)) {
1508                 list_splice_init(&mdev->data.work.q, &work_list);
1509                 spin_unlock_irq(&mdev->data.work.q_lock);
1510
1511                 while (!list_empty(&work_list)) {
1512                         w = list_entry(work_list.next, struct drbd_work, list);
1513                         list_del_init(&w->list);
1514                         w->cb(mdev, w, 1);
1515                         i++; /* dead debugging code */
1516                 }
1517
1518                 spin_lock_irq(&mdev->data.work.q_lock);
1519         }
1520         sema_init(&mdev->data.work.s, 0);
1521         /* DANGEROUS race: if someone did queue his work within the spinlock,
1522          * but up() ed outside the spinlock, we could get an up() on the
1523          * semaphore without corresponding list entry.
1524          * So don't do that.
1525          */
1526         spin_unlock_irq(&mdev->data.work.q_lock);
1527
1528         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1529         /* _drbd_set_state only uses stop_nowait.
1530          * wait here for the Exiting receiver. */
1531         drbd_thread_stop(&mdev->receiver);
1532         drbd_mdev_cleanup(mdev);
1533
1534         dev_info(DEV, "worker terminated\n");
1535
1536         clear_bit(DEVICE_DYING, &mdev->flags);
1537         clear_bit(CONFIG_PENDING, &mdev->flags);
1538         wake_up(&mdev->state_wait);
1539
1540         return 0;
1541 }