drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41
42 #define SLEEP_TIME (HZ/10)
43
44 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
45
46
47
48 /* defined here:
49    drbd_md_io_complete
50    drbd_endio_write_sec
51    drbd_endio_read_sec
52    drbd_endio_pri
53
54  * more endio handlers:
55    atodb_endio in drbd_actlog.c
56    drbd_bm_async_io_complete in drbd_bitmap.c
57
58  * For all these callbacks, note the following:
59  * The callbacks will be called in irq context by the IDE drivers,
60  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
61  * Try to get the locking right :)
62  *
63  */
64
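   /* In practice the completion handlers below therefore stick to the
    * irq-safe spin_lock_irqsave() variants and defer the bulk of the
    * completion work to the worker and asender threads. */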
65
66 /* About the global_state_lock
67    Each state transition on a device holds a read lock. In case we have
68    to evaluate the sync-after dependencies, we grab a write lock, because
69    we need stable states on all devices for that.  */
70 rwlock_t global_state_lock;
71
72 /* used for synchronous meta data and bitmap IO
73  * submitted by drbd_md_sync_page_io()
74  */
75 void drbd_md_io_complete(struct bio *bio, int error)
76 {
77         struct drbd_md_io *md_io;
78
79         md_io = (struct drbd_md_io *)bio->bi_private;
80         md_io->error = error;
81
82         complete(&md_io->event);
83 }
84
85 /* reads on behalf of the partner,
86  * "submitted" by the receiver
87  */
88 void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
89 {
90         unsigned long flags = 0;
91         struct drbd_epoch_entry *e = NULL;
92         struct drbd_conf *mdev;
93         int uptodate = bio_flagged(bio, BIO_UPTODATE);
94
95         e = bio->bi_private;
96         mdev = e->mdev;
97
98         if (error)
99                 dev_warn(DEV, "read: error=%d s=%llus\n", error,
100                                 (unsigned long long)e->sector);
101         if (!error && !uptodate) {
102                 dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
103                                 (unsigned long long)e->sector);
104                 /* strange behavior of some lower level drivers...
105                  * fail the request by clearing the uptodate flag,
106                  * but do not return any error?! */
107                 error = -EIO;
108         }
109
110         D_ASSERT(e->block_id != ID_VACANT);
111
112         spin_lock_irqsave(&mdev->req_lock, flags);
113         mdev->read_cnt += e->size >> 9;
114         list_del(&e->w.list);
115         if (list_empty(&mdev->read_ee))
116                 wake_up(&mdev->ee_wait);
117         spin_unlock_irqrestore(&mdev->req_lock, flags);
118
119         drbd_chk_io_error(mdev, error, FALSE);
120         drbd_queue_work(&mdev->data.work, &e->w);
121         put_ldev(mdev);
122 }
123
124 /* writes on behalf of the partner, or resync writes,
125  * "submitted" by the receiver.
126  */
127 void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
128 {
129         unsigned long flags = 0;
130         struct drbd_epoch_entry *e = NULL;
131         struct drbd_conf *mdev;
132         sector_t e_sector;
133         int do_wake;
134         int is_syncer_req;
135         int do_al_complete_io;
136         int uptodate = bio_flagged(bio, BIO_UPTODATE);
137         int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
138
139         e = bio->bi_private;
140         mdev = e->mdev;
141
142         if (error)
143                 dev_warn(DEV, "write: error=%d s=%llus\n", error,
144                                 (unsigned long long)e->sector);
145         if (!error && !uptodate) {
146                 dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
147                                 (unsigned long long)e->sector);
148                 /* strange behavior of some lower level drivers...
149                  * fail the request by clearing the uptodate flag,
150                  * but do not return any error?! */
151                 error = -EIO;
152         }
153
154         /* error == -ENOTSUPP would be a better test,
155          * alas it is not reliable */
156         if (error && is_barrier && e->flags & EE_IS_BARRIER) {
157                 drbd_bump_write_ordering(mdev, WO_bdev_flush);
158                 spin_lock_irqsave(&mdev->req_lock, flags);
159                 list_del(&e->w.list);
160                 e->w.cb = w_e_reissue;
161                 /* put_ldev actually happens below, once we come here again. */
162                 __release(local);
163                 spin_unlock_irqrestore(&mdev->req_lock, flags);
164                 drbd_queue_work(&mdev->data.work, &e->w);
165                 return;
166         }
167
168         D_ASSERT(e->block_id != ID_VACANT);
169
170         spin_lock_irqsave(&mdev->req_lock, flags);
171         mdev->writ_cnt += e->size >> 9;
172         is_syncer_req = is_syncer_block_id(e->block_id);
173
174         /* after we moved e to done_ee,
175          * we may no longer access it,
176          * it may be freed/reused already!
177          * (as soon as we release the req_lock) */
178         e_sector = e->sector;
179         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
180
181         list_del(&e->w.list); /* has been on active_ee or sync_ee */
182         list_add_tail(&e->w.list, &mdev->done_ee);
183
184         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
185          * neither did we wake possibly waiting conflicting requests.
186          * done from "drbd_process_done_ee" within the appropriate w.cb
187          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
188
189         do_wake = is_syncer_req
190                 ? list_empty(&mdev->sync_ee)
191                 : list_empty(&mdev->active_ee);
192
193         if (error)
194                 __drbd_chk_io_error(mdev, FALSE);
195         spin_unlock_irqrestore(&mdev->req_lock, flags);
196
197         if (is_syncer_req)
198                 drbd_rs_complete_io(mdev, e_sector);
199
200         if (do_wake)
201                 wake_up(&mdev->ee_wait);
202
203         if (do_al_complete_io)
204                 drbd_al_complete_io(mdev, e_sector);
205
206         wake_asender(mdev);
207         put_ldev(mdev);
208
209 }
210
211 /* read, read-ahead (readA) or write requests on R_PRIMARY coming from drbd_make_request
212  */
213 void drbd_endio_pri(struct bio *bio, int error)
214 {
215         unsigned long flags;
216         struct drbd_request *req = bio->bi_private;
217         struct drbd_conf *mdev = req->mdev;
218         struct bio_and_error m;
219         enum drbd_req_event what;
220         int uptodate = bio_flagged(bio, BIO_UPTODATE);
221
222         if (error)
223                 dev_warn(DEV, "p %s: error=%d\n",
224                          bio_data_dir(bio) == WRITE ? "write" : "read", error);
225         if (!error && !uptodate) {
226                 dev_warn(DEV, "p %s: setting error to -EIO\n",
227                          bio_data_dir(bio) == WRITE ? "write" : "read");
228                 /* strange behavior of some lower level drivers...
229                  * fail the request by clearing the uptodate flag,
230                  * but do not return any error?! */
231                 error = -EIO;
232         }
233
234         /* to avoid recursion in __req_mod */
235         if (unlikely(error)) {
236                 what = (bio_data_dir(bio) == WRITE)
237                         ? write_completed_with_error
238                         : (bio_rw(bio) == READ)
239                           ? read_completed_with_error
240                           : read_ahead_completed_with_error;
241         } else
242                 what = completed_ok;
243
244         bio_put(req->private_bio);
245         req->private_bio = ERR_PTR(error);
246
247         spin_lock_irqsave(&mdev->req_lock, flags);
248         __req_mod(req, what, &m);
249         spin_unlock_irqrestore(&mdev->req_lock, flags);
250
251         if (m.bio)
252                 complete_master_bio(mdev, &m);
253 }
254
255 int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
256 {
257         struct drbd_request *req = container_of(w, struct drbd_request, w);
258
259         /* NOTE: mdev->ldev can be NULL by the time we get here! */
260         /* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
261
262         /* the only way this callback is scheduled is from _req_may_be_done,
263          * when it is done and had a local write error, see comments there */
264         drbd_req_free(req);
265
266         return TRUE;
267 }
268
269 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
270 {
271         struct drbd_request *req = container_of(w, struct drbd_request, w);
272
273         /* We should not detach for read io-error,
274          * but try to WRITE the P_DATA_REPLY to the failed location,
275          * to give the disk the chance to relocate that block */
276
277         spin_lock_irq(&mdev->req_lock);
278         if (cancel ||
279             mdev->state.conn < C_CONNECTED ||
280             mdev->state.pdsk <= D_INCONSISTENT) {
281                 _req_mod(req, send_canceled);
282                 spin_unlock_irq(&mdev->req_lock);
283                 dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
284                 return 1;
285         }
286         spin_unlock_irq(&mdev->req_lock);
287
288         return w_send_read_req(mdev, w, 0);
289 }
290
291 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
292 {
293         ERR_IF(cancel) return 1;
294         dev_err(DEV, "resync inactive, but callback triggered??\n");
295         return 1; /* Simply ignore this! */
296 }
297
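    /* Feed every segment of @bio into the hash transform @tfm and store the
     * result in @digest, which must hold at least crypto_hash_digestsize(tfm)
     * bytes.  Used both for checksum based resync (csums_tfm) and for online
     * verify (verify_tfm). */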
298 void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
299 {
300         struct hash_desc desc;
301         struct scatterlist sg;
302         struct bio_vec *bvec;
303         int i;
304
305         desc.tfm = tfm;
306         desc.flags = 0;
307
308         sg_init_table(&sg, 1);
309         crypto_hash_init(&desc);
310
311         __bio_for_each_segment(bvec, bio, i, 0) {
312                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
313                 crypto_hash_update(&desc, &sg, sg.length);
314         }
315         crypto_hash_final(&desc, digest);
316 }
317
318 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
319 {
320         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
321         int digest_size;
322         void *digest;
323         int ok;
324
325         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
326
327         if (unlikely(cancel)) {
328                 drbd_free_ee(mdev, e);
329                 return 1;
330         }
331
332         if (likely(drbd_bio_uptodate(e->private_bio))) {
333                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
334                 digest = kmalloc(digest_size, GFP_NOIO);
335                 if (digest) {
336                         drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
337
338                         inc_rs_pending(mdev);
339                         ok = drbd_send_drequest_csum(mdev,
340                                                      e->sector,
341                                                      e->size,
342                                                      digest,
343                                                      digest_size,
344                                                      P_CSUM_RS_REQUEST);
345                         kfree(digest);
346                 } else {
347                         dev_err(DEV, "kmalloc() of digest failed.\n");
348                         ok = 0;
349                 }
350         } else
351                 ok = 1;
352
353         drbd_free_ee(mdev, e);
354
355         if (unlikely(!ok))
356                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
357         return ok;
358 }
359
360 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
361
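    /* Submit a local read of (sector, size) so that its checksum can be sent
     * to the peer (see w_e_send_csum).  Return values, as interpreted by the
     * caller in w_make_resync_request():
     *  0 = no local disk, 2 = epoch entry allocation failed (try again later),
     *  1 = read request submitted. */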
362 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
363 {
364         struct drbd_epoch_entry *e;
365
366         if (!get_ldev(mdev))
367                 return 0;
368
369         /* GFP_TRY, because if there is no memory available right now, this may
370          * be rescheduled for later. It is "only" background resync, after all. */
371         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
372         if (!e) {
373                 put_ldev(mdev);
374                 return 2;
375         }
376
377         spin_lock_irq(&mdev->req_lock);
378         list_add(&e->w.list, &mdev->read_ee);
379         spin_unlock_irq(&mdev->req_lock);
380
381         e->private_bio->bi_end_io = drbd_endio_read_sec;
382         e->private_bio->bi_rw = READ;
383         e->w.cb = w_e_send_csum;
384
385         mdev->read_cnt += size >> 9;
386         drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
387
388         return 1;
389 }
390
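    /* Resync timer callback: unless STOP_SYNC_TIMER is set, (re)queue the
     * resync work, using w_make_ov_request while in online verify
     * (C_VERIFY_S) and w_make_resync_request otherwise.  Those work functions
     * re-arm this timer from their requeue paths via
     * mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME). */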
391 void resync_timer_fn(unsigned long data)
392 {
393         unsigned long flags;
394         struct drbd_conf *mdev = (struct drbd_conf *) data;
395         int queue;
396
397         spin_lock_irqsave(&mdev->req_lock, flags);
398
399         if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
400                 queue = 1;
401                 if (mdev->state.conn == C_VERIFY_S)
402                         mdev->resync_work.cb = w_make_ov_request;
403                 else
404                         mdev->resync_work.cb = w_make_resync_request;
405         } else {
406                 queue = 0;
407                 mdev->resync_work.cb = w_resync_inactive;
408         }
409
410         spin_unlock_irqrestore(&mdev->req_lock, flags);
411
412         /* harmless race: list_empty outside data.work.q_lock */
413         if (list_empty(&mdev->resync_work.list) && queue)
414                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
415 }
416
417 int w_make_resync_request(struct drbd_conf *mdev,
418                 struct drbd_work *w, int cancel)
419 {
420         unsigned long bit;
421         sector_t sector;
422         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
423         int max_segment_size = queue_max_segment_size(mdev->rq_queue);
424         int number, i, size, pe, mx;
425         int align, queued, sndbuf;
426
427         if (unlikely(cancel))
428                 return 1;
429
430         if (unlikely(mdev->state.conn < C_CONNECTED)) {
431                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected\n");
432                 return 0;
433         }
434
435         if (mdev->state.conn != C_SYNC_TARGET)
436                 dev_err(DEV, "%s in w_make_resync_request\n",
437                         drbd_conn_str(mdev->state.conn));
438
439         if (!get_ldev(mdev)) {
440         /* Since we only need to access mdev->rsync, a
441            get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
442            continuing the resync with a broken disk makes no sense
443            at all */
444                 dev_err(DEV, "Disk broke down during resync!\n");
445                 mdev->resync_work.cb = w_resync_inactive;
446                 return 1;
447         }
448
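            /* Number of resync requests to generate for this SLEEP_TIME (HZ/10)
             * interval.  sync_conf.rate is configured in KiB/s; with the usual
             * BM_BLOCK_SIZE of 4 KiB this works out to rate/40 requests per
             * 100 ms, e.g. a rate of 10000 KiB/s yields 250 requests of 4 KiB,
             * i.e. roughly 10 MB transferred per second. */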
449         number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
450         pe = atomic_read(&mdev->rs_pending_cnt);
451
452         mutex_lock(&mdev->data.mutex);
453         if (mdev->data.socket)
454                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
455         else
456                 mx = 1;
457         mutex_unlock(&mdev->data.mutex);
458
459         /* For resync rates >160MB/sec, allow more pending RS requests */
460         if (number > mx)
461                 mx = number;
462
463         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
464         if ((pe + number) > mx) {
465                 number = mx - pe;
466         }
467
468         for (i = 0; i < number; i++) {
469                 /* Stop generating RS requests when half of the send buffer is filled */
470                 mutex_lock(&mdev->data.mutex);
471                 if (mdev->data.socket) {
472                         queued = mdev->data.socket->sk->sk_wmem_queued;
473                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
474                 } else {
475                         queued = 1;
476                         sndbuf = 0;
477                 }
478                 mutex_unlock(&mdev->data.mutex);
479                 if (queued > sndbuf / 2)
480                         goto requeue;
481
482 next_sector:
483                 size = BM_BLOCK_SIZE;
484                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
485
486                 if (bit == -1UL) {
487                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
488                         mdev->resync_work.cb = w_resync_inactive;
489                         put_ldev(mdev);
490                         return 1;
491                 }
492
493                 sector = BM_BIT_TO_SECT(bit);
494
495                 if (drbd_try_rs_begin_io(mdev, sector)) {
496                         mdev->bm_resync_fo = bit;
497                         goto requeue;
498                 }
499                 mdev->bm_resync_fo = bit + 1;
500
501                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
502                         drbd_rs_complete_io(mdev, sector);
503                         goto next_sector;
504                 }
505
506 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
507                 /* try to find some adjacent bits.
508                  * we stop once we already have the maximum req size.
509                  *
510                  * Additionally always align bigger requests, in order to
511                  * be prepared for all stripe sizes of software RAIDs.
512                  *
513                  * we _do_ care about the agreed-upon q->max_segment_size
514                  * here, as splitting up the requests on the other side is more
515                  * difficult.  The consequence is that on LVM, MD, and other
516                  * "indirect" devices, this is dead code, since
517                  * q->max_segment_size will be PAGE_SIZE.
518                  */
519                 align = 1;
520                 for (;;) {
521                         if (size + BM_BLOCK_SIZE > max_segment_size)
522                                 break;
523
524                         /* Always be aligned */
525                         if (sector & ((1<<(align+3))-1))
526                                 break;
527
528                         /* do not cross extent boundaries */
529                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
530                                 break;
531                         /* now, is it actually dirty, after all?
532                          * caution, drbd_bm_test_bit is tri-state for some
533                          * obscure reason; ( b == 0 ) would get the out-of-band
534                          * only accidentally right because of the "oddly sized"
535                          * adjustment below */
536                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
537                                 break;
538                         bit++;
539                         size += BM_BLOCK_SIZE;
540                         if ((BM_BLOCK_SIZE << align) <= size)
541                                 align++;
542                         i++;
543                 }
544                 /* if we merged some,
545                  * reset the offset to start the next drbd_bm_find_next from */
546                 if (size > BM_BLOCK_SIZE)
547                         mdev->bm_resync_fo = bit + 1;
548 #endif
549
550                 /* adjust very last sectors, in case we are oddly sized */
551                 if (sector + (size>>9) > capacity)
552                         size = (capacity-sector)<<9;
553                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
554                         switch (read_for_csum(mdev, sector, size)) {
555                         case 0: /* Disk failure */
556                                 put_ldev(mdev);
557                                 return 0;
558                         case 2: /* Allocation failed */
559                                 drbd_rs_complete_io(mdev, sector);
560                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
561                                 goto requeue;
562                         /* case 1: everything ok */
563                         }
564                 } else {
565                         inc_rs_pending(mdev);
566                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
567                                                sector, size, ID_SYNCER)) {
568                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
569                                 dec_rs_pending(mdev);
570                                 put_ldev(mdev);
571                                 return 0;
572                         }
573                 }
574         }
575
576         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
577                 /* last syncer _request_ was sent,
578                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
579                  * next sync group will resume), as soon as we receive the last
580                  * resync data block, and the last bit is cleared.
581                  * until then resync "work" is "inactive" ...
582                  */
583                 mdev->resync_work.cb = w_resync_inactive;
584                 put_ldev(mdev);
585                 return 1;
586         }
587
588  requeue:
589         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
590         put_ldev(mdev);
591         return 1;
592 }
593
594 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
595 {
596         int number, i, size;
597         sector_t sector;
598         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
599
600         if (unlikely(cancel))
601                 return 1;
602
603         if (unlikely(mdev->state.conn < C_CONNECTED)) {
604                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected\n");
605                 return 0;
606         }
607
608         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
609         if (atomic_read(&mdev->rs_pending_cnt) > number)
610                 goto requeue;
611
612         number -= atomic_read(&mdev->rs_pending_cnt);
613
614         sector = mdev->ov_position;
615         for (i = 0; i < number; i++) {
616                 if (sector >= capacity) {
617                         mdev->resync_work.cb = w_resync_inactive;
618                         return 1;
619                 }
620
621                 size = BM_BLOCK_SIZE;
622
623                 if (drbd_try_rs_begin_io(mdev, sector)) {
624                         mdev->ov_position = sector;
625                         goto requeue;
626                 }
627
628                 if (sector + (size>>9) > capacity)
629                         size = (capacity-sector)<<9;
630
631                 inc_rs_pending(mdev);
632                 if (!drbd_send_ov_request(mdev, sector, size)) {
633                         dec_rs_pending(mdev);
634                         return 0;
635                 }
636                 sector += BM_SECT_PER_BIT;
637         }
638         mdev->ov_position = sector;
639
640  requeue:
641         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
642         return 1;
643 }
644
645
646 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
647 {
648         kfree(w);
649         ov_oos_print(mdev);
650         drbd_resync_finished(mdev);
651
652         return 1;
653 }
654
655 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
656 {
657         kfree(w);
658
659         drbd_resync_finished(mdev);
660
661         return 1;
662 }
663
664 int drbd_resync_finished(struct drbd_conf *mdev)
665 {
666         unsigned long db, dt, dbdt;
667         unsigned long n_oos;
668         union drbd_state os, ns;
669         struct drbd_work *w;
670         char *khelper_cmd = NULL;
671
672         /* Remove all elements from the resync LRU. Since future actions
673          * might set bits in the (main) bitmap, the entries in the
674          * resync LRU would otherwise be wrong. */
675         if (drbd_rs_del_all(mdev)) {
676                 /* In case this is not possible now, it is most probably because
677                  * there are P_RS_DATA_REPLY packets lingering on the worker's
678                  * queue (or the read operations for those packets have not
679                  * finished yet).  Retry in 100ms. */
680
681                 drbd_kick_lo(mdev);
682                 __set_current_state(TASK_INTERRUPTIBLE);
683                 schedule_timeout(HZ / 10);
684                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
685                 if (w) {
686                         w->cb = w_resync_finished;
687                         drbd_queue_work(&mdev->data.work, w);
688                         return 1;
689                 }
690                 dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
691         }
692
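            /* Figures for the log message below: dt is the active resync time
             * in seconds (paused time excluded), db the resync work in bitmap
             * bits, and dbdt the resulting average rate in KiB/s (each bit
             * covers a 4 KiB block, so Bit2KB is effectively a multiply by 4). */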
693         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
694         if (dt <= 0)
695                 dt = 1;
696         db = mdev->rs_total;
697         dbdt = Bit2KB(db/dt);
698         mdev->rs_paused /= HZ;
699
700         if (!get_ldev(mdev))
701                 goto out;
702
703         spin_lock_irq(&mdev->req_lock);
704         os = mdev->state;
705
706         /* This protects us against multiple calls (that can happen in the presence
707            of application IO), and against connectivity loss just before we arrive here. */
708         if (os.conn <= C_CONNECTED)
709                 goto out_unlock;
710
711         ns = os;
712         ns.conn = C_CONNECTED;
713
714         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
715              (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
716              "Online verify" : "Resync",
717              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
718
719         n_oos = drbd_bm_total_weight(mdev);
720
721         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
722                 if (n_oos) {
723                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
724                               n_oos, Bit2KB(1));
725                         khelper_cmd = "out-of-sync";
726                 }
727         } else {
728                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
729
730                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
731                         khelper_cmd = "after-resync-target";
732
733                 if (mdev->csums_tfm && mdev->rs_total) {
734                         const unsigned long s = mdev->rs_same_csum;
735                         const unsigned long t = mdev->rs_total;
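                            /* Integer percentage of equal-checksum blocks:
                             * multiply first while s*100 cannot overflow
                             * (s <= t < 100000), else divide t first. */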
736                         const int ratio =
737                                 (t == 0)     ? 0 :
738                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
739                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
740                              "transferred %luK total %luK\n",
741                              ratio,
742                              Bit2KB(mdev->rs_same_csum),
743                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
744                              Bit2KB(mdev->rs_total));
745                 }
746         }
747
748         if (mdev->rs_failed) {
749                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
750
751                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
752                         ns.disk = D_INCONSISTENT;
753                         ns.pdsk = D_UP_TO_DATE;
754                 } else {
755                         ns.disk = D_UP_TO_DATE;
756                         ns.pdsk = D_INCONSISTENT;
757                 }
758         } else {
759                 ns.disk = D_UP_TO_DATE;
760                 ns.pdsk = D_UP_TO_DATE;
761
762                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
763                         if (mdev->p_uuid) {
764                                 int i;
765                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
766                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
767                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
768                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
769                         } else {
770                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
771                         }
772                 }
773
774                 drbd_uuid_set_bm(mdev, 0UL);
775
776                 if (mdev->p_uuid) {
777                         /* Now the two UUID sets are equal, update what we
778                          * know of the peer. */
779                         int i;
780                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
781                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
782                 }
783         }
784
785         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
786 out_unlock:
787         spin_unlock_irq(&mdev->req_lock);
788         put_ldev(mdev);
789 out:
790         mdev->rs_total  = 0;
791         mdev->rs_failed = 0;
792         mdev->rs_paused = 0;
793         mdev->ov_start_sector = 0;
794
795         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
796                 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
797                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
798         }
799
800         if (khelper_cmd)
801                 drbd_khelper(mdev, khelper_cmd);
802
803         return 1;
804 }
805
806 /* helper */
807 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
808 {
809         if (drbd_bio_has_active_page(e->private_bio)) {
810                 /* This might happen if sendpage() has not finished */
811                 spin_lock_irq(&mdev->req_lock);
812                 list_add_tail(&e->w.list, &mdev->net_ee);
813                 spin_unlock_irq(&mdev->req_lock);
814         } else
815                 drbd_free_ee(mdev, e);
816 }
817
818 /**
819  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
820  * @mdev:       DRBD device.
821  * @w:          work object.
822  * @cancel:     The connection will be closed anyway
823  */
824 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
825 {
826         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
827         int ok;
828
829         if (unlikely(cancel)) {
830                 drbd_free_ee(mdev, e);
831                 dec_unacked(mdev);
832                 return 1;
833         }
834
835         if (likely(drbd_bio_uptodate(e->private_bio))) {
836                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
837         } else {
838                 if (__ratelimit(&drbd_ratelimit_state))
839                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
840                             (unsigned long long)e->sector);
841
842                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
843         }
844
845         dec_unacked(mdev);
846
847         move_to_net_ee_or_free(mdev, e);
848
849         if (unlikely(!ok))
850                 dev_err(DEV, "drbd_send_block() failed\n");
851         return ok;
852 }
853
854 /**
855  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
856  * @mdev:       DRBD device.
857  * @w:          work object.
858  * @cancel:     The connection will be closed anyway
859  */
860 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
861 {
862         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
863         int ok;
864
865         if (unlikely(cancel)) {
866                 drbd_free_ee(mdev, e);
867                 dec_unacked(mdev);
868                 return 1;
869         }
870
871         if (get_ldev_if_state(mdev, D_FAILED)) {
872                 drbd_rs_complete_io(mdev, e->sector);
873                 put_ldev(mdev);
874         }
875
876         if (likely(drbd_bio_uptodate(e->private_bio))) {
877                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
878                         inc_rs_pending(mdev);
879                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
880                 } else {
881                         if (__ratelimit(&drbd_ratelimit_state))
882                                 dev_err(DEV, "Not sending RSDataReply, "
883                                     "partner DISKLESS!\n");
884                         ok = 1;
885                 }
886         } else {
887                 if (__ratelimit(&drbd_ratelimit_state))
888                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
889                             (unsigned long long)e->sector);
890
891                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
892
893                 /* update resync data with failure */
894                 drbd_rs_failed_io(mdev, e->sector, e->size);
895         }
896
897         dec_unacked(mdev);
898
899         move_to_net_ee_or_free(mdev, e);
900
901         if (unlikely(!ok))
902                 dev_err(DEV, "drbd_send_block() failed\n");
903         return ok;
904 }
905
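    /* Worker callback for checksum based resync: recompute the digest of the
     * locally read block and compare it with the digest the peer sent in its
     * P_CSUM_RS_REQUEST.  If they match, only a P_RS_IS_IN_SYNC ack goes back
     * over the wire; otherwise the full block is sent as P_RS_DATA_REPLY. */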
906 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
907 {
908         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
909         struct digest_info *di;
910         int digest_size;
911         void *digest = NULL;
912         int ok, eq = 0;
913
914         if (unlikely(cancel)) {
915                 drbd_free_ee(mdev, e);
916                 dec_unacked(mdev);
917                 return 1;
918         }
919
920         drbd_rs_complete_io(mdev, e->sector);
921
922         di = (struct digest_info *)(unsigned long)e->block_id;
923
924         if (likely(drbd_bio_uptodate(e->private_bio))) {
925                 /* quick hack to try to avoid a race against reconfiguration.
926                  * a real fix would be much more involved,
927                  * introducing more locking mechanisms */
928                 if (mdev->csums_tfm) {
929                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
930                         D_ASSERT(digest_size == di->digest_size);
931                         digest = kmalloc(digest_size, GFP_NOIO);
932                 }
933                 if (digest) {
934                         drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
935                         eq = !memcmp(digest, di->digest, digest_size);
936                         kfree(digest);
937                 }
938
939                 if (eq) {
940                         drbd_set_in_sync(mdev, e->sector, e->size);
941                         /* rs_same_csums unit is BM_BLOCK_SIZE */
942                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
943                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
944                 } else {
945                         inc_rs_pending(mdev);
946                         e->block_id = ID_SYNCER;
947                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
948                 }
949         } else {
950                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
951                 if (__ratelimit(&drbd_ratelimit_state))
952                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
953         }
954
955         dec_unacked(mdev);
956
957         kfree(di);
958
959         move_to_net_ee_or_free(mdev, e);
960
961         if (unlikely(!ok))
962                 dev_err(DEV, "drbd_send_block/ack() failed\n");
963         return ok;
964 }
965
966 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
967 {
968         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
969         int digest_size;
970         void *digest;
971         int ok = 1;
972
973         if (unlikely(cancel))
974                 goto out;
975
976         if (unlikely(!drbd_bio_uptodate(e->private_bio)))
977                 goto out;
978
979         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
980         /* FIXME if this allocation fails, online verify will not terminate! */
981         digest = kmalloc(digest_size, GFP_NOIO);
982         if (digest) {
983                 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
984                 inc_rs_pending(mdev);
985                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
986                                              digest, digest_size, P_OV_REPLY);
987                 if (!ok)
988                         dec_rs_pending(mdev);
989                 kfree(digest);
990         }
991
992 out:
993         drbd_free_ee(mdev, e);
994
995         dec_unacked(mdev);
996
997         return ok;
998 }
999
1000 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1001 {
1002         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1003                 mdev->ov_last_oos_size += size>>9;
1004         } else {
1005                 mdev->ov_last_oos_start = sector;
1006                 mdev->ov_last_oos_size = size>>9;
1007         }
1008         drbd_set_out_of_sync(mdev, sector, size);
1009         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1010 }
1011
1012 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1013 {
1014         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1015         struct digest_info *di;
1016         int digest_size;
1017         void *digest;
1018         int ok, eq = 0;
1019
1020         if (unlikely(cancel)) {
1021                 drbd_free_ee(mdev, e);
1022                 dec_unacked(mdev);
1023                 return 1;
1024         }
1025
1026         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1027          * the resync lru has been cleaned up already */
1028         drbd_rs_complete_io(mdev, e->sector);
1029
1030         di = (struct digest_info *)(unsigned long)e->block_id;
1031
1032         if (likely(drbd_bio_uptodate(e->private_bio))) {
1033                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1034                 digest = kmalloc(digest_size, GFP_NOIO);
1035                 if (digest) {
1036                         drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
1037
1038                         D_ASSERT(digest_size == di->digest_size);
1039                         eq = !memcmp(digest, di->digest, digest_size);
1040                         kfree(digest);
1041                 }
1042         } else {
1043                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1044                 if (__ratelimit(&drbd_ratelimit_state))
1045                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1046         }
1047
1048         dec_unacked(mdev);
1049
1050         kfree(di);
1051
1052         if (!eq)
1053                 drbd_ov_oos_found(mdev, e->sector, e->size);
1054         else
1055                 ov_oos_print(mdev);
1056
1057         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1058                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1059
1060         drbd_free_ee(mdev, e);
1061
1062         if (--mdev->ov_left == 0) {
1063                 ov_oos_print(mdev);
1064                 drbd_resync_finished(mdev);
1065         }
1066
1067         return ok;
1068 }
1069
1070 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1071 {
1072         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1073         complete(&b->done);
1074         return 1;
1075 }
1076
1077 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1078 {
1079         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1080         struct p_barrier *p = &mdev->data.sbuf.barrier;
1081         int ok = 1;
1082
1083         /* really avoid racing with tl_clear.  w.cb may have been referenced
1084          * just before it was reassigned and re-queued, so double check that.
1085          * actually, this race was harmless, since we only try to send the
1086          * barrier packet here, and otherwise do nothing with the object.
1087          * but compare with the head of w_clear_epoch */
1088         spin_lock_irq(&mdev->req_lock);
1089         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1090                 cancel = 1;
1091         spin_unlock_irq(&mdev->req_lock);
1092         if (cancel)
1093                 return 1;
1094
1095         if (!drbd_get_data_sock(mdev))
1096                 return 0;
1097         p->barrier = b->br_number;
1098         /* inc_ap_pending was done where this was queued.
1099          * dec_ap_pending will be done in got_BarrierAck
1100          * or (on connection loss) in w_clear_epoch.  */
1101         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1102                                 (struct p_header *)p, sizeof(*p), 0);
1103         drbd_put_data_sock(mdev);
1104
1105         return ok;
1106 }
1107
1108 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1109 {
1110         if (cancel)
1111                 return 1;
1112         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1113 }
1114
1115 /**
1116  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1117  * @mdev:       DRBD device.
1118  * @w:          work object.
1119  * @cancel:     The connection will be closed anyway
1120  */
1121 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1122 {
1123         struct drbd_request *req = container_of(w, struct drbd_request, w);
1124         int ok;
1125
1126         if (unlikely(cancel)) {
1127                 req_mod(req, send_canceled);
1128                 return 1;
1129         }
1130
1131         ok = drbd_send_dblock(mdev, req);
1132         req_mod(req, ok ? handed_over_to_network : send_failed);
1133
1134         return ok;
1135 }
1136
1137 /**
1138  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1139  * @mdev:       DRBD device.
1140  * @w:          work object.
1141  * @cancel:     The connection will be closed anyway
1142  */
1143 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1144 {
1145         struct drbd_request *req = container_of(w, struct drbd_request, w);
1146         int ok;
1147
1148         if (unlikely(cancel)) {
1149                 req_mod(req, send_canceled);
1150                 return 1;
1151         }
1152
1153         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1154                                 (unsigned long)req);
1155
1156         if (!ok) {
1157                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1158                  * so this is probably redundant */
1159                 if (mdev->state.conn >= C_CONNECTED)
1160                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1161         }
1162         req_mod(req, ok ? handed_over_to_network : send_failed);
1163
1164         return ok;
1165 }
1166
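     /* Walk the sync-after dependency chain (sync_conf.after names the minor
      * this device shall resync after).  Returns 1 if none of those devices is
      * currently resyncing or paused (aftr_isp/peer_isp/user_isp), 0 otherwise. */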
1167 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1168 {
1169         struct drbd_conf *odev = mdev;
1170
1171         while (1) {
1172                 if (odev->sync_conf.after == -1)
1173                         return 1;
1174                 odev = minor_to_mdev(odev->sync_conf.after);
1175                 ERR_IF(!odev) return 1;
1176                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1177                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1178                     odev->state.aftr_isp || odev->state.peer_isp ||
1179                     odev->state.user_isp)
1180                         return 0;
1181         }
1182 }
1183
1184 /**
1185  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1186  * @mdev:       DRBD device.
1187  *
1188  * Called from process context only (admin command and after_state_ch).
1189  */
1190 static int _drbd_pause_after(struct drbd_conf *mdev)
1191 {
1192         struct drbd_conf *odev;
1193         int i, rv = 0;
1194
1195         for (i = 0; i < minor_count; i++) {
1196                 odev = minor_to_mdev(i);
1197                 if (!odev)
1198                         continue;
1199                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1200                         continue;
1201                 if (!_drbd_may_sync_now(odev))
1202                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1203                                != SS_NOTHING_TO_DO);
1204         }
1205
1206         return rv;
1207 }
1208
1209 /**
1210  * _drbd_resume_next() - Resume resync on all devices that may resync now
1211  * @mdev:       DRBD device.
1212  *
1213  * Called from process context only (admin command and worker).
1214  */
1215 static int _drbd_resume_next(struct drbd_conf *mdev)
1216 {
1217         struct drbd_conf *odev;
1218         int i, rv = 0;
1219
1220         for (i = 0; i < minor_count; i++) {
1221                 odev = minor_to_mdev(i);
1222                 if (!odev)
1223                         continue;
1224                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1225                         continue;
1226                 if (odev->state.aftr_isp) {
1227                         if (_drbd_may_sync_now(odev))
1228                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1229                                                         CS_HARD, NULL)
1230                                        != SS_NOTHING_TO_DO);
1231                 }
1232         }
1233         return rv;
1234 }
1235
1236 void resume_next_sg(struct drbd_conf *mdev)
1237 {
1238         write_lock_irq(&global_state_lock);
1239         _drbd_resume_next(mdev);
1240         write_unlock_irq(&global_state_lock);
1241 }
1242
1243 void suspend_other_sg(struct drbd_conf *mdev)
1244 {
1245         write_lock_irq(&global_state_lock);
1246         _drbd_pause_after(mdev);
1247         write_unlock_irq(&global_state_lock);
1248 }
1249
1250 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1251 {
1252         struct drbd_conf *odev;
1253
1254         if (o_minor == -1)
1255                 return NO_ERROR;
1256         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1257                 return ERR_SYNC_AFTER;
1258
1259         /* check for loops */
1260         odev = minor_to_mdev(o_minor);
1261         while (1) {
1262                 if (odev == mdev)
1263                         return ERR_SYNC_AFTER_CYCLE;
1264
1265                 /* dependency chain ends here, no cycles. */
1266                 if (odev->sync_conf.after == -1)
1267                         return NO_ERROR;
1268
1269                 /* follow the dependency chain */
1270                 odev = minor_to_mdev(odev->sync_conf.after);
1271         }
1272 }
1273
1274 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1275 {
1276         int changes;
1277         int retcode;
1278
1279         write_lock_irq(&global_state_lock);
1280         retcode = sync_after_error(mdev, na);
1281         if (retcode == NO_ERROR) {
1282                 mdev->sync_conf.after = na;
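                     /* Pausing or resuming one device may change what its
                      * dependents are allowed to do, so iterate until a full
                      * pass causes no further state change. */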
1283                 do {
1284                         changes  = _drbd_pause_after(mdev);
1285                         changes |= _drbd_resume_next(mdev);
1286                 } while (changes);
1287         }
1288         write_unlock_irq(&global_state_lock);
1289         return retcode;
1290 }
1291
1292 static void ping_peer(struct drbd_conf *mdev)
1293 {
1294         clear_bit(GOT_PING_ACK, &mdev->flags);
1295         request_ping(mdev);
1296         wait_event(mdev->misc_wait,
1297                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1298 }
1299
1300 /**
1301  * drbd_start_resync() - Start the resync process
1302  * @mdev:       DRBD device.
1303  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1304  *
1305  * This function might bring you directly into one of the
1306  * C_PAUSED_SYNC_* states.
1307  */
1308 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1309 {
1310         union drbd_state ns;
1311         int r;
1312
1313         if (mdev->state.conn >= C_SYNC_SOURCE) {
1314                 dev_err(DEV, "Resync already running!\n");
1315                 return;
1316         }
1317
1318         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1319         drbd_rs_cancel_all(mdev);
1320
1321         if (side == C_SYNC_TARGET) {
1322                 /* Since application IO was locked out during C_WF_BITMAP_T and
1323                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1324                    (which makes the data inconsistent), let the handler below veto. */
1325                 r = drbd_khelper(mdev, "before-resync-target");
1326                 r = (r >> 8) & 0xff;
1327                 if (r > 0) {
1328                         dev_info(DEV, "before-resync-target handler returned %d, "
1329                              "dropping connection.\n", r);
1330                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1331                         return;
1332                 }
1333         }
1334
1335         drbd_state_lock(mdev);
1336
1337         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1338                 drbd_state_unlock(mdev);
1339                 return;
1340         }
1341
1342         if (side == C_SYNC_TARGET) {
1343                 mdev->bm_resync_fo = 0;
1344         } else /* side == C_SYNC_SOURCE */ {
1345                 u64 uuid;
1346
1347                 get_random_bytes(&uuid, sizeof(u64));
1348                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1349                 drbd_send_sync_uuid(mdev, uuid);
1350
1351                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1352         }
1353
1354         write_lock_irq(&global_state_lock);
1355         ns = mdev->state;
1356
1357         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1358
1359         ns.conn = side;
1360
1361         if (side == C_SYNC_TARGET)
1362                 ns.disk = D_INCONSISTENT;
1363         else /* side == C_SYNC_SOURCE */
1364                 ns.pdsk = D_INCONSISTENT;
1365
1366         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1367         ns = mdev->state;
1368
1369         if (ns.conn < C_CONNECTED)
1370                 r = SS_UNKNOWN_ERROR;
1371
1372         if (r == SS_SUCCESS) {
1373                 mdev->rs_total     =
1374                 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1375                 mdev->rs_failed    = 0;
1376                 mdev->rs_paused    = 0;
1377                 mdev->rs_start     =
1378                 mdev->rs_mark_time = jiffies;
1379                 mdev->rs_same_csum = 0;
1380                 _drbd_pause_after(mdev);
1381         }
1382         write_unlock_irq(&global_state_lock);
1383         put_ldev(mdev);
1384
1385         if (r == SS_SUCCESS) {
1386                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1387                      drbd_conn_str(ns.conn),
1388                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1389                      (unsigned long) mdev->rs_total);
1390
1391                 if (mdev->rs_total == 0) {
1392                         /* Peer still reachable? Beware of failing before-resync-target handlers! */
1393                         ping_peer(mdev);
1394                         drbd_resync_finished(mdev);
1395                 }
1396
1397                 /* ns.conn may already be != mdev->state.conn,
1398                  * we may have been paused in between, or become paused until
1399                  * the timer triggers.
1400                  * No matter, that is handled in resync_timer_fn() */
1401                 if (ns.conn == C_SYNC_TARGET)
1402                         mod_timer(&mdev->resync_timer, jiffies);
1403
1404                 drbd_md_sync(mdev);
1405         }
1406         drbd_state_unlock(mdev);
1407 }
1408
1409 int drbd_worker(struct drbd_thread *thi)
1410 {
1411         struct drbd_conf *mdev = thi->mdev;
1412         struct drbd_work *w = NULL;
1413         LIST_HEAD(work_list);
1414         int intr = 0, i;
1415
1416         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1417
1418         while (get_t_state(thi) == Running) {
1419                 drbd_thread_current_set_cpu(mdev);
1420
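                     /* If no work is immediately available, uncork the data
                      * socket so whatever has been batched so far is actually
                      * sent, sleep until new work is queued, then cork again
                      * so subsequent packets can be batched once more. */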
1421                 if (down_trylock(&mdev->data.work.s)) {
1422                         mutex_lock(&mdev->data.mutex);
1423                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1424                                 drbd_tcp_uncork(mdev->data.socket);
1425                         mutex_unlock(&mdev->data.mutex);
1426
1427                         intr = down_interruptible(&mdev->data.work.s);
1428
1429                         mutex_lock(&mdev->data.mutex);
1430                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1431                                 drbd_tcp_cork(mdev->data.socket);
1432                         mutex_unlock(&mdev->data.mutex);
1433                 }
1434
1435                 if (intr) {
1436                         D_ASSERT(intr == -EINTR);
1437                         flush_signals(current);
1438                         ERR_IF (get_t_state(thi) == Running)
1439                                 continue;
1440                         break;
1441                 }
1442
1443                 if (get_t_state(thi) != Running)
1444                         break;
1445                 /* With this break, we have done a down() but not consumed
1446                    the entry from the list. The cleanup code takes care of
1447                    this...   */
1448
1449                 w = NULL;
1450                 spin_lock_irq(&mdev->data.work.q_lock);
1451                 ERR_IF(list_empty(&mdev->data.work.q)) {
1452                         /* something terribly wrong in our logic.
1453                          * we were able to down() the semaphore,
1454                          * but the list is empty... doh.
1455                          *
1456                          * what is the best thing to do now?
1457                          * try again from scratch, restarting the receiver,
1458                          * asender, whatnot? could break even more ugly,
1459                          * e.g. when we are primary, but no good local data.
1460                          *
1461                          * I'll try to get away just starting over this loop.
1462                          */
1463                         spin_unlock_irq(&mdev->data.work.q_lock);
1464                         continue;
1465                 }
1466                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1467                 list_del_init(&w->list);
1468                 spin_unlock_irq(&mdev->data.work.q_lock);
1469
1470                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1471                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1472                         if (mdev->state.conn >= C_CONNECTED)
1473                                 drbd_force_state(mdev,
1474                                                 NS(conn, C_NETWORK_FAILURE));
1475                 }
1476         }
1477         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1478         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1479
1480         spin_lock_irq(&mdev->data.work.q_lock);
1481         i = 0;
1482         while (!list_empty(&mdev->data.work.q)) {
1483                 list_splice_init(&mdev->data.work.q, &work_list);
1484                 spin_unlock_irq(&mdev->data.work.q_lock);
1485
1486                 while (!list_empty(&work_list)) {
1487                         w = list_entry(work_list.next, struct drbd_work, list);
1488                         list_del_init(&w->list);
1489                         w->cb(mdev, w, 1);
1490                         i++; /* dead debugging code */
1491                 }
1492
1493                 spin_lock_irq(&mdev->data.work.q_lock);
1494         }
1495         sema_init(&mdev->data.work.s, 0);
1496         /* DANGEROUS race: if someone did queue his work within the spinlock,
1497          * but up() ed outside the spinlock, we could get an up() on the
1498          * semaphore without corresponding list entry.
1499          * So don't do that.
1500          */
1501         spin_unlock_irq(&mdev->data.work.q_lock);
1502
1503         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1504         /* _drbd_set_state only uses stop_nowait.
1505          * wait here for the Exiting receiver. */
1506         drbd_thread_stop(&mdev->receiver);
1507         drbd_mdev_cleanup(mdev);
1508
1509         dev_info(DEV, "worker terminated\n");
1510
1511         clear_bit(DEVICE_DYING, &mdev->flags);
1512         clear_bit(CONFIG_PENDING, &mdev->flags);
1513         wake_up(&mdev->state_wait);
1514
1515         return 0;
1516 }