Merge branch 'for-2.6.39/drivers' of git://git.kernel.dk/linux-2.6-block
[pandora-kernel.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_conf *mdev,
43                                  struct drbd_work *w, int cancel);
44
45
46
47 /* endio handlers:
48  *   drbd_md_io_complete (defined here)
49  *   drbd_endio_pri (defined here)
50  *   drbd_endio_sec (defined here)
51  *   bm_async_io_complete (defined in drbd_bitmap.c)
52  *
53  * For all these callbacks, note the following:
54  * The callbacks will be called in irq context by the IDE drivers,
55  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
56  * Try to get the locking right :)
57  *
58  */
59
60
61 /* About the global_state_lock
62    Each state transition on a device holds a read lock. In case we have
63    to evaluate the sync-after dependencies, we grab a write lock, because
64    we need stable states on all devices for that.  */
65 rwlock_t global_state_lock;
66
67 /* used for synchronous meta data and bitmap IO
68  * submitted by drbd_md_sync_page_io()
69  */
70 void drbd_md_io_complete(struct bio *bio, int error)
71 {
72         struct drbd_md_io *md_io;
73
74         md_io = (struct drbd_md_io *)bio->bi_private;
75         md_io->error = error;
76
77         complete(&md_io->event);
78 }
79
80 /* reads on behalf of the partner,
81  * "submitted" by the receiver
82  */
83 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
84 {
85         unsigned long flags = 0;
86         struct drbd_conf *mdev = e->mdev;
87
88         D_ASSERT(e->block_id != ID_VACANT);
89
90         spin_lock_irqsave(&mdev->req_lock, flags);
91         mdev->read_cnt += e->size >> 9;
92         list_del(&e->w.list);
93         if (list_empty(&mdev->read_ee))
94                 wake_up(&mdev->ee_wait);
95         if (test_bit(__EE_WAS_ERROR, &e->flags))
96                 __drbd_chk_io_error(mdev, false);
97         spin_unlock_irqrestore(&mdev->req_lock, flags);
98
99         drbd_queue_work(&mdev->data.work, &e->w);
100         put_ldev(mdev);
101 }
102
103 /* writes on behalf of the partner, or resync writes,
104  * "submitted" by the receiver, final stage.  */
105 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
106 {
107         unsigned long flags = 0;
108         struct drbd_conf *mdev = e->mdev;
109         sector_t e_sector;
110         int do_wake;
111         int is_syncer_req;
112         int do_al_complete_io;
113
114         D_ASSERT(e->block_id != ID_VACANT);
115
116         /* after we moved e to done_ee,
117          * we may no longer access it,
118          * it may be freed/reused already!
119          * (as soon as we release the req_lock) */
120         e_sector = e->sector;
121         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
122         is_syncer_req = is_syncer_block_id(e->block_id);
123
124         spin_lock_irqsave(&mdev->req_lock, flags);
125         mdev->writ_cnt += e->size >> 9;
126         list_del(&e->w.list); /* has been on active_ee or sync_ee */
127         list_add_tail(&e->w.list, &mdev->done_ee);
128
129         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
130          * neither did we wake possibly waiting conflicting requests.
131          * done from "drbd_process_done_ee" within the appropriate w.cb
132          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
133
134         do_wake = is_syncer_req
135                 ? list_empty(&mdev->sync_ee)
136                 : list_empty(&mdev->active_ee);
137
138         if (test_bit(__EE_WAS_ERROR, &e->flags))
139                 __drbd_chk_io_error(mdev, false);
140         spin_unlock_irqrestore(&mdev->req_lock, flags);
141
142         if (is_syncer_req)
143                 drbd_rs_complete_io(mdev, e_sector);
144
145         if (do_wake)
146                 wake_up(&mdev->ee_wait);
147
148         if (do_al_complete_io)
149                 drbd_al_complete_io(mdev, e_sector);
150
151         wake_asender(mdev);
152         put_ldev(mdev);
153 }
154
155 /* writes on behalf of the partner, or resync writes,
156  * "submitted" by the receiver.
157  */
158 void drbd_endio_sec(struct bio *bio, int error)
159 {
160         struct drbd_epoch_entry *e = bio->bi_private;
161         struct drbd_conf *mdev = e->mdev;
162         int uptodate = bio_flagged(bio, BIO_UPTODATE);
163         int is_write = bio_data_dir(bio) == WRITE;
164
165         if (error && __ratelimit(&drbd_ratelimit_state))
166                 dev_warn(DEV, "%s: error=%d s=%llus\n",
167                                 is_write ? "write" : "read", error,
168                                 (unsigned long long)e->sector);
169         if (!error && !uptodate) {
170                 if (__ratelimit(&drbd_ratelimit_state))
171                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
172                                         is_write ? "write" : "read",
173                                         (unsigned long long)e->sector);
174                 /* strange behavior of some lower level drivers...
175                  * fail the request by clearing the uptodate flag,
176                  * but do not return any error?! */
177                 error = -EIO;
178         }
179
180         if (error)
181                 set_bit(__EE_WAS_ERROR, &e->flags);
182
183         bio_put(bio); /* no need for the bio anymore */
184         if (atomic_dec_and_test(&e->pending_bios)) {
185                 if (is_write)
186                         drbd_endio_write_sec_final(e);
187                 else
188                         drbd_endio_read_sec_final(e);
189         }
190 }
191
192 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
193  */
194 void drbd_endio_pri(struct bio *bio, int error)
195 {
196         unsigned long flags;
197         struct drbd_request *req = bio->bi_private;
198         struct drbd_conf *mdev = req->mdev;
199         struct bio_and_error m;
200         enum drbd_req_event what;
201         int uptodate = bio_flagged(bio, BIO_UPTODATE);
202
203         if (!error && !uptodate) {
204                 dev_warn(DEV, "p %s: setting error to -EIO\n",
205                          bio_data_dir(bio) == WRITE ? "write" : "read");
206                 /* strange behavior of some lower level drivers...
207                  * fail the request by clearing the uptodate flag,
208                  * but do not return any error?! */
209                 error = -EIO;
210         }
211
212         /* to avoid recursion in __req_mod */
213         if (unlikely(error)) {
214                 what = (bio_data_dir(bio) == WRITE)
215                         ? write_completed_with_error
216                         : (bio_rw(bio) == READ)
217                           ? read_completed_with_error
218                           : read_ahead_completed_with_error;
219         } else
220                 what = completed_ok;
221
222         bio_put(req->private_bio);
223         req->private_bio = ERR_PTR(error);
224
225         /* not req_mod(), we need irqsave here! */
226         spin_lock_irqsave(&mdev->req_lock, flags);
227         __req_mod(req, what, &m);
228         spin_unlock_irqrestore(&mdev->req_lock, flags);
229
230         if (m.bio)
231                 complete_master_bio(mdev, &m);
232 }
233
234 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
235 {
236         struct drbd_request *req = container_of(w, struct drbd_request, w);
237
238         /* We should not detach for read io-error,
239          * but try to WRITE the P_DATA_REPLY to the failed location,
240          * to give the disk the chance to relocate that block */
241
242         spin_lock_irq(&mdev->req_lock);
243         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
244                 _req_mod(req, read_retry_remote_canceled);
245                 spin_unlock_irq(&mdev->req_lock);
246                 return 1;
247         }
248         spin_unlock_irq(&mdev->req_lock);
249
250         return w_send_read_req(mdev, w, 0);
251 }
252
253 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
254 {
255         struct hash_desc desc;
256         struct scatterlist sg;
257         struct page *page = e->pages;
258         struct page *tmp;
259         unsigned len;
260
261         desc.tfm = tfm;
262         desc.flags = 0;
263
264         sg_init_table(&sg, 1);
265         crypto_hash_init(&desc);
266
267         while ((tmp = page_chain_next(page))) {
268                 /* all but the last page will be fully used */
269                 sg_set_page(&sg, page, PAGE_SIZE, 0);
270                 crypto_hash_update(&desc, &sg, sg.length);
271                 page = tmp;
272         }
273         /* and now the last, possibly only partially used page */
274         len = e->size & (PAGE_SIZE - 1);
275         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
276         crypto_hash_update(&desc, &sg, sg.length);
277         crypto_hash_final(&desc, digest);
278 }
279
280 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
281 {
282         struct hash_desc desc;
283         struct scatterlist sg;
284         struct bio_vec *bvec;
285         int i;
286
287         desc.tfm = tfm;
288         desc.flags = 0;
289
290         sg_init_table(&sg, 1);
291         crypto_hash_init(&desc);
292
293         __bio_for_each_segment(bvec, bio, i, 0) {
294                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
295                 crypto_hash_update(&desc, &sg, sg.length);
296         }
297         crypto_hash_final(&desc, digest);
298 }
299
300 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
301 {
302         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
303         int digest_size;
304         void *digest;
305         int ok;
306
307         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
308
309         if (unlikely(cancel)) {
310                 drbd_free_ee(mdev, e);
311                 return 1;
312         }
313
314         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
315                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
316                 digest = kmalloc(digest_size, GFP_NOIO);
317                 if (digest) {
318                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
319
320                         inc_rs_pending(mdev);
321                         ok = drbd_send_drequest_csum(mdev,
322                                                      e->sector,
323                                                      e->size,
324                                                      digest,
325                                                      digest_size,
326                                                      P_CSUM_RS_REQUEST);
327                         kfree(digest);
328                 } else {
329                         dev_err(DEV, "kmalloc() of digest failed.\n");
330                         ok = 0;
331                 }
332         } else
333                 ok = 1;
334
335         drbd_free_ee(mdev, e);
336
337         if (unlikely(!ok))
338                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
339         return ok;
340 }
341
342 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
343
344 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
345 {
346         struct drbd_epoch_entry *e;
347
348         if (!get_ldev(mdev))
349                 return -EIO;
350
351         if (drbd_rs_should_slow_down(mdev, sector))
352                 goto defer;
353
354         /* GFP_TRY, because if there is no memory available right now, this may
355          * be rescheduled for later. It is "only" background resync, after all. */
356         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
357         if (!e)
358                 goto defer;
359
360         e->w.cb = w_e_send_csum;
361         spin_lock_irq(&mdev->req_lock);
362         list_add(&e->w.list, &mdev->read_ee);
363         spin_unlock_irq(&mdev->req_lock);
364
365         atomic_add(size >> 9, &mdev->rs_sect_ev);
366         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
367                 return 0;
368
369         /* If it failed because of ENOMEM, retry should help.  If it failed
370          * because bio_add_page failed (probably broken lower level driver),
371          * retry may or may not help.
372          * If it does not, you may need to force disconnect. */
373         spin_lock_irq(&mdev->req_lock);
374         list_del(&e->w.list);
375         spin_unlock_irq(&mdev->req_lock);
376
377         drbd_free_ee(mdev, e);
378 defer:
379         put_ldev(mdev);
380         return -EAGAIN;
381 }
382
383 int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
384 {
385         switch (mdev->state.conn) {
386         case C_VERIFY_S:
387                 w_make_ov_request(mdev, w, cancel);
388                 break;
389         case C_SYNC_TARGET:
390                 w_make_resync_request(mdev, w, cancel);
391                 break;
392         }
393
394         return 1;
395 }
396
397 void resync_timer_fn(unsigned long data)
398 {
399         struct drbd_conf *mdev = (struct drbd_conf *) data;
400
401         if (list_empty(&mdev->resync_work.list))
402                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
403 }
404
405 static void fifo_set(struct fifo_buffer *fb, int value)
406 {
407         int i;
408
409         for (i = 0; i < fb->size; i++)
410                 fb->values[i] = value;
411 }
412
413 static int fifo_push(struct fifo_buffer *fb, int value)
414 {
415         int ov;
416
417         ov = fb->values[fb->head_index];
418         fb->values[fb->head_index++] = value;
419
420         if (fb->head_index >= fb->size)
421                 fb->head_index = 0;
422
423         return ov;
424 }
425
426 static void fifo_add_val(struct fifo_buffer *fb, int value)
427 {
428         int i;
429
430         for (i = 0; i < fb->size; i++)
431                 fb->values[i] += value;
432 }
433
434 static int drbd_rs_controller(struct drbd_conf *mdev)
435 {
436         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
437         unsigned int want;     /* The number of sectors we want in the proxy */
438         int req_sect; /* Number of sectors to request in this turn */
439         int correction; /* Number of sectors more we need in the proxy*/
440         int cps; /* correction per invocation of drbd_rs_controller() */
441         int steps; /* Number of time steps to plan ahead */
442         int curr_corr;
443         int max_sect;
444
445         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
446         mdev->rs_in_flight -= sect_in;
447
448         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
449
450         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
451
452         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
453                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
454         } else { /* normal path */
455                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
456                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
457         }
458
459         correction = want - mdev->rs_in_flight - mdev->rs_planed;
460
461         /* Plan ahead */
462         cps = correction / steps;
463         fifo_add_val(&mdev->rs_plan_s, cps);
464         mdev->rs_planed += cps * steps;
465
466         /* What we do in this step */
467         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
468         spin_unlock(&mdev->peer_seq_lock);
469         mdev->rs_planed -= curr_corr;
470
471         req_sect = sect_in + curr_corr;
472         if (req_sect < 0)
473                 req_sect = 0;
474
475         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
476         if (req_sect > max_sect)
477                 req_sect = max_sect;
478
479         /*
480         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
481                  sect_in, mdev->rs_in_flight, want, correction,
482                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
483         */
484
485         return req_sect;
486 }
487
488 static int drbd_rs_number_requests(struct drbd_conf *mdev)
489 {
490         int number;
491         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
492                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
493                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
494         } else {
495                 mdev->c_sync_rate = mdev->sync_conf.rate;
496                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
497         }
498
499         /* ignore the amount of pending requests, the resync controller should
500          * throttle down to incoming reply rate soon enough anyways. */
501         return number;
502 }
503
504 static int w_make_resync_request(struct drbd_conf *mdev,
505                                  struct drbd_work *w, int cancel)
506 {
507         unsigned long bit;
508         sector_t sector;
509         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
510         int max_bio_size;
511         int number, rollback_i, size;
512         int align, queued, sndbuf;
513         int i = 0;
514
515         if (unlikely(cancel))
516                 return 1;
517
518         if (mdev->rs_total == 0) {
519                 /* empty resync? */
520                 drbd_resync_finished(mdev);
521                 return 1;
522         }
523
524         if (!get_ldev(mdev)) {
525                 /* Since we only need to access mdev->rsync a
526                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
527                    to continue resync with a broken disk makes no sense at
528                    all */
529                 dev_err(DEV, "Disk broke down during resync!\n");
530                 return 1;
531         }
532
533         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
534          * if it should be necessary */
535         max_bio_size =
536                 mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
537                 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
538
539         number = drbd_rs_number_requests(mdev);
540         if (number == 0)
541                 goto requeue;
542
543         for (i = 0; i < number; i++) {
544                 /* Stop generating RS requests, when half of the send buffer is filled */
545                 mutex_lock(&mdev->data.mutex);
546                 if (mdev->data.socket) {
547                         queued = mdev->data.socket->sk->sk_wmem_queued;
548                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
549                 } else {
550                         queued = 1;
551                         sndbuf = 0;
552                 }
553                 mutex_unlock(&mdev->data.mutex);
554                 if (queued > sndbuf / 2)
555                         goto requeue;
556
557 next_sector:
558                 size = BM_BLOCK_SIZE;
559                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
560
561                 if (bit == DRBD_END_OF_BITMAP) {
562                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
563                         put_ldev(mdev);
564                         return 1;
565                 }
566
567                 sector = BM_BIT_TO_SECT(bit);
568
569                 if (drbd_rs_should_slow_down(mdev, sector) ||
570                     drbd_try_rs_begin_io(mdev, sector)) {
571                         mdev->bm_resync_fo = bit;
572                         goto requeue;
573                 }
574                 mdev->bm_resync_fo = bit + 1;
575
576                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
577                         drbd_rs_complete_io(mdev, sector);
578                         goto next_sector;
579                 }
580
581 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
582                 /* try to find some adjacent bits.
583                  * we stop if we have already the maximum req size.
584                  *
585                  * Additionally always align bigger requests, in order to
586                  * be prepared for all stripe sizes of software RAIDs.
587                  */
588                 align = 1;
589                 rollback_i = i;
590                 for (;;) {
591                         if (size + BM_BLOCK_SIZE > max_bio_size)
592                                 break;
593
594                         /* Be always aligned */
595                         if (sector & ((1<<(align+3))-1))
596                                 break;
597
598                         /* do not cross extent boundaries */
599                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
600                                 break;
601                         /* now, is it actually dirty, after all?
602                          * caution, drbd_bm_test_bit is tri-state for some
603                          * obscure reason; ( b == 0 ) would get the out-of-band
604                          * only accidentally right because of the "oddly sized"
605                          * adjustment below */
606                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
607                                 break;
608                         bit++;
609                         size += BM_BLOCK_SIZE;
610                         if ((BM_BLOCK_SIZE << align) <= size)
611                                 align++;
612                         i++;
613                 }
614                 /* if we merged some,
615                  * reset the offset to start the next drbd_bm_find_next from */
616                 if (size > BM_BLOCK_SIZE)
617                         mdev->bm_resync_fo = bit + 1;
618 #endif
619
620                 /* adjust very last sectors, in case we are oddly sized */
621                 if (sector + (size>>9) > capacity)
622                         size = (capacity-sector)<<9;
623                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
624                         switch (read_for_csum(mdev, sector, size)) {
625                         case -EIO: /* Disk failure */
626                                 put_ldev(mdev);
627                                 return 0;
628                         case -EAGAIN: /* allocation failed, or ldev busy */
629                                 drbd_rs_complete_io(mdev, sector);
630                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
631                                 i = rollback_i;
632                                 goto requeue;
633                         case 0:
634                                 /* everything ok */
635                                 break;
636                         default:
637                                 BUG();
638                         }
639                 } else {
640                         inc_rs_pending(mdev);
641                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
642                                                sector, size, ID_SYNCER)) {
643                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
644                                 dec_rs_pending(mdev);
645                                 put_ldev(mdev);
646                                 return 0;
647                         }
648                 }
649         }
650
651         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
652                 /* last syncer _request_ was sent,
653                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
654                  * next sync group will resume), as soon as we receive the last
655                  * resync data block, and the last bit is cleared.
656                  * until then resync "work" is "inactive" ...
657                  */
658                 put_ldev(mdev);
659                 return 1;
660         }
661
662  requeue:
663         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
664         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
665         put_ldev(mdev);
666         return 1;
667 }
668
669 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
670 {
671         int number, i, size;
672         sector_t sector;
673         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
674
675         if (unlikely(cancel))
676                 return 1;
677
678         number = drbd_rs_number_requests(mdev);
679
680         sector = mdev->ov_position;
681         for (i = 0; i < number; i++) {
682                 if (sector >= capacity) {
683                         return 1;
684                 }
685
686                 size = BM_BLOCK_SIZE;
687
688                 if (drbd_rs_should_slow_down(mdev, sector) ||
689                     drbd_try_rs_begin_io(mdev, sector)) {
690                         mdev->ov_position = sector;
691                         goto requeue;
692                 }
693
694                 if (sector + (size>>9) > capacity)
695                         size = (capacity-sector)<<9;
696
697                 inc_rs_pending(mdev);
698                 if (!drbd_send_ov_request(mdev, sector, size)) {
699                         dec_rs_pending(mdev);
700                         return 0;
701                 }
702                 sector += BM_SECT_PER_BIT;
703         }
704         mdev->ov_position = sector;
705
706  requeue:
707         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
708         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
709         return 1;
710 }
711
712
713 void start_resync_timer_fn(unsigned long data)
714 {
715         struct drbd_conf *mdev = (struct drbd_conf *) data;
716
717         drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
718 }
719
720 int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
721 {
722         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
723                 dev_warn(DEV, "w_start_resync later...\n");
724                 mdev->start_resync_timer.expires = jiffies + HZ/10;
725                 add_timer(&mdev->start_resync_timer);
726                 return 1;
727         }
728
729         drbd_start_resync(mdev, C_SYNC_SOURCE);
730         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
731         return 1;
732 }
733
/**
 * w_ov_finished() - worker callback: online verify is done
 * @mdev:   DRBD device
 * @w:      work item; freed here with kfree()
 * @cancel: not evaluated here
 *
 * Return: always 1.
 */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
        /* w is released first and must not be touched afterwards */
        kfree(w);

        ov_oos_print(mdev);
        drbd_resync_finished(mdev);
        return 1;
}
742
/**
 * w_resync_finished() - worker callback: run drbd_resync_finished()
 * @mdev:   DRBD device
 * @w:      work item; freed here with kfree()
 * @cancel: not evaluated here
 *
 * Return: always 1.
 */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
        /* w is released first and must not be touched afterwards */
        kfree(w);
        drbd_resync_finished(mdev);
        return 1;
}
751
752 static void ping_peer(struct drbd_conf *mdev)
753 {
754         clear_bit(GOT_PING_ACK, &mdev->flags);
755         request_ping(mdev);
756         wait_event(mdev->misc_wait,
757                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
758 }
759
760 int drbd_resync_finished(struct drbd_conf *mdev)
761 {
762         unsigned long db, dt, dbdt;
763         unsigned long n_oos;
764         union drbd_state os, ns;
765         struct drbd_work *w;
766         char *khelper_cmd = NULL;
767         int verify_done = 0;
768
769         /* Remove all elements from the resync LRU. Since future actions
770          * might set bits in the (main) bitmap, then the entries in the
771          * resync LRU would be wrong. */
772         if (drbd_rs_del_all(mdev)) {
773                 /* In case this is not possible now, most probably because
774                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
775                  * queue (or even the read operations for those packets
776                  * is not finished by now).   Retry in 100ms. */
777
778                 schedule_timeout_interruptible(HZ / 10);
779                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
780                 if (w) {
781                         w->cb = w_resync_finished;
782                         drbd_queue_work(&mdev->data.work, w);
783                         return 1;
784                 }
785                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
786         }
787
788         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
789         if (dt <= 0)
790                 dt = 1;
791         db = mdev->rs_total;
792         dbdt = Bit2KB(db/dt);
793         mdev->rs_paused /= HZ;
794
795         if (!get_ldev(mdev))
796                 goto out;
797
798         ping_peer(mdev);
799
800         spin_lock_irq(&mdev->req_lock);
801         os = mdev->state;
802
803         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
804
805         /* This protects us against multiple calls (that can happen in the presence
806            of application IO), and against connectivity loss just before we arrive here. */
807         if (os.conn <= C_CONNECTED)
808                 goto out_unlock;
809
810         ns = os;
811         ns.conn = C_CONNECTED;
812
813         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
814              verify_done ? "Online verify " : "Resync",
815              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
816
817         n_oos = drbd_bm_total_weight(mdev);
818
819         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
820                 if (n_oos) {
821                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
822                               n_oos, Bit2KB(1));
823                         khelper_cmd = "out-of-sync";
824                 }
825         } else {
826                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
827
828                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
829                         khelper_cmd = "after-resync-target";
830
831                 if (mdev->csums_tfm && mdev->rs_total) {
832                         const unsigned long s = mdev->rs_same_csum;
833                         const unsigned long t = mdev->rs_total;
834                         const int ratio =
835                                 (t == 0)     ? 0 :
836                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
837                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
838                              "transferred %luK total %luK\n",
839                              ratio,
840                              Bit2KB(mdev->rs_same_csum),
841                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
842                              Bit2KB(mdev->rs_total));
843                 }
844         }
845
846         if (mdev->rs_failed) {
847                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
848
849                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
850                         ns.disk = D_INCONSISTENT;
851                         ns.pdsk = D_UP_TO_DATE;
852                 } else {
853                         ns.disk = D_UP_TO_DATE;
854                         ns.pdsk = D_INCONSISTENT;
855                 }
856         } else {
857                 ns.disk = D_UP_TO_DATE;
858                 ns.pdsk = D_UP_TO_DATE;
859
860                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
861                         if (mdev->p_uuid) {
862                                 int i;
863                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
864                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
865                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
866                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
867                         } else {
868                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
869                         }
870                 }
871
872                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
873                         /* for verify runs, we don't update uuids here,
874                          * so there would be nothing to report. */
875                         drbd_uuid_set_bm(mdev, 0UL);
876                         drbd_print_uuids(mdev, "updated UUIDs");
877                         if (mdev->p_uuid) {
878                                 /* Now the two UUID sets are equal, update what we
879                                  * know of the peer. */
880                                 int i;
881                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
882                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
883                         }
884                 }
885         }
886
887         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
888 out_unlock:
889         spin_unlock_irq(&mdev->req_lock);
890         put_ldev(mdev);
891 out:
892         mdev->rs_total  = 0;
893         mdev->rs_failed = 0;
894         mdev->rs_paused = 0;
895         if (verify_done)
896                 mdev->ov_start_sector = 0;
897
898         drbd_md_sync(mdev);
899
900         if (khelper_cmd)
901                 drbd_khelper(mdev, khelper_cmd);
902
903         return 1;
904 }
905
906 /* helper */
907 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
908 {
909         if (drbd_ee_has_active_page(e)) {
910                 /* This might happen if sendpage() has not finished */
911                 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
912                 atomic_add(i, &mdev->pp_in_use_by_net);
913                 atomic_sub(i, &mdev->pp_in_use);
914                 spin_lock_irq(&mdev->req_lock);
915                 list_add_tail(&e->w.list, &mdev->net_ee);
916                 spin_unlock_irq(&mdev->req_lock);
917                 wake_up(&drbd_pp_wait);
918         } else
919                 drbd_free_ee(mdev, e);
920 }
921
922 /**
923  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
924  * @mdev:       DRBD device.
925  * @w:          work object.
926  * @cancel:     The connection will be closed anyways
927  */
928 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
929 {
930         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
931         int ok;
932
933         if (unlikely(cancel)) {
934                 drbd_free_ee(mdev, e);
935                 dec_unacked(mdev);
936                 return 1;
937         }
938
939         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
940                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
941         } else {
942                 if (__ratelimit(&drbd_ratelimit_state))
943                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
944                             (unsigned long long)e->sector);
945
946                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
947         }
948
949         dec_unacked(mdev);
950
951         move_to_net_ee_or_free(mdev, e);
952
953         if (unlikely(!ok))
954                 dev_err(DEV, "drbd_send_block() failed\n");
955         return ok;
956 }
957
958 /**
959  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
960  * @mdev:       DRBD device.
961  * @w:          work object.
962  * @cancel:     The connection will be closed anyways
963  */
964 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
965 {
966         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
967         int ok;
968
969         if (unlikely(cancel)) {
970                 drbd_free_ee(mdev, e);
971                 dec_unacked(mdev);
972                 return 1;
973         }
974
975         if (get_ldev_if_state(mdev, D_FAILED)) {
976                 drbd_rs_complete_io(mdev, e->sector);
977                 put_ldev(mdev);
978         }
979
980         if (mdev->state.conn == C_AHEAD) {
981                 ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
982         } else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
983                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
984                         inc_rs_pending(mdev);
985                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
986                 } else {
987                         if (__ratelimit(&drbd_ratelimit_state))
988                                 dev_err(DEV, "Not sending RSDataReply, "
989                                     "partner DISKLESS!\n");
990                         ok = 1;
991                 }
992         } else {
993                 if (__ratelimit(&drbd_ratelimit_state))
994                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
995                             (unsigned long long)e->sector);
996
997                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
998
999                 /* update resync data with failure */
1000                 drbd_rs_failed_io(mdev, e->sector, e->size);
1001         }
1002
1003         dec_unacked(mdev);
1004
1005         move_to_net_ee_or_free(mdev, e);
1006
1007         if (unlikely(!ok))
1008                 dev_err(DEV, "drbd_send_block() failed\n");
1009         return ok;
1010 }
1011
1012 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1013 {
1014         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1015         struct digest_info *di;
1016         int digest_size;
1017         void *digest = NULL;
1018         int ok, eq = 0;
1019
1020         if (unlikely(cancel)) {
1021                 drbd_free_ee(mdev, e);
1022                 dec_unacked(mdev);
1023                 return 1;
1024         }
1025
1026         if (get_ldev(mdev)) {
1027                 drbd_rs_complete_io(mdev, e->sector);
1028                 put_ldev(mdev);
1029         }
1030
1031         di = e->digest;
1032
1033         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1034                 /* quick hack to try to avoid a race against reconfiguration.
1035                  * a real fix would be much more involved,
1036                  * introducing more locking mechanisms */
1037                 if (mdev->csums_tfm) {
1038                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1039                         D_ASSERT(digest_size == di->digest_size);
1040                         digest = kmalloc(digest_size, GFP_NOIO);
1041                 }
1042                 if (digest) {
1043                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1044                         eq = !memcmp(digest, di->digest, digest_size);
1045                         kfree(digest);
1046                 }
1047
1048                 if (eq) {
1049                         drbd_set_in_sync(mdev, e->sector, e->size);
1050                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1051                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1052                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1053                 } else {
1054                         inc_rs_pending(mdev);
1055                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1056                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1057                         kfree(di);
1058                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1059                 }
1060         } else {
1061                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1062                 if (__ratelimit(&drbd_ratelimit_state))
1063                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1064         }
1065
1066         dec_unacked(mdev);
1067         move_to_net_ee_or_free(mdev, e);
1068
1069         if (unlikely(!ok))
1070                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1071         return ok;
1072 }
1073
1074 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1075 {
1076         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1077         int digest_size;
1078         void *digest;
1079         int ok = 1;
1080
1081         if (unlikely(cancel))
1082                 goto out;
1083
1084         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1085         digest = kmalloc(digest_size, GFP_NOIO);
1086         if (!digest) {
1087                 ok = 0; /* terminate the connection in case the allocation failed */
1088                 goto out;
1089         }
1090
1091         if (likely(!(e->flags & EE_WAS_ERROR)))
1092                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1093         else
1094                 memset(digest, 0, digest_size);
1095
1096         inc_rs_pending(mdev);
1097         ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1098                                      digest, digest_size, P_OV_REPLY);
1099         if (!ok)
1100                 dec_rs_pending(mdev);
1101         kfree(digest);
1102
1103 out:
1104         drbd_free_ee(mdev, e);
1105         dec_unacked(mdev);
1106
1107         return ok;
1108 }
1109
1110 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1111 {
1112         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1113                 mdev->ov_last_oos_size += size>>9;
1114         } else {
1115                 mdev->ov_last_oos_start = sector;
1116                 mdev->ov_last_oos_size = size>>9;
1117         }
1118         drbd_set_out_of_sync(mdev, sector, size);
1119 }
1120
1121 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1122 {
1123         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1124         struct digest_info *di;
1125         int digest_size;
1126         void *digest;
1127         int ok, eq = 0;
1128
1129         if (unlikely(cancel)) {
1130                 drbd_free_ee(mdev, e);
1131                 dec_unacked(mdev);
1132                 return 1;
1133         }
1134
1135         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1136          * the resync lru has been cleaned up already */
1137         if (get_ldev(mdev)) {
1138                 drbd_rs_complete_io(mdev, e->sector);
1139                 put_ldev(mdev);
1140         }
1141
1142         di = e->digest;
1143
1144         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1145                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1146                 digest = kmalloc(digest_size, GFP_NOIO);
1147                 if (digest) {
1148                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1149
1150                         D_ASSERT(digest_size == di->digest_size);
1151                         eq = !memcmp(digest, di->digest, digest_size);
1152                         kfree(digest);
1153                 }
1154         }
1155
1156         dec_unacked(mdev);
1157         if (!eq)
1158                 drbd_ov_oos_found(mdev, e->sector, e->size);
1159         else
1160                 ov_oos_print(mdev);
1161
1162         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1163                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1164
1165         drbd_free_ee(mdev, e);
1166
1167         --mdev->ov_left;
1168
1169         /* let's advance progress step marks only for every other megabyte */
1170         if ((mdev->ov_left & 0x200) == 0x200)
1171                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1172
1173         if (mdev->ov_left == 0) {
1174                 ov_oos_print(mdev);
1175                 drbd_resync_finished(mdev);
1176         }
1177
1178         return ok;
1179 }
1180
1181 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1182 {
1183         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1184         complete(&b->done);
1185         return 1;
1186 }
1187
1188 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1189 {
1190         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1191         struct p_barrier *p = &mdev->data.sbuf.barrier;
1192         int ok = 1;
1193
1194         /* really avoid racing with tl_clear.  w.cb may have been referenced
1195          * just before it was reassigned and re-queued, so double check that.
1196          * actually, this race was harmless, since we only try to send the
1197          * barrier packet here, and otherwise do nothing with the object.
1198          * but compare with the head of w_clear_epoch */
1199         spin_lock_irq(&mdev->req_lock);
1200         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1201                 cancel = 1;
1202         spin_unlock_irq(&mdev->req_lock);
1203         if (cancel)
1204                 return 1;
1205
1206         if (!drbd_get_data_sock(mdev))
1207                 return 0;
1208         p->barrier = b->br_number;
1209         /* inc_ap_pending was done where this was queued.
1210          * dec_ap_pending will be done in got_BarrierAck
1211          * or (on connection loss) in w_clear_epoch.  */
1212         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1213                                 (struct p_header80 *)p, sizeof(*p), 0);
1214         drbd_put_data_sock(mdev);
1215
1216         return ok;
1217 }
1218
1219 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1220 {
1221         if (cancel)
1222                 return 1;
1223         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1224 }
1225
1226 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1227 {
1228         struct drbd_request *req = container_of(w, struct drbd_request, w);
1229         int ok;
1230
1231         if (unlikely(cancel)) {
1232                 req_mod(req, send_canceled);
1233                 return 1;
1234         }
1235
1236         ok = drbd_send_oos(mdev, req);
1237         req_mod(req, oos_handed_to_network);
1238
1239         return ok;
1240 }
1241
1242 /**
1243  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1244  * @mdev:       DRBD device.
1245  * @w:          work object.
1246  * @cancel:     The connection will be closed anyways
1247  */
1248 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1249 {
1250         struct drbd_request *req = container_of(w, struct drbd_request, w);
1251         int ok;
1252
1253         if (unlikely(cancel)) {
1254                 req_mod(req, send_canceled);
1255                 return 1;
1256         }
1257
1258         ok = drbd_send_dblock(mdev, req);
1259         req_mod(req, ok ? handed_over_to_network : send_failed);
1260
1261         return ok;
1262 }
1263
1264 /**
1265  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1266  * @mdev:       DRBD device.
1267  * @w:          work object.
1268  * @cancel:     The connection will be closed anyways
1269  */
1270 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1271 {
1272         struct drbd_request *req = container_of(w, struct drbd_request, w);
1273         int ok;
1274
1275         if (unlikely(cancel)) {
1276                 req_mod(req, send_canceled);
1277                 return 1;
1278         }
1279
1280         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1281                                 (unsigned long)req);
1282
1283         if (!ok) {
1284                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1285                  * so this is probably redundant */
1286                 if (mdev->state.conn >= C_CONNECTED)
1287                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1288         }
1289         req_mod(req, ok ? handed_over_to_network : send_failed);
1290
1291         return ok;
1292 }
1293
1294 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1295 {
1296         struct drbd_request *req = container_of(w, struct drbd_request, w);
1297
1298         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1299                 drbd_al_begin_io(mdev, req->sector);
1300         /* Calling drbd_al_begin_io() out of the worker might deadlocks
1301            theoretically. Practically it can not deadlock, since this is
1302            only used when unfreezing IOs. All the extents of the requests
1303            that made it into the TL are already active */
1304
1305         drbd_req_make_private_bio(req, req->master_bio);
1306         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1307         generic_make_request(req->private_bio);
1308
1309         return 1;
1310 }
1311
1312 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1313 {
1314         struct drbd_conf *odev = mdev;
1315
1316         while (1) {
1317                 if (odev->sync_conf.after == -1)
1318                         return 1;
1319                 odev = minor_to_mdev(odev->sync_conf.after);
1320                 ERR_IF(!odev) return 1;
1321                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1322                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1323                     odev->state.aftr_isp || odev->state.peer_isp ||
1324                     odev->state.user_isp)
1325                         return 0;
1326         }
1327 }
1328
1329 /**
1330  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1331  * @mdev:       DRBD device.
1332  *
1333  * Called from process context only (admin command and after_state_ch).
1334  */
1335 static int _drbd_pause_after(struct drbd_conf *mdev)
1336 {
1337         struct drbd_conf *odev;
1338         int i, rv = 0;
1339
1340         for (i = 0; i < minor_count; i++) {
1341                 odev = minor_to_mdev(i);
1342                 if (!odev)
1343                         continue;
1344                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1345                         continue;
1346                 if (!_drbd_may_sync_now(odev))
1347                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1348                                != SS_NOTHING_TO_DO);
1349         }
1350
1351         return rv;
1352 }
1353
1354 /**
1355  * _drbd_resume_next() - Resume resync on all devices that may resync now
1356  * @mdev:       DRBD device.
1357  *
1358  * Called from process context only (admin command and worker).
1359  */
1360 static int _drbd_resume_next(struct drbd_conf *mdev)
1361 {
1362         struct drbd_conf *odev;
1363         int i, rv = 0;
1364
1365         for (i = 0; i < minor_count; i++) {
1366                 odev = minor_to_mdev(i);
1367                 if (!odev)
1368                         continue;
1369                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1370                         continue;
1371                 if (odev->state.aftr_isp) {
1372                         if (_drbd_may_sync_now(odev))
1373                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1374                                                         CS_HARD, NULL)
1375                                        != SS_NOTHING_TO_DO) ;
1376                 }
1377         }
1378         return rv;
1379 }
1380
1381 void resume_next_sg(struct drbd_conf *mdev)
1382 {
1383         write_lock_irq(&global_state_lock);
1384         _drbd_resume_next(mdev);
1385         write_unlock_irq(&global_state_lock);
1386 }
1387
1388 void suspend_other_sg(struct drbd_conf *mdev)
1389 {
1390         write_lock_irq(&global_state_lock);
1391         _drbd_pause_after(mdev);
1392         write_unlock_irq(&global_state_lock);
1393 }
1394
1395 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1396 {
1397         struct drbd_conf *odev;
1398
1399         if (o_minor == -1)
1400                 return NO_ERROR;
1401         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1402                 return ERR_SYNC_AFTER;
1403
1404         /* check for loops */
1405         odev = minor_to_mdev(o_minor);
1406         while (1) {
1407                 if (odev == mdev)
1408                         return ERR_SYNC_AFTER_CYCLE;
1409
1410                 /* dependency chain ends here, no cycles. */
1411                 if (odev->sync_conf.after == -1)
1412                         return NO_ERROR;
1413
1414                 /* follow the dependency chain */
1415                 odev = minor_to_mdev(odev->sync_conf.after);
1416         }
1417 }
1418
1419 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1420 {
1421         int changes;
1422         int retcode;
1423
1424         write_lock_irq(&global_state_lock);
1425         retcode = sync_after_error(mdev, na);
1426         if (retcode == NO_ERROR) {
1427                 mdev->sync_conf.after = na;
1428                 do {
1429                         changes  = _drbd_pause_after(mdev);
1430                         changes |= _drbd_resume_next(mdev);
1431                 } while (changes);
1432         }
1433         write_unlock_irq(&global_state_lock);
1434         return retcode;
1435 }
1436
1437 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1438 {
1439         atomic_set(&mdev->rs_sect_in, 0);
1440         atomic_set(&mdev->rs_sect_ev, 0);
1441         mdev->rs_in_flight = 0;
1442         mdev->rs_planed = 0;
1443         spin_lock(&mdev->peer_seq_lock);
1444         fifo_set(&mdev->rs_plan_s, 0);
1445         spin_unlock(&mdev->peer_seq_lock);
1446 }
1447
1448 /**
1449  * drbd_start_resync() - Start the resync process
1450  * @mdev:       DRBD device.
1451  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1452  *
1453  * This function might bring you directly into one of the
1454  * C_PAUSED_SYNC_* states.
1455  */
1456 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1457 {
1458         union drbd_state ns;
1459         int r;
1460
1461         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1462                 dev_err(DEV, "Resync already running!\n");
1463                 return;
1464         }
1465
1466         if (mdev->state.conn < C_AHEAD) {
1467                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1468                 drbd_rs_cancel_all(mdev);
1469                 /* This should be done when we abort the resync. We definitely do not
1470                    want to have this for connections going back and forth between
1471                    Ahead/Behind and SyncSource/SyncTarget */
1472         }
1473
1474         if (side == C_SYNC_TARGET) {
1475                 /* Since application IO was locked out during C_WF_BITMAP_T and
1476                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1477                    we check that we might make the data inconsistent. */
1478                 r = drbd_khelper(mdev, "before-resync-target");
1479                 r = (r >> 8) & 0xff;
1480                 if (r > 0) {
1481                         dev_info(DEV, "before-resync-target handler returned %d, "
1482                              "dropping connection.\n", r);
1483                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1484                         return;
1485                 }
1486         } else /* C_SYNC_SOURCE */ {
1487                 r = drbd_khelper(mdev, "before-resync-source");
1488                 r = (r >> 8) & 0xff;
1489                 if (r > 0) {
1490                         if (r == 3) {
1491                                 dev_info(DEV, "before-resync-source handler returned %d, "
1492                                          "ignoring. Old userland tools?", r);
1493                         } else {
1494                                 dev_info(DEV, "before-resync-source handler returned %d, "
1495                                          "dropping connection.\n", r);
1496                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1497                                 return;
1498                         }
1499                 }
1500         }
1501
1502         drbd_state_lock(mdev);
1503
1504         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1505                 drbd_state_unlock(mdev);
1506                 return;
1507         }
1508
1509         write_lock_irq(&global_state_lock);
1510         ns = mdev->state;
1511
1512         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1513
1514         ns.conn = side;
1515
1516         if (side == C_SYNC_TARGET)
1517                 ns.disk = D_INCONSISTENT;
1518         else /* side == C_SYNC_SOURCE */
1519                 ns.pdsk = D_INCONSISTENT;
1520
1521         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1522         ns = mdev->state;
1523
1524         if (ns.conn < C_CONNECTED)
1525                 r = SS_UNKNOWN_ERROR;
1526
1527         if (r == SS_SUCCESS) {
1528                 unsigned long tw = drbd_bm_total_weight(mdev);
1529                 unsigned long now = jiffies;
1530                 int i;
1531
1532                 mdev->rs_failed    = 0;
1533                 mdev->rs_paused    = 0;
1534                 mdev->rs_same_csum = 0;
1535                 mdev->rs_last_events = 0;
1536                 mdev->rs_last_sect_ev = 0;
1537                 mdev->rs_total     = tw;
1538                 mdev->rs_start     = now;
1539                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1540                         mdev->rs_mark_left[i] = tw;
1541                         mdev->rs_mark_time[i] = now;
1542                 }
1543                 _drbd_pause_after(mdev);
1544         }
1545         write_unlock_irq(&global_state_lock);
1546
1547         if (r == SS_SUCCESS) {
1548                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1549                      drbd_conn_str(ns.conn),
1550                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1551                      (unsigned long) mdev->rs_total);
1552                 if (side == C_SYNC_TARGET)
1553                         mdev->bm_resync_fo = 0;
1554
1555                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1556                  * with w_send_oos, or the sync target will get confused as to
1557                  * how much bits to resync.  We cannot do that always, because for an
1558                  * empty resync and protocol < 95, we need to do it here, as we call
1559                  * drbd_resync_finished from here in that case.
1560                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1561                  * and from after_state_ch otherwise. */
1562                 if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1563                         drbd_gen_and_send_sync_uuid(mdev);
1564
1565                 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1566                         /* This still has a race (about when exactly the peers
1567                          * detect connection loss) that can lead to a full sync
1568                          * on next handshake. In 8.3.9 we fixed this with explicit
1569                          * resync-finished notifications, but the fix
1570                          * introduces a protocol change.  Sleeping for some
1571                          * time longer than the ping interval + timeout on the
1572                          * SyncSource, to give the SyncTarget the chance to
1573                          * detect connection loss, then waiting for a ping
1574                          * response (implicit in drbd_resync_finished) reduces
1575                          * the race considerably, but does not solve it. */
1576                         if (side == C_SYNC_SOURCE)
1577                                 schedule_timeout_interruptible(
1578                                         mdev->net_conf->ping_int * HZ +
1579                                         mdev->net_conf->ping_timeo*HZ/9);
1580                         drbd_resync_finished(mdev);
1581                 }
1582
1583                 drbd_rs_controller_reset(mdev);
1584                 /* ns.conn may already be != mdev->state.conn,
1585                  * we may have been paused in between, or become paused until
1586                  * the timer triggers.
1587                  * No matter, that is handled in resync_timer_fn() */
1588                 if (ns.conn == C_SYNC_TARGET)
1589                         mod_timer(&mdev->resync_timer, jiffies);
1590
1591                 drbd_md_sync(mdev);
1592         }
1593         put_ldev(mdev);
1594         drbd_state_unlock(mdev);
1595 }
1596
1597 int drbd_worker(struct drbd_thread *thi)
1598 {
1599         struct drbd_conf *mdev = thi->mdev;
1600         struct drbd_work *w = NULL;
1601         LIST_HEAD(work_list);
1602         int intr = 0, i;
1603
1604         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1605
1606         while (get_t_state(thi) == Running) {
1607                 drbd_thread_current_set_cpu(mdev);
1608
1609                 if (down_trylock(&mdev->data.work.s)) {
1610                         mutex_lock(&mdev->data.mutex);
1611                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1612                                 drbd_tcp_uncork(mdev->data.socket);
1613                         mutex_unlock(&mdev->data.mutex);
1614
1615                         intr = down_interruptible(&mdev->data.work.s);
1616
1617                         mutex_lock(&mdev->data.mutex);
1618                         if (mdev->data.socket  && !mdev->net_conf->no_cork)
1619                                 drbd_tcp_cork(mdev->data.socket);
1620                         mutex_unlock(&mdev->data.mutex);
1621                 }
1622
1623                 if (intr) {
1624                         D_ASSERT(intr == -EINTR);
1625                         flush_signals(current);
1626                         ERR_IF (get_t_state(thi) == Running)
1627                                 continue;
1628                         break;
1629                 }
1630
1631                 if (get_t_state(thi) != Running)
1632                         break;
1633                 /* With this break, we have done a down() but not consumed
1634                    the entry from the list. The cleanup code takes care of
1635                    this...   */
1636
1637                 w = NULL;
1638                 spin_lock_irq(&mdev->data.work.q_lock);
1639                 ERR_IF(list_empty(&mdev->data.work.q)) {
1640                         /* something terribly wrong in our logic.
1641                          * we were able to down() the semaphore,
1642                          * but the list is empty... doh.
1643                          *
1644                          * what is the best thing to do now?
1645                          * try again from scratch, restarting the receiver,
1646                          * asender, whatnot? could break even more ugly,
1647                          * e.g. when we are primary, but no good local data.
1648                          *
1649                          * I'll try to get away just starting over this loop.
1650                          */
1651                         spin_unlock_irq(&mdev->data.work.q_lock);
1652                         continue;
1653                 }
1654                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1655                 list_del_init(&w->list);
1656                 spin_unlock_irq(&mdev->data.work.q_lock);
1657
1658                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1659                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1660                         if (mdev->state.conn >= C_CONNECTED)
1661                                 drbd_force_state(mdev,
1662                                                 NS(conn, C_NETWORK_FAILURE));
1663                 }
1664         }
1665         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1666         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1667
1668         spin_lock_irq(&mdev->data.work.q_lock);
1669         i = 0;
1670         while (!list_empty(&mdev->data.work.q)) {
1671                 list_splice_init(&mdev->data.work.q, &work_list);
1672                 spin_unlock_irq(&mdev->data.work.q_lock);
1673
1674                 while (!list_empty(&work_list)) {
1675                         w = list_entry(work_list.next, struct drbd_work, list);
1676                         list_del_init(&w->list);
1677                         w->cb(mdev, w, 1);
1678                         i++; /* dead debugging code */
1679                 }
1680
1681                 spin_lock_irq(&mdev->data.work.q_lock);
1682         }
1683         sema_init(&mdev->data.work.s, 0);
1684         /* DANGEROUS race: if someone did queue his work within the spinlock,
1685          * but up() ed outside the spinlock, we could get an up() on the
1686          * semaphore without corresponding list entry.
1687          * So don't do that.
1688          */
1689         spin_unlock_irq(&mdev->data.work.q_lock);
1690
1691         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1692         /* _drbd_set_state only uses stop_nowait.
1693          * wait here for the Exiting receiver. */
1694         drbd_thread_stop(&mdev->receiver);
1695         drbd_mdev_cleanup(mdev);
1696
1697         dev_info(DEV, "worker terminated\n");
1698
1699         clear_bit(DEVICE_DYING, &mdev->flags);
1700         clear_bit(CONFIG_PENDING, &mdev->flags);
1701         wake_up(&mdev->state_wait);
1702
1703         return 0;
1704 }