1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_conf *mdev,
43                                  struct drbd_work *w, int cancel);
44
45
46
47 /* endio handlers:
48  *   drbd_md_io_complete (defined here)
49  *   drbd_endio_pri (defined here)
50  *   drbd_endio_sec (defined here)
51  *   bm_async_io_complete (defined in drbd_bitmap.c)
52  *
53  * For all these callbacks, note the following:
54  * The callbacks will be called in irq context by the IDE drivers,
55  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
56  * Try to get the locking right :)
57  *
58  */
59
60
61 /* About the global_state_lock
62    Each state transition on a device holds a read lock. In case we have
63    to evaluate the sync after dependencies, we grab a write lock, because
64    we need stable states on all devices for that.  */
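/* In this file the write side is taken by resume_next_sg(), suspend_other_sg(),
 * drbd_alter_sa() and drbd_start_resync() below, which (re)evaluate the
 * sync-after dependencies across all minors. */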
65 rwlock_t global_state_lock;
66
67 /* used for synchronous meta data and bitmap IO
68  * submitted by drbd_md_sync_page_io()
69  */
70 void drbd_md_io_complete(struct bio *bio, int error)
71 {
72         struct drbd_md_io *md_io;
73
74         md_io = (struct drbd_md_io *)bio->bi_private;
75         md_io->error = error;
76
77         complete(&md_io->event);
78 }
79
80 /* reads on behalf of the partner,
81  * "submitted" by the receiver
82  */
83 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
84 {
85         unsigned long flags = 0;
86         struct drbd_conf *mdev = e->mdev;
87
88         D_ASSERT(e->block_id != ID_VACANT);
89
90         spin_lock_irqsave(&mdev->req_lock, flags);
91         mdev->read_cnt += e->size >> 9;
92         list_del(&e->w.list);
93         if (list_empty(&mdev->read_ee))
94                 wake_up(&mdev->ee_wait);
95         if (test_bit(__EE_WAS_ERROR, &e->flags))
96                 __drbd_chk_io_error(mdev, false);
97         spin_unlock_irqrestore(&mdev->req_lock, flags);
98
99         drbd_queue_work(&mdev->data.work, &e->w);
100         put_ldev(mdev);
101 }
102
103 /* writes on behalf of the partner, or resync writes,
104  * "submitted" by the receiver, final stage.  */
105 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
106 {
107         unsigned long flags = 0;
108         struct drbd_conf *mdev = e->mdev;
109         sector_t e_sector;
110         int do_wake;
111         int is_syncer_req;
112         int do_al_complete_io;
113
114         D_ASSERT(e->block_id != ID_VACANT);
115
116         /* after we moved e to done_ee,
117          * we may no longer access it,
118          * it may be freed/reused already!
119          * (as soon as we release the req_lock) */
120         e_sector = e->sector;
121         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
122         is_syncer_req = is_syncer_block_id(e->block_id);
123
124         spin_lock_irqsave(&mdev->req_lock, flags);
125         mdev->writ_cnt += e->size >> 9;
126         list_del(&e->w.list); /* has been on active_ee or sync_ee */
127         list_add_tail(&e->w.list, &mdev->done_ee);
128
129         /* No hlist_del_init(&e->collision) here, we did not send the Ack yet,
130          * neither did we wake possibly waiting conflicting requests.
131          * done from "drbd_process_done_ee" within the appropriate w.cb
132          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
133
134         do_wake = is_syncer_req
135                 ? list_empty(&mdev->sync_ee)
136                 : list_empty(&mdev->active_ee);
137
138         if (test_bit(__EE_WAS_ERROR, &e->flags))
139                 __drbd_chk_io_error(mdev, false);
140         spin_unlock_irqrestore(&mdev->req_lock, flags);
141
142         if (is_syncer_req)
143                 drbd_rs_complete_io(mdev, e_sector);
144
145         if (do_wake)
146                 wake_up(&mdev->ee_wait);
147
148         if (do_al_complete_io)
149                 drbd_al_complete_io(mdev, e_sector);
150
151         wake_asender(mdev);
152         put_ldev(mdev);
153 }
154
155 /* writes on behalf of the partner, or resync writes,
156  * "submitted" by the receiver.
157  */
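/* Completion handler for the bios of an epoch entry.  Runs in bio completion
 * context (irq/softirq, see the note at the top of this file).  An entry may
 * consist of several bios; only when the last one completes (pending_bios
 * drops to zero) is the entry handed to the read/write finalizers above. */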
158 void drbd_endio_sec(struct bio *bio, int error)
159 {
160         struct drbd_epoch_entry *e = bio->bi_private;
161         struct drbd_conf *mdev = e->mdev;
162         int uptodate = bio_flagged(bio, BIO_UPTODATE);
163         int is_write = bio_data_dir(bio) == WRITE;
164
165         if (error && __ratelimit(&drbd_ratelimit_state))
166                 dev_warn(DEV, "%s: error=%d s=%llus\n",
167                                 is_write ? "write" : "read", error,
168                                 (unsigned long long)e->sector);
169         if (!error && !uptodate) {
170                 if (__ratelimit(&drbd_ratelimit_state))
171                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
172                                         is_write ? "write" : "read",
173                                         (unsigned long long)e->sector);
174                 /* strange behavior of some lower level drivers...
175                  * fail the request by clearing the uptodate flag,
176                  * but do not return any error?! */
177                 error = -EIO;
178         }
179
180         if (error)
181                 set_bit(__EE_WAS_ERROR, &e->flags);
182
183         bio_put(bio); /* no need for the bio anymore */
184         if (atomic_dec_and_test(&e->pending_bios)) {
185                 if (is_write)
186                         drbd_endio_write_sec_final(e);
187                 else
188                         drbd_endio_read_sec_final(e);
189         }
190 }
191
192 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
193  */
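/* Translates the bio completion into a request event for __req_mod(); if that
 * makes the master bio ready for completion, it is reported back via
 * struct bio_and_error and completed outside of req_lock. */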
194 void drbd_endio_pri(struct bio *bio, int error)
195 {
196         unsigned long flags;
197         struct drbd_request *req = bio->bi_private;
198         struct drbd_conf *mdev = req->mdev;
199         struct bio_and_error m;
200         enum drbd_req_event what;
201         int uptodate = bio_flagged(bio, BIO_UPTODATE);
202
203         if (!error && !uptodate) {
204                 dev_warn(DEV, "p %s: setting error to -EIO\n",
205                          bio_data_dir(bio) == WRITE ? "write" : "read");
206                 /* strange behavior of some lower level drivers...
207                  * fail the request by clearing the uptodate flag,
208                  * but do not return any error?! */
209                 error = -EIO;
210         }
211
212         /* to avoid recursion in __req_mod */
213         if (unlikely(error)) {
214                 what = (bio_data_dir(bio) == WRITE)
215                         ? write_completed_with_error
216                         : (bio_rw(bio) == READ)
217                           ? read_completed_with_error
218                           : read_ahead_completed_with_error;
219         } else
220                 what = completed_ok;
221
222         bio_put(req->private_bio);
223         req->private_bio = ERR_PTR(error);
224
225         /* not req_mod(), we need irqsave here! */
226         spin_lock_irqsave(&mdev->req_lock, flags);
227         __req_mod(req, what, &m);
228         spin_unlock_irqrestore(&mdev->req_lock, flags);
229
230         if (m.bio)
231                 complete_master_bio(mdev, &m);
232 }
233
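/* Retry a failed local read by requesting the block from the peer, unless the
 * work was canceled or the peer's disk is no longer D_UP_TO_DATE. */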
234 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
235 {
236         struct drbd_request *req = container_of(w, struct drbd_request, w);
237
238         /* We should not detach for read io-error,
239          * but try to WRITE the P_DATA_REPLY to the failed location,
240          * to give the disk the chance to relocate that block */
241
242         spin_lock_irq(&mdev->req_lock);
243         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
244                 _req_mod(req, read_retry_remote_canceled);
245                 spin_unlock_irq(&mdev->req_lock);
246                 return 1;
247         }
248         spin_unlock_irq(&mdev->req_lock);
249
250         return w_send_read_req(mdev, w, 0);
251 }
252
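/* Hash the payload of an epoch entry.  All but the last page of the page
 * chain are fully used; the last one may be partial.  For example (with 4k
 * pages, hypothetical sizes): e->size == 6144 leaves len == 2048 for the
 * final page, while an exact multiple of PAGE_SIZE gives len == 0, so the
 * "len ?: PAGE_SIZE" below falls back to hashing the whole page. */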
253 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
254 {
255         struct hash_desc desc;
256         struct scatterlist sg;
257         struct page *page = e->pages;
258         struct page *tmp;
259         unsigned len;
260
261         desc.tfm = tfm;
262         desc.flags = 0;
263
264         sg_init_table(&sg, 1);
265         crypto_hash_init(&desc);
266
267         while ((tmp = page_chain_next(page))) {
268                 /* all but the last page will be fully used */
269                 sg_set_page(&sg, page, PAGE_SIZE, 0);
270                 crypto_hash_update(&desc, &sg, sg.length);
271                 page = tmp;
272         }
273         /* and now the last, possibly only partially used page */
274         len = e->size & (PAGE_SIZE - 1);
275         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
276         crypto_hash_update(&desc, &sg, sg.length);
277         crypto_hash_final(&desc, digest);
278 }
279
280 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
281 {
282         struct hash_desc desc;
283         struct scatterlist sg;
284         struct bio_vec *bvec;
285         int i;
286
287         desc.tfm = tfm;
288         desc.flags = 0;
289
290         sg_init_table(&sg, 1);
291         crypto_hash_init(&desc);
292
293         __bio_for_each_segment(bvec, bio, i, 0) {
294                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
295                 crypto_hash_update(&desc, &sg, sg.length);
296         }
297         crypto_hash_final(&desc, digest);
298 }
299
300 /* TODO merge common code with w_e_end_ov_req */
301 int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
302 {
303         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
304         int digest_size;
305         void *digest;
306         int ok = 1;
307
308         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
309
310         if (unlikely(cancel))
311                 goto out;
312
313         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
314                 goto out;
315
316         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
317         digest = kmalloc(digest_size, GFP_NOIO);
318         if (digest) {
319                 sector_t sector = e->sector;
320                 unsigned int size = e->size;
321                 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
322                 /* Free e and pages before send.
323                  * In case we block on congestion, we could otherwise run into
324                  * some distributed deadlock, if the other side blocks on
325                  * congestion as well, because our receiver blocks in
326                  * drbd_pp_alloc due to pp_in_use > max_buffers. */
327                 drbd_free_ee(mdev, e);
328                 e = NULL;
329                 inc_rs_pending(mdev);
330                 ok = drbd_send_drequest_csum(mdev, sector, size,
331                                              digest, digest_size,
332                                              P_CSUM_RS_REQUEST);
333                 kfree(digest);
334         } else {
335                 dev_err(DEV, "kmalloc() of digest failed.\n");
336                 ok = 0;
337         }
338
339 out:
340         if (e)
341                 drbd_free_ee(mdev, e);
342
343         if (unlikely(!ok))
344                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
345         return ok;
346 }
347
348 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
349
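/* For checksum based resync: read our version of the block into a freshly
 * allocated epoch entry and queue w_e_send_csum as its callback.
 * Returns 0 on success, -EAGAIN if the caller should retry later (memory
 * pressure or local disk congestion), -EIO if we have no local disk. */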
350 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
351 {
352         struct drbd_epoch_entry *e;
353
354         if (!get_ldev(mdev))
355                 return -EIO;
356
357         if (drbd_rs_should_slow_down(mdev, sector))
358                 goto defer;
359
360         /* GFP_TRY, because if there is no memory available right now, this may
361          * be rescheduled for later. It is "only" background resync, after all. */
362         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
363         if (!e)
364                 goto defer;
365
366         e->w.cb = w_e_send_csum;
367         spin_lock_irq(&mdev->req_lock);
368         list_add(&e->w.list, &mdev->read_ee);
369         spin_unlock_irq(&mdev->req_lock);
370
371         atomic_add(size >> 9, &mdev->rs_sect_ev);
372         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
373                 return 0;
374
375         /* If it failed because of ENOMEM, retry should help.  If it failed
376          * because bio_add_page failed (probably broken lower level driver),
377          * retry may or may not help.
378          * If it does not, you may need to force disconnect. */
379         spin_lock_irq(&mdev->req_lock);
380         list_del(&e->w.list);
381         spin_unlock_irq(&mdev->req_lock);
382
383         drbd_free_ee(mdev, e);
384 defer:
385         put_ldev(mdev);
386         return -EAGAIN;
387 }
388
389 int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
390 {
391         switch (mdev->state.conn) {
392         case C_VERIFY_S:
393                 w_make_ov_request(mdev, w, cancel);
394                 break;
395         case C_SYNC_TARGET:
396                 w_make_resync_request(mdev, w, cancel);
397                 break;
398         }
399
400         return 1;
401 }
402
403 void resync_timer_fn(unsigned long data)
404 {
405         struct drbd_conf *mdev = (struct drbd_conf *) data;
406
407         if (list_empty(&mdev->resync_work.list))
408                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
409 }
410
411 static void fifo_set(struct fifo_buffer *fb, int value)
412 {
413         int i;
414
415         for (i = 0; i < fb->size; i++)
416                 fb->values[i] = value;
417 }
418
419 static int fifo_push(struct fifo_buffer *fb, int value)
420 {
421         int ov;
422
423         ov = fb->values[fb->head_index];
424         fb->values[fb->head_index++] = value;
425
426         if (fb->head_index >= fb->size)
427                 fb->head_index = 0;
428
429         return ov;
430 }
431
432 static void fifo_add_val(struct fifo_buffer *fb, int value)
433 {
434         int i;
435
436         for (i = 0; i < fb->size; i++)
437                 fb->values[i] += value;
438 }
439
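/* Dynamic resync rate controller: decide how many sectors to request during
 * the next SLEEP_TIME interval so that roughly "want" sectors stay in flight.
 * Roughly: correction = want - rs_in_flight - rs_planed; that correction is
 * spread evenly over "steps" future intervals via the rs_plan_s fifo, and the
 * share falling into this interval (curr_corr) is added to what just came
 * back (sect_in).  Illustrative (hypothetical) numbers: want = 8000,
 * rs_in_flight = 5000, rs_planed = 1000 gives correction = 2000; with
 * steps = 10 that adds cps = 200 to each planned interval.  The result is
 * clamped to c_max_rate. */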
440 static int drbd_rs_controller(struct drbd_conf *mdev)
441 {
442         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
443         unsigned int want;     /* The number of sectors we want in the proxy */
444         int req_sect; /* Number of sectors to request in this turn */
445         int correction; /* Number of sectors more we need in the proxy*/
446         int cps; /* correction per invocation of drbd_rs_controller() */
447         int steps; /* Number of time steps to plan ahead */
448         int curr_corr;
449         int max_sect;
450
451         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
452         mdev->rs_in_flight -= sect_in;
453
454         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
455
456         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
457
458         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
459                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
460         } else { /* normal path */
461                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
462                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
463         }
464
465         correction = want - mdev->rs_in_flight - mdev->rs_planed;
466
467         /* Plan ahead */
468         cps = correction / steps;
469         fifo_add_val(&mdev->rs_plan_s, cps);
470         mdev->rs_planed += cps * steps;
471
472         /* What we do in this step */
473         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
474         spin_unlock(&mdev->peer_seq_lock);
475         mdev->rs_planed -= curr_corr;
476
477         req_sect = sect_in + curr_corr;
478         if (req_sect < 0)
479                 req_sect = 0;
480
481         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
482         if (req_sect > max_sect)
483                 req_sect = max_sect;
484
485         /*
486         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
487                  sect_in, mdev->rs_in_flight, want, correction,
488                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
489         */
490
491         return req_sect;
492 }
493
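/* Convert the controller output (sectors) into a number of BM_BLOCK_SIZE
 * sized requests for this SLEEP_TIME interval, keeping c_sync_rate (KiB/s)
 * up to date.  Without a plan-ahead buffer the configured fixed
 * sync_conf.rate is used instead. */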
494 static int drbd_rs_number_requests(struct drbd_conf *mdev)
495 {
496         int number;
497         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
498                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
499                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
500         } else {
501                 mdev->c_sync_rate = mdev->sync_conf.rate;
502                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
503         }
504
505         /* ignore the amount of pending requests, the resync controller should
506          * throttle down to incoming reply rate soon enough anyways. */
507         return number;
508 }
509
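/* One round of resync request generation on the SyncTarget: pick up to
 * "number" dirty bits from the bitmap, merge adjacent ones into larger,
 * aligned requests (up to max_bio_size), then either read the local block
 * for checksum comparison (protocol >= 89 with csums_tfm) or ask the peer
 * for the data directly via P_RS_DATA_REQUEST.  Backs off while the send
 * buffer is more than half full, or when drbd_rs_should_slow_down() or
 * drbd_try_rs_begin_io() ask us to wait. */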
510 static int w_make_resync_request(struct drbd_conf *mdev,
511                                  struct drbd_work *w, int cancel)
512 {
513         unsigned long bit;
514         sector_t sector;
515         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
516         int max_bio_size;
517         int number, rollback_i, size;
518         int align, queued, sndbuf;
519         int i = 0;
520
521         if (unlikely(cancel))
522                 return 1;
523
524         if (mdev->rs_total == 0) {
525                 /* empty resync? */
526                 drbd_resync_finished(mdev);
527                 return 1;
528         }
529
530         if (!get_ldev(mdev)) {
531                 /* Since we only need to access mdev->resync, a
532                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
533                    to continue resync with a broken disk makes no sense at
534                    all */
535                 dev_err(DEV, "Disk broke down during resync!\n");
536                 return 1;
537         }
538
539         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
540          * if it should be necessary */
541         max_bio_size =
542                 mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
543                 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
544
545         number = drbd_rs_number_requests(mdev);
546         if (number == 0)
547                 goto requeue;
548
549         for (i = 0; i < number; i++) {
550                 /* Stop generating RS requests, when half of the send buffer is filled */
551                 mutex_lock(&mdev->data.mutex);
552                 if (mdev->data.socket) {
553                         queued = mdev->data.socket->sk->sk_wmem_queued;
554                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
555                 } else {
556                         queued = 1;
557                         sndbuf = 0;
558                 }
559                 mutex_unlock(&mdev->data.mutex);
560                 if (queued > sndbuf / 2)
561                         goto requeue;
562
563 next_sector:
564                 size = BM_BLOCK_SIZE;
565                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
566
567                 if (bit == DRBD_END_OF_BITMAP) {
568                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
569                         put_ldev(mdev);
570                         return 1;
571                 }
572
573                 sector = BM_BIT_TO_SECT(bit);
574
575                 if (drbd_rs_should_slow_down(mdev, sector) ||
576                     drbd_try_rs_begin_io(mdev, sector)) {
577                         mdev->bm_resync_fo = bit;
578                         goto requeue;
579                 }
580                 mdev->bm_resync_fo = bit + 1;
581
582                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
583                         drbd_rs_complete_io(mdev, sector);
584                         goto next_sector;
585                 }
586
587 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
588                 /* try to find some adjacent bits.
589                  * we stop if we have already the maximum req size.
590                  *
591                  * Additionally always align bigger requests, in order to
592                  * be prepared for all stripe sizes of software RAIDs.
593                  */
594                 align = 1;
595                 rollback_i = i;
596                 for (;;) {
597                         if (size + BM_BLOCK_SIZE > max_bio_size)
598                                 break;
599
600                         /* Be always aligned */
601                         if (sector & ((1<<(align+3))-1))
602                                 break;
603
604                         /* do not cross extent boundaries */
605                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
606                                 break;
607                         /* now, is it actually dirty, after all?
608                          * caution, drbd_bm_test_bit is tri-state for some
609                          * obscure reason; ( b == 0 ) would get the out-of-band
610                          * only accidentally right because of the "oddly sized"
611                          * adjustment below */
612                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
613                                 break;
614                         bit++;
615                         size += BM_BLOCK_SIZE;
616                         if ((BM_BLOCK_SIZE << align) <= size)
617                                 align++;
618                         i++;
619                 }
620                 /* if we merged some,
621                  * reset the offset to start the next drbd_bm_find_next from */
622                 if (size > BM_BLOCK_SIZE)
623                         mdev->bm_resync_fo = bit + 1;
624 #endif
625
626                 /* adjust very last sectors, in case we are oddly sized */
627                 if (sector + (size>>9) > capacity)
628                         size = (capacity-sector)<<9;
629                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
630                         switch (read_for_csum(mdev, sector, size)) {
631                         case -EIO: /* Disk failure */
632                                 put_ldev(mdev);
633                                 return 0;
634                         case -EAGAIN: /* allocation failed, or ldev busy */
635                                 drbd_rs_complete_io(mdev, sector);
636                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
637                                 i = rollback_i;
638                                 goto requeue;
639                         case 0:
640                                 /* everything ok */
641                                 break;
642                         default:
643                                 BUG();
644                         }
645                 } else {
646                         inc_rs_pending(mdev);
647                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
648                                                sector, size, ID_SYNCER)) {
649                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
650                                 dec_rs_pending(mdev);
651                                 put_ldev(mdev);
652                                 return 0;
653                         }
654                 }
655         }
656
657         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
658                 /* last syncer _request_ was sent,
659                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
660                  * next sync group will resume), as soon as we receive the last
661                  * resync data block, and the last bit is cleared.
662                  * until then resync "work" is "inactive" ...
663                  */
664                 put_ldev(mdev);
665                 return 1;
666         }
667
668  requeue:
669         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
670         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
671         put_ldev(mdev);
672         return 1;
673 }
674
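/* Online verify counterpart of w_make_resync_request: issue up to "number"
 * verify requests (drbd_send_ov_request()), advancing ov_position linearly
 * over the device instead of walking the bitmap. */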
675 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
676 {
677         int number, i, size;
678         sector_t sector;
679         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
680
681         if (unlikely(cancel))
682                 return 1;
683
684         number = drbd_rs_number_requests(mdev);
685
686         sector = mdev->ov_position;
687         for (i = 0; i < number; i++) {
688                 if (sector >= capacity) {
689                         return 1;
690                 }
691
692                 size = BM_BLOCK_SIZE;
693
694                 if (drbd_rs_should_slow_down(mdev, sector) ||
695                     drbd_try_rs_begin_io(mdev, sector)) {
696                         mdev->ov_position = sector;
697                         goto requeue;
698                 }
699
700                 if (sector + (size>>9) > capacity)
701                         size = (capacity-sector)<<9;
702
703                 inc_rs_pending(mdev);
704                 if (!drbd_send_ov_request(mdev, sector, size)) {
705                         dec_rs_pending(mdev);
706                         return 0;
707                 }
708                 sector += BM_SECT_PER_BIT;
709         }
710         mdev->ov_position = sector;
711
712  requeue:
713         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
714         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
715         return 1;
716 }
717
718
719 void start_resync_timer_fn(unsigned long data)
720 {
721         struct drbd_conf *mdev = (struct drbd_conf *) data;
722
723         drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
724 }
725
726 int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
727 {
728         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
729                 dev_warn(DEV, "w_start_resync later...\n");
730                 mdev->start_resync_timer.expires = jiffies + HZ/10;
731                 add_timer(&mdev->start_resync_timer);
732                 return 1;
733         }
734
735         drbd_start_resync(mdev, C_SYNC_SOURCE);
736         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
737         return 1;
738 }
739
740 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
741 {
742         kfree(w);
743         ov_oos_print(mdev);
744         drbd_resync_finished(mdev);
745
746         return 1;
747 }
748
749 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
750 {
751         kfree(w);
752
753         drbd_resync_finished(mdev);
754
755         return 1;
756 }
757
758 static void ping_peer(struct drbd_conf *mdev)
759 {
760         clear_bit(GOT_PING_ACK, &mdev->flags);
761         request_ping(mdev);
762         wait_event(mdev->misc_wait,
763                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
764 }
765
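/* Called when a resync or online verify run ends: report statistics, move
 * disk/pdsk to their new states and, after a completed resync, bring the
 * local and peer UUID sets into agreement; finally fire the matching
 * "after-resync-target" or "out-of-sync" helper.  If the resync LRU cannot
 * be emptied yet, re-queues itself via w_resync_finished. */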
766 int drbd_resync_finished(struct drbd_conf *mdev)
767 {
768         unsigned long db, dt, dbdt;
769         unsigned long n_oos;
770         union drbd_state os, ns;
771         struct drbd_work *w;
772         char *khelper_cmd = NULL;
773         int verify_done = 0;
774
775         /* Remove all elements from the resync LRU. Since future actions
776          * might set bits in the (main) bitmap, then the entries in the
777          * resync LRU would be wrong. */
778         if (drbd_rs_del_all(mdev)) {
779                 /* In case this is not possible now, most probably because
780                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
781                  * queue (or even the read operations for those packets
782                  * are not finished by now).   Retry in 100ms. */
783
784                 schedule_timeout_interruptible(HZ / 10);
785                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
786                 if (w) {
787                         w->cb = w_resync_finished;
788                         drbd_queue_work(&mdev->data.work, w);
789                         return 1;
790                 }
791                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
792         }
793
794         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
795         if (dt <= 0)
796                 dt = 1;
797         db = mdev->rs_total;
798         dbdt = Bit2KB(db/dt);
799         mdev->rs_paused /= HZ;
800
801         if (!get_ldev(mdev))
802                 goto out;
803
804         ping_peer(mdev);
805
806         spin_lock_irq(&mdev->req_lock);
807         os = mdev->state;
808
809         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
810
811         /* This protects us against multiple calls (that can happen in the presence
812            of application IO), and against connectivity loss just before we arrive here. */
813         if (os.conn <= C_CONNECTED)
814                 goto out_unlock;
815
816         ns = os;
817         ns.conn = C_CONNECTED;
818
819         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
820              verify_done ? "Online verify " : "Resync",
821              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
822
823         n_oos = drbd_bm_total_weight(mdev);
824
825         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
826                 if (n_oos) {
827                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
828                               n_oos, Bit2KB(1));
829                         khelper_cmd = "out-of-sync";
830                 }
831         } else {
832                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
833
834                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
835                         khelper_cmd = "after-resync-target";
836
837                 if (mdev->csums_tfm && mdev->rs_total) {
838                         const unsigned long s = mdev->rs_same_csum;
839                         const unsigned long t = mdev->rs_total;
840                         const int ratio =
841                                 (t == 0)     ? 0 :
842                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
843                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
844                              "transferred %luK total %luK\n",
845                              ratio,
846                              Bit2KB(mdev->rs_same_csum),
847                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
848                              Bit2KB(mdev->rs_total));
849                 }
850         }
851
852         if (mdev->rs_failed) {
853                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
854
855                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
856                         ns.disk = D_INCONSISTENT;
857                         ns.pdsk = D_UP_TO_DATE;
858                 } else {
859                         ns.disk = D_UP_TO_DATE;
860                         ns.pdsk = D_INCONSISTENT;
861                 }
862         } else {
863                 ns.disk = D_UP_TO_DATE;
864                 ns.pdsk = D_UP_TO_DATE;
865
866                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
867                         if (mdev->p_uuid) {
868                                 int i;
869                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
870                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
871                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
872                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
873                         } else {
874                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
875                         }
876                 }
877
878                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
879                         /* for verify runs, we don't update uuids here,
880                          * so there would be nothing to report. */
881                         drbd_uuid_set_bm(mdev, 0UL);
882                         drbd_print_uuids(mdev, "updated UUIDs");
883                         if (mdev->p_uuid) {
884                                 /* Now the two UUID sets are equal, update what we
885                                  * know of the peer. */
886                                 int i;
887                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
888                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
889                         }
890                 }
891         }
892
893         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
894 out_unlock:
895         spin_unlock_irq(&mdev->req_lock);
896         put_ldev(mdev);
897 out:
898         mdev->rs_total  = 0;
899         mdev->rs_failed = 0;
900         mdev->rs_paused = 0;
901         if (verify_done)
902                 mdev->ov_start_sector = 0;
903
904         drbd_md_sync(mdev);
905
906         if (khelper_cmd)
907                 drbd_khelper(mdev, khelper_cmd);
908
909         return 1;
910 }
911
912 /* helper */
913 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
914 {
915         if (drbd_ee_has_active_page(e)) {
916                 /* This might happen if sendpage() has not finished */
917                 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
918                 atomic_add(i, &mdev->pp_in_use_by_net);
919                 atomic_sub(i, &mdev->pp_in_use);
920                 spin_lock_irq(&mdev->req_lock);
921                 list_add_tail(&e->w.list, &mdev->net_ee);
922                 spin_unlock_irq(&mdev->req_lock);
923                 wake_up(&drbd_pp_wait);
924         } else
925                 drbd_free_ee(mdev, e);
926 }
927
928 /**
929  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
930  * @mdev:       DRBD device.
931  * @w:          work object.
932  * @cancel:     The connection will be closed anyways
933  */
934 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
935 {
936         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
937         int ok;
938
939         if (unlikely(cancel)) {
940                 drbd_free_ee(mdev, e);
941                 dec_unacked(mdev);
942                 return 1;
943         }
944
945         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
946                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
947         } else {
948                 if (__ratelimit(&drbd_ratelimit_state))
949                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
950                             (unsigned long long)e->sector);
951
952                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
953         }
954
955         dec_unacked(mdev);
956
957         move_to_net_ee_or_free(mdev, e);
958
959         if (unlikely(!ok))
960                 dev_err(DEV, "drbd_send_block() failed\n");
961         return ok;
962 }
963
964 /**
965  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
966  * @mdev:       DRBD device.
967  * @w:          work object.
968  * @cancel:     The connection will be closed anyways
969  */
970 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
971 {
972         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
973         int ok;
974
975         if (unlikely(cancel)) {
976                 drbd_free_ee(mdev, e);
977                 dec_unacked(mdev);
978                 return 1;
979         }
980
981         if (get_ldev_if_state(mdev, D_FAILED)) {
982                 drbd_rs_complete_io(mdev, e->sector);
983                 put_ldev(mdev);
984         }
985
986         if (mdev->state.conn == C_AHEAD) {
987                 ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
988         } else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
989                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
990                         inc_rs_pending(mdev);
991                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
992                 } else {
993                         if (__ratelimit(&drbd_ratelimit_state))
994                                 dev_err(DEV, "Not sending RSDataReply, "
995                                     "partner DISKLESS!\n");
996                         ok = 1;
997                 }
998         } else {
999                 if (__ratelimit(&drbd_ratelimit_state))
1000                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1001                             (unsigned long long)e->sector);
1002
1003                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1004
1005                 /* update resync data with failure */
1006                 drbd_rs_failed_io(mdev, e->sector, e->size);
1007         }
1008
1009         dec_unacked(mdev);
1010
1011         move_to_net_ee_or_free(mdev, e);
1012
1013         if (unlikely(!ok))
1014                 dev_err(DEV, "drbd_send_block() failed\n");
1015         return ok;
1016 }
1017
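/* Checksum based resync, SyncSource side: compare the digest the peer sent
 * in its P_CSUM_RS_REQUEST with a digest of the local data.  If they match,
 * the block is acknowledged as in sync (P_RS_IS_IN_SYNC) and no payload is
 * transferred; otherwise the full block goes out as P_RS_DATA_REPLY. */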
1018 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1019 {
1020         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1021         struct digest_info *di;
1022         int digest_size;
1023         void *digest = NULL;
1024         int ok, eq = 0;
1025
1026         if (unlikely(cancel)) {
1027                 drbd_free_ee(mdev, e);
1028                 dec_unacked(mdev);
1029                 return 1;
1030         }
1031
1032         if (get_ldev(mdev)) {
1033                 drbd_rs_complete_io(mdev, e->sector);
1034                 put_ldev(mdev);
1035         }
1036
1037         di = e->digest;
1038
1039         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1040                 /* quick hack to try to avoid a race against reconfiguration.
1041                  * a real fix would be much more involved,
1042                  * introducing more locking mechanisms */
1043                 if (mdev->csums_tfm) {
1044                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1045                         D_ASSERT(digest_size == di->digest_size);
1046                         digest = kmalloc(digest_size, GFP_NOIO);
1047                 }
1048                 if (digest) {
1049                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1050                         eq = !memcmp(digest, di->digest, digest_size);
1051                         kfree(digest);
1052                 }
1053
1054                 if (eq) {
1055                         drbd_set_in_sync(mdev, e->sector, e->size);
1056                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1057                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1058                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1059                 } else {
1060                         inc_rs_pending(mdev);
1061                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1062                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1063                         kfree(di);
1064                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1065                 }
1066         } else {
1067                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1068                 if (__ratelimit(&drbd_ratelimit_state))
1069                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1070         }
1071
1072         dec_unacked(mdev);
1073         move_to_net_ee_or_free(mdev, e);
1074
1075         if (unlikely(!ok))
1076                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1077         return ok;
1078 }
1079
1080 /* TODO merge common code with w_e_send_csum */
1081 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1082 {
1083         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1084         sector_t sector = e->sector;
1085         unsigned int size = e->size;
1086         int digest_size;
1087         void *digest;
1088         int ok = 1;
1089
1090         if (unlikely(cancel))
1091                 goto out;
1092
1093         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1094         digest = kmalloc(digest_size, GFP_NOIO);
1095         if (!digest) {
1096                 ok = 0; /* terminate the connection in case the allocation failed */
1097                 goto out;
1098         }
1099
1100         if (likely(!(e->flags & EE_WAS_ERROR)))
1101                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1102         else
1103                 memset(digest, 0, digest_size);
1104
1105         /* Free e and pages before send.
1106          * In case we block on congestion, we could otherwise run into
1107          * some distributed deadlock, if the other side blocks on
1108          * congestion as well, because our receiver blocks in
1109          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1110         drbd_free_ee(mdev, e);
1111         e = NULL;
1112         inc_rs_pending(mdev);
1113         ok = drbd_send_drequest_csum(mdev, sector, size,
1114                                      digest, digest_size,
1115                                      P_OV_REPLY);
1116         if (!ok)
1117                 dec_rs_pending(mdev);
1118         kfree(digest);
1119
1120 out:
1121         if (e)
1122                 drbd_free_ee(mdev, e);
1123         dec_unacked(mdev);
1124         return ok;
1125 }
1126
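/* Collect consecutive out-of-sync sectors found by online verify into one
 * run (ov_last_oos_start/size), so they can be reported as a single range,
 * and mark them out of sync in the bitmap. */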
1127 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1128 {
1129         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1130                 mdev->ov_last_oos_size += size>>9;
1131         } else {
1132                 mdev->ov_last_oos_start = sector;
1133                 mdev->ov_last_oos_size = size>>9;
1134         }
1135         drbd_set_out_of_sync(mdev, sector, size);
1136 }
1137
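/* Online verify, on the node that received the peer's digest (P_OV_REPLY):
 * hash the local block, compare, record a mismatch as out of sync, and
 * report the verdict back with P_OV_RESULT.  Finishes the verify run once
 * ov_left reaches zero. */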
1138 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1139 {
1140         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1141         struct digest_info *di;
1142         void *digest;
1143         sector_t sector = e->sector;
1144         unsigned int size = e->size;
1145         int digest_size;
1146         int ok, eq = 0;
1147
1148         if (unlikely(cancel)) {
1149                 drbd_free_ee(mdev, e);
1150                 dec_unacked(mdev);
1151                 return 1;
1152         }
1153
1154         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1155          * the resync lru has been cleaned up already */
1156         if (get_ldev(mdev)) {
1157                 drbd_rs_complete_io(mdev, e->sector);
1158                 put_ldev(mdev);
1159         }
1160
1161         di = e->digest;
1162
1163         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1164                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1165                 digest = kmalloc(digest_size, GFP_NOIO);
1166                 if (digest) {
1167                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1168
1169                         D_ASSERT(digest_size == di->digest_size);
1170                         eq = !memcmp(digest, di->digest, digest_size);
1171                         kfree(digest);
1172                 }
1173         }
1174
1175         /* Free e and pages before send.
1176          * In case we block on congestion, we could otherwise run into
1177          * some distributed deadlock, if the other side blocks on
1178          * congestion as well, because our receiver blocks in
1179          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1180         drbd_free_ee(mdev, e);
1181         if (!eq)
1182                 drbd_ov_oos_found(mdev, sector, size);
1183         else
1184                 ov_oos_print(mdev);
1185
1186         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1187                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1188
1189         dec_unacked(mdev);
1190
1191         --mdev->ov_left;
1192
1193         /* let's advance progress step marks only for every other megabyte */
1194         if ((mdev->ov_left & 0x200) == 0x200)
1195                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1196
1197         if (mdev->ov_left == 0) {
1198                 ov_oos_print(mdev);
1199                 drbd_resync_finished(mdev);
1200         }
1201
1202         return ok;
1203 }
1204
1205 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1206 {
1207         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1208         complete(&b->done);
1209         return 1;
1210 }
1211
1212 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1213 {
1214         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1215         struct p_barrier *p = &mdev->data.sbuf.barrier;
1216         int ok = 1;
1217
1218         /* really avoid racing with tl_clear.  w.cb may have been referenced
1219          * just before it was reassigned and re-queued, so double check that.
1220          * actually, this race was harmless, since we only try to send the
1221          * barrier packet here, and otherwise do nothing with the object.
1222          * but compare with the head of w_clear_epoch */
1223         spin_lock_irq(&mdev->req_lock);
1224         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1225                 cancel = 1;
1226         spin_unlock_irq(&mdev->req_lock);
1227         if (cancel)
1228                 return 1;
1229
1230         if (!drbd_get_data_sock(mdev))
1231                 return 0;
1232         p->barrier = b->br_number;
1233         /* inc_ap_pending was done where this was queued.
1234          * dec_ap_pending will be done in got_BarrierAck
1235          * or (on connection loss) in w_clear_epoch.  */
1236         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1237                                 (struct p_header80 *)p, sizeof(*p), 0);
1238         drbd_put_data_sock(mdev);
1239
1240         return ok;
1241 }
1242
1243 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1244 {
1245         if (cancel)
1246                 return 1;
1247         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1248 }
1249
1250 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1251 {
1252         struct drbd_request *req = container_of(w, struct drbd_request, w);
1253         int ok;
1254
1255         if (unlikely(cancel)) {
1256                 req_mod(req, send_canceled);
1257                 return 1;
1258         }
1259
1260         ok = drbd_send_oos(mdev, req);
1261         req_mod(req, oos_handed_to_network);
1262
1263         return ok;
1264 }
1265
1266 /**
1267  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1268  * @mdev:       DRBD device.
1269  * @w:          work object.
1270  * @cancel:     The connection will be closed anyways
1271  */
1272 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1273 {
1274         struct drbd_request *req = container_of(w, struct drbd_request, w);
1275         int ok;
1276
1277         if (unlikely(cancel)) {
1278                 req_mod(req, send_canceled);
1279                 return 1;
1280         }
1281
1282         ok = drbd_send_dblock(mdev, req);
1283         req_mod(req, ok ? handed_over_to_network : send_failed);
1284
1285         return ok;
1286 }
1287
1288 /**
1289  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1290  * @mdev:       DRBD device.
1291  * @w:          work object.
1292  * @cancel:     The connection will be closed anyways
1293  */
1294 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1295 {
1296         struct drbd_request *req = container_of(w, struct drbd_request, w);
1297         int ok;
1298
1299         if (unlikely(cancel)) {
1300                 req_mod(req, send_canceled);
1301                 return 1;
1302         }
1303
1304         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1305                                 (unsigned long)req);
1306
1307         if (!ok) {
1308                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1309                  * so this is probably redundant */
1310                 if (mdev->state.conn >= C_CONNECTED)
1311                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1312         }
1313         req_mod(req, ok ? handed_over_to_network : send_failed);
1314
1315         return ok;
1316 }
1317
1318 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1319 {
1320         struct drbd_request *req = container_of(w, struct drbd_request, w);
1321
1322         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1323                 drbd_al_begin_io(mdev, req->sector);
1324         /* Calling drbd_al_begin_io() out of the worker might deadlock
1325            theoretically. In practice it cannot deadlock, since this is
1326            only used when unfreezing IOs. All the extents of the requests
1327            that made it into the TL are already active */
1328
1329         drbd_req_make_private_bio(req, req->master_bio);
1330         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1331         generic_make_request(req->private_bio);
1332
1333         return 1;
1334 }
1335
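/* Follow the sync-after chain upwards; we may resync only if no device we
 * depend on is currently resyncing or has its sync paused. */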
1336 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1337 {
1338         struct drbd_conf *odev = mdev;
1339
1340         while (1) {
1341                 if (odev->sync_conf.after == -1)
1342                         return 1;
1343                 odev = minor_to_mdev(odev->sync_conf.after);
1344                 ERR_IF(!odev) return 1;
1345                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1346                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1347                     odev->state.aftr_isp || odev->state.peer_isp ||
1348                     odev->state.user_isp)
1349                         return 0;
1350         }
1351 }
1352
1353 /**
1354  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1355  * @mdev:       DRBD device.
1356  *
1357  * Called from process context only (admin command and after_state_ch).
1358  */
1359 static int _drbd_pause_after(struct drbd_conf *mdev)
1360 {
1361         struct drbd_conf *odev;
1362         int i, rv = 0;
1363
1364         for (i = 0; i < minor_count; i++) {
1365                 odev = minor_to_mdev(i);
1366                 if (!odev)
1367                         continue;
1368                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1369                         continue;
1370                 if (!_drbd_may_sync_now(odev))
1371                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1372                                != SS_NOTHING_TO_DO);
1373         }
1374
1375         return rv;
1376 }
1377
1378 /**
1379  * _drbd_resume_next() - Resume resync on all devices that may resync now
1380  * @mdev:       DRBD device.
1381  *
1382  * Called from process context only (admin command and worker).
1383  */
1384 static int _drbd_resume_next(struct drbd_conf *mdev)
1385 {
1386         struct drbd_conf *odev;
1387         int i, rv = 0;
1388
1389         for (i = 0; i < minor_count; i++) {
1390                 odev = minor_to_mdev(i);
1391                 if (!odev)
1392                         continue;
1393                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1394                         continue;
1395                 if (odev->state.aftr_isp) {
1396                         if (_drbd_may_sync_now(odev))
1397                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1398                                                         CS_HARD, NULL)
1399                                        != SS_NOTHING_TO_DO) ;
1400                 }
1401         }
1402         return rv;
1403 }
1404
1405 void resume_next_sg(struct drbd_conf *mdev)
1406 {
1407         write_lock_irq(&global_state_lock);
1408         _drbd_resume_next(mdev);
1409         write_unlock_irq(&global_state_lock);
1410 }
1411
1412 void suspend_other_sg(struct drbd_conf *mdev)
1413 {
1414         write_lock_irq(&global_state_lock);
1415         _drbd_pause_after(mdev);
1416         write_unlock_irq(&global_state_lock);
1417 }
1418
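/* Validate a new sync-after minor: it must exist, and following the chain
 * from it must terminate without reaching this device again (no cycles). */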
1419 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1420 {
1421         struct drbd_conf *odev;
1422
1423         if (o_minor == -1)
1424                 return NO_ERROR;
1425         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1426                 return ERR_SYNC_AFTER;
1427
1428         /* check for loops */
1429         odev = minor_to_mdev(o_minor);
1430         while (1) {
1431                 if (odev == mdev)
1432                         return ERR_SYNC_AFTER_CYCLE;
1433
1434                 /* dependency chain ends here, no cycles. */
1435                 if (odev->sync_conf.after == -1)
1436                         return NO_ERROR;
1437
1438                 /* follow the dependency chain */
1439                 odev = minor_to_mdev(odev->sync_conf.after);
1440         }
1441 }
1442
1443 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1444 {
1445         int changes;
1446         int retcode;
1447
1448         write_lock_irq(&global_state_lock);
1449         retcode = sync_after_error(mdev, na);
1450         if (retcode == NO_ERROR) {
1451                 mdev->sync_conf.after = na;
1452                 do {
1453                         changes  = _drbd_pause_after(mdev);
1454                         changes |= _drbd_resume_next(mdev);
1455                 } while (changes);
1456         }
1457         write_unlock_irq(&global_state_lock);
1458         return retcode;
1459 }
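/* Note on the loop above: changing one device's sync-after dependency can
 * change whether other devices in the dependency chain may sync now, and each
 * pause/resume can in turn affect the next device along the chain.  The two
 * helpers are therefore repeated until neither reports a further state
 * change, i.e. until the aftr_isp flags have settled to a fixed point. */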
1460
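/**
 * drbd_rs_controller_reset() - Reset the resync speed controller state
 * @mdev:       DRBD device.
 *
 * Clears the sectors-in/sectors-ev counters, the in-flight and planned
 * request accounting, and the plan fifo, so that the next resync run starts
 * the controller from a clean slate.
 */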
1461 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1462 {
1463         atomic_set(&mdev->rs_sect_in, 0);
1464         atomic_set(&mdev->rs_sect_ev, 0);
1465         mdev->rs_in_flight = 0;
1466         mdev->rs_planed = 0;
1467         spin_lock(&mdev->peer_seq_lock);
1468         fifo_set(&mdev->rs_plan_s, 0);
1469         spin_unlock(&mdev->peer_seq_lock);
1470 }
1471
1472 /**
1473  * drbd_start_resync() - Start the resync process
1474  * @mdev:       DRBD device.
1475  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1476  *
1477  * This function might bring you directly into one of the
1478  * C_PAUSED_SYNC_* states.
1479  */
1480 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1481 {
1482         union drbd_state ns;
1483         int r;
1484
1485         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1486                 dev_err(DEV, "Resync already running!\n");
1487                 return;
1488         }
1489
1490         if (mdev->state.conn < C_AHEAD) {
1491                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1492                 drbd_rs_cancel_all(mdev);
1493                 /* This should rather be done when we abort the resync; we definitely
1494                    do not want to cancel everything here for connections going back and
1495                    forth between Ahead/Behind and SyncSource/SyncTarget. */
1496         }
1497
1498         if (side == C_SYNC_TARGET) {
1499                 /* Since application IO was locked out during C_WF_BITMAP_T and C_WF_SYNC_UUID
1500                    we are still unmodified.  Before going to C_SYNC_TARGET, ask the
1501                    before-resync-target handler whether we may make the data inconsistent. */
1502                 r = drbd_khelper(mdev, "before-resync-target");
1503                 r = (r >> 8) & 0xff;
1504                 if (r > 0) {
1505                         dev_info(DEV, "before-resync-target handler returned %d, "
1506                              "dropping connection.\n", r);
1507                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1508                         return;
1509                 }
1510         } else /* C_SYNC_SOURCE */ {
1511                 r = drbd_khelper(mdev, "before-resync-source");
1512                 r = (r >> 8) & 0xff;
1513                 if (r > 0) {
1514                         if (r == 3) {
1515                                 dev_info(DEV, "before-resync-source handler returned %d, "
1516                                          "ignoring. Old userland tools?\n", r);
1517                         } else {
1518                                 dev_info(DEV, "before-resync-source handler returned %d, "
1519                                          "dropping connection.\n", r);
1520                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1521                                 return;
1522                         }
1523                 }
1524         }
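        /* Configuration sketch (illustrative only; the script paths are made
         * up): the two helpers invoked above are configured in the "handlers"
         * section of drbd.conf, e.g.
         *
         *   handlers {
         *     before-resync-target "/usr/local/sbin/snapshot-target.sh";
         *     before-resync-source "/usr/local/sbin/check-source.sh";
         *   }
         *
         * As coded above, a non-zero exit status drops the connection, except
         * that exit code 3 from the before-resync-source handler is only
         * logged and otherwise ignored. */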
1525
1526         drbd_state_lock(mdev);
1527
1528         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1529                 drbd_state_unlock(mdev);
1530                 return;
1531         }
1532
1533         write_lock_irq(&global_state_lock);
1534         ns = mdev->state;
1535
1536         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1537
1538         ns.conn = side;
1539
1540         if (side == C_SYNC_TARGET)
1541                 ns.disk = D_INCONSISTENT;
1542         else /* side == C_SYNC_SOURCE */
1543                 ns.pdsk = D_INCONSISTENT;
1544
1545         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1546         ns = mdev->state;
1547
1548         if (ns.conn < C_CONNECTED)
1549                 r = SS_UNKNOWN_ERROR;
1550
1551         if (r == SS_SUCCESS) {
1552                 unsigned long tw = drbd_bm_total_weight(mdev);
1553                 unsigned long now = jiffies;
1554                 int i;
1555
1556                 mdev->rs_failed    = 0;
1557                 mdev->rs_paused    = 0;
1558                 mdev->rs_same_csum = 0;
1559                 mdev->rs_last_events = 0;
1560                 mdev->rs_last_sect_ev = 0;
1561                 mdev->rs_total     = tw;
1562                 mdev->rs_start     = now;
1563                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1564                         mdev->rs_mark_left[i] = tw;
1565                         mdev->rs_mark_time[i] = now;
1566                 }
1567                 _drbd_pause_after(mdev);
1568         }
1569         write_unlock_irq(&global_state_lock);
1570
1571         if (r == SS_SUCCESS) {
1572                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1573                      drbd_conn_str(ns.conn),
1574                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1575                      (unsigned long) mdev->rs_total);
1576                 if (side == C_SYNC_TARGET)
1577                         mdev->bm_resync_fo = 0;
1578
1579                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1580                  * with w_send_oos, or the sync target will get confused as to
1581                  * how many bits to resync.  We cannot do that always, because for an
1582                  * empty resync and protocol < 95, we need to do it here, as we call
1583                  * drbd_resync_finished from here in that case.
1584                  * We call drbd_gen_and_send_sync_uuid here for protocol < 96,
1585                  * and from after_state_ch otherwise. */
1586                 if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1587                         drbd_gen_and_send_sync_uuid(mdev);
1588
1589                 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1590                         /* This still has a race (about when exactly the peers
1591                          * detect connection loss) that can lead to a full sync
1592                          * on next handshake. In 8.3.9 we fixed this with explicit
1593                          * resync-finished notifications, but the fix
1594                          * introduces a protocol change.  Sleeping for some
1595                          * time longer than the ping interval + timeout on the
1596                          * SyncSource, to give the SyncTarget the chance to
1597                          * detect connection loss, then waiting for a ping
1598                          * response (implicit in drbd_resync_finished) reduces
1599                          * the race considerably, but does not solve it. */
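                        /* Back-of-the-envelope (assuming the usual defaults,
                         * with ping_timeo configured in tenths of a second):
                         * ping_int = 10 and ping_timeo = 5 give
                         * 10*HZ + 5*HZ/9 jiffies, roughly 10.6 seconds, i.e.
                         * a bit more than ping-int + ping-timeout (10.5 s). */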
1600                         if (side == C_SYNC_SOURCE)
1601                                 schedule_timeout_interruptible(
1602                                         mdev->net_conf->ping_int * HZ +
1603                                         mdev->net_conf->ping_timeo*HZ/9);
1604                         drbd_resync_finished(mdev);
1605                 }
1606
1607                 drbd_rs_controller_reset(mdev);
1608                 /* ns.conn may already be != mdev->state.conn,
1609                  * we may have been paused in between, or become paused until
1610                  * the timer triggers.
1611                  * No matter, that is handled in resync_timer_fn() */
1612                 if (ns.conn == C_SYNC_TARGET)
1613                         mod_timer(&mdev->resync_timer, jiffies);
1614
1615                 drbd_md_sync(mdev);
1616         }
1617         put_ldev(mdev);
1618         drbd_state_unlock(mdev);
1619 }
1620
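/**
 * drbd_worker() - Main loop of the per-device worker thread
 * @thi:        the drbd_thread we run as.
 *
 * Sleeps on the data.work semaphore, dequeues one &struct drbd_work at a
 * time and runs its callback.  A failing callback while we believe we are
 * connected forces the connection into C_NETWORK_FAILURE.  On shutdown the
 * remaining queue entries are run with the cancel flag set, the receiver is
 * stopped, and the device is cleaned up via drbd_mdev_cleanup().
 */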
1621 int drbd_worker(struct drbd_thread *thi)
1622 {
1623         struct drbd_conf *mdev = thi->mdev;
1624         struct drbd_work *w = NULL;
1625         LIST_HEAD(work_list);
1626         int intr = 0, i;
1627
1628         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1629
1630         while (get_t_state(thi) == Running) {
1631                 drbd_thread_current_set_cpu(mdev);
1632
1633                 if (down_trylock(&mdev->data.work.s)) {
1634                         mutex_lock(&mdev->data.mutex);
1635                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1636                                 drbd_tcp_uncork(mdev->data.socket);
1637                         mutex_unlock(&mdev->data.mutex);
1638
1639                         intr = down_interruptible(&mdev->data.work.s);
1640
1641                         mutex_lock(&mdev->data.mutex);
1642                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1643                                 drbd_tcp_cork(mdev->data.socket);
1644                         mutex_unlock(&mdev->data.mutex);
1645                 }
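                /* Why the cork dance above: while we had nothing queued we
                 * uncorked the data socket so that anything still sitting in
                 * the TCP send buffer goes out; once woken for new work we
                 * cork it again (unless no_cork is set) so the sends triggered
                 * by the callbacks below can be batched into fewer packets. */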
1646
1647                 if (intr) {
1648                         D_ASSERT(intr == -EINTR);
1649                         flush_signals(current);
1650                         ERR_IF (get_t_state(thi) == Running)
1651                                 continue;
1652                         break;
1653                 }
1654
1655                 if (get_t_state(thi) != Running)
1656                         break;
1657                 /* With this break, we have done a down() but not consumed
1658                    the entry from the list. The cleanup code takes care of
1659                    this...   */
1660
1661                 w = NULL;
1662                 spin_lock_irq(&mdev->data.work.q_lock);
1663                 ERR_IF(list_empty(&mdev->data.work.q)) {
1664                         /* something terribly wrong in our logic.
1665                          * we were able to down() the semaphore,
1666                          * but the list is empty... doh.
1667                          *
1668                          * what is the best thing to do now?
1669                          * try again from scratch, restarting the receiver,
1670                          * asender, whatnot? could break even more ugly,
1671                          * e.g. when we are primary, but no good local data.
1672                          *
1673                          * I'll try to get away just starting over this loop.
1674                          */
1675                         spin_unlock_irq(&mdev->data.work.q_lock);
1676                         continue;
1677                 }
1678                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1679                 list_del_init(&w->list);
1680                 spin_unlock_irq(&mdev->data.work.q_lock);
1681
1682                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1683                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1684                         if (mdev->state.conn >= C_CONNECTED)
1685                                 drbd_force_state(mdev,
1686                                                 NS(conn, C_NETWORK_FAILURE));
1687                 }
1688         }
1689         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1690         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1691
1692         spin_lock_irq(&mdev->data.work.q_lock);
1693         i = 0;
1694         while (!list_empty(&mdev->data.work.q)) {
1695                 list_splice_init(&mdev->data.work.q, &work_list);
1696                 spin_unlock_irq(&mdev->data.work.q_lock);
1697
1698                 while (!list_empty(&work_list)) {
1699                         w = list_entry(work_list.next, struct drbd_work, list);
1700                         list_del_init(&w->list);
1701                         w->cb(mdev, w, 1);
1702                         i++; /* dead debugging code */
1703                 }
1704
1705                 spin_lock_irq(&mdev->data.work.q_lock);
1706         }
1707         sema_init(&mdev->data.work.s, 0);
1708         /* DANGEROUS race: if someone did queue his work within the spinlock,
1709          * but up() ed outside the spinlock, we could get an up() on the
1710          * semaphore without corresponding list entry.
1711          * So don't do that.
1712          */
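        /* The safe pattern is to add the list entry and up() the semaphore
         * while still holding q_lock (cf. drbd_queue_work() in drbd_int.h),
         * which is what makes resetting the semaphore under the lock here
         * correct. */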
1713         spin_unlock_irq(&mdev->data.work.q_lock);
1714
1715         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1716         /* _drbd_set_state only uses stop_nowait.
1717          * wait here for the Exiting receiver. */
1718         drbd_thread_stop(&mdev->receiver);
1719         drbd_mdev_cleanup(mdev);
1720
1721         dev_info(DEV, "worker terminated\n");
1722
1723         clear_bit(DEVICE_DYING, &mdev->flags);
1724         clear_bit(CONFIG_PENDING, &mdev->flags);
1725         wake_up(&mdev->state_wait);
1726
1727         return 0;
1728 }