drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_conf *mdev,
43                                  struct drbd_work *w, int cancel);
44
45
46
47 /* endio handlers:
48  *   drbd_md_io_complete (defined here)
49  *   drbd_endio_pri (defined here)
50  *   drbd_endio_sec (defined here)
51  *   bm_async_io_complete (defined in drbd_bitmap.c)
52  *
53  * For all these callbacks, note the following:
54  * The callbacks will be called in irq context by the IDE drivers,
55  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
56  * Try to get the locking right :)
57  *
58  */
59
60
61 /* About the global_state_lock
62    Each state transition on a device holds a read lock. In case we have
63    to evaluate the sync after dependencies, we grab a write lock, because
64    we need stable states on all devices for that.  */
65 rwlock_t global_state_lock;
66
67 /* used for synchronous meta data and bitmap IO
68  * submitted by drbd_md_sync_page_io()
69  */
70 void drbd_md_io_complete(struct bio *bio, int error)
71 {
72         struct drbd_md_io *md_io;
73
74         md_io = (struct drbd_md_io *)bio->bi_private;
75         md_io->error = error;
76
77         complete(&md_io->event);
78 }
79
80 /* reads on behalf of the partner,
81  * "submitted" by the receiver
82  */
83 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
84 {
85         unsigned long flags = 0;
86         struct drbd_conf *mdev = e->mdev;
87
88         D_ASSERT(e->block_id != ID_VACANT);
89
90         spin_lock_irqsave(&mdev->req_lock, flags);
91         mdev->read_cnt += e->size >> 9;
92         list_del(&e->w.list);
93         if (list_empty(&mdev->read_ee))
94                 wake_up(&mdev->ee_wait);
95         if (test_bit(__EE_WAS_ERROR, &e->flags))
96                 __drbd_chk_io_error(mdev, false);
97         spin_unlock_irqrestore(&mdev->req_lock, flags);
98
99         drbd_queue_work(&mdev->data.work, &e->w);
100         put_ldev(mdev);
101 }
102
103 /* writes on behalf of the partner, or resync writes,
104  * "submitted" by the receiver, final stage.  */
105 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
106 {
107         unsigned long flags = 0;
108         struct drbd_conf *mdev = e->mdev;
109         sector_t e_sector;
110         int do_wake;
111         int is_syncer_req;
112         int do_al_complete_io;
113
114         D_ASSERT(e->block_id != ID_VACANT);
115
116         /* after we moved e to done_ee,
117          * we may no longer access it,
118          * it may be freed/reused already!
119          * (as soon as we release the req_lock) */
120         e_sector = e->sector;
121         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
122         is_syncer_req = is_syncer_block_id(e->block_id);
123
124         spin_lock_irqsave(&mdev->req_lock, flags);
125         mdev->writ_cnt += e->size >> 9;
126         list_del(&e->w.list); /* has been on active_ee or sync_ee */
127         list_add_tail(&e->w.list, &mdev->done_ee);
128
129         /* No hlist_del_init(&e->collision) here, we did not send the Ack yet,
130          * neither did we wake possibly waiting conflicting requests.
131          * done from "drbd_process_done_ee" within the appropriate w.cb
132          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
133
134         do_wake = is_syncer_req
135                 ? list_empty(&mdev->sync_ee)
136                 : list_empty(&mdev->active_ee);
137
138         if (test_bit(__EE_WAS_ERROR, &e->flags))
139                 __drbd_chk_io_error(mdev, false);
140         spin_unlock_irqrestore(&mdev->req_lock, flags);
141
142         if (is_syncer_req)
143                 drbd_rs_complete_io(mdev, e_sector);
144
145         if (do_wake)
146                 wake_up(&mdev->ee_wait);
147
148         if (do_al_complete_io)
149                 drbd_al_complete_io(mdev, e_sector);
150
151         wake_asender(mdev);
152         put_ldev(mdev);
153 }
154
155 /* writes on behalf of the partner, or resync writes,
156  * "submitted" by the receiver.
157  */
158 void drbd_endio_sec(struct bio *bio, int error)
159 {
160         struct drbd_epoch_entry *e = bio->bi_private;
161         struct drbd_conf *mdev = e->mdev;
162         int uptodate = bio_flagged(bio, BIO_UPTODATE);
163         int is_write = bio_data_dir(bio) == WRITE;
164
165         if (error && __ratelimit(&drbd_ratelimit_state))
166                 dev_warn(DEV, "%s: error=%d s=%llus\n",
167                                 is_write ? "write" : "read", error,
168                                 (unsigned long long)e->sector);
169         if (!error && !uptodate) {
170                 if (__ratelimit(&drbd_ratelimit_state))
171                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
172                                         is_write ? "write" : "read",
173                                         (unsigned long long)e->sector);
174                 /* strange behavior of some lower level drivers...
175                  * fail the request by clearing the uptodate flag,
176                  * but do not return any error?! */
177                 error = -EIO;
178         }
179
180         if (error)
181                 set_bit(__EE_WAS_ERROR, &e->flags);
182
183         bio_put(bio); /* no need for the bio anymore */
184         if (atomic_dec_and_test(&e->pending_bios)) {
185                 if (is_write)
186                         drbd_endio_write_sec_final(e);
187                 else
188                         drbd_endio_read_sec_final(e);
189         }
190 }
191
192 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
193  */
194 void drbd_endio_pri(struct bio *bio, int error)
195 {
196         unsigned long flags;
197         struct drbd_request *req = bio->bi_private;
198         struct drbd_conf *mdev = req->mdev;
199         struct bio_and_error m;
200         enum drbd_req_event what;
201         int uptodate = bio_flagged(bio, BIO_UPTODATE);
202
203         if (!error && !uptodate) {
204                 dev_warn(DEV, "p %s: setting error to -EIO\n",
205                          bio_data_dir(bio) == WRITE ? "write" : "read");
206                 /* strange behavior of some lower level drivers...
207                  * fail the request by clearing the uptodate flag,
208                  * but do not return any error?! */
209                 error = -EIO;
210         }
211
212         /* to avoid recursion in __req_mod */
213         if (unlikely(error)) {
214                 what = (bio_data_dir(bio) == WRITE)
215                         ? write_completed_with_error
216                         : (bio_rw(bio) == READ)
217                           ? read_completed_with_error
218                           : read_ahead_completed_with_error;
219         } else
220                 what = completed_ok;
221
222         bio_put(req->private_bio);
223         req->private_bio = ERR_PTR(error);
224
225         /* not req_mod(), we need irqsave here! */
226         spin_lock_irqsave(&mdev->req_lock, flags);
227         __req_mod(req, what, &m);
228         spin_unlock_irqrestore(&mdev->req_lock, flags);
229
230         if (m.bio)
231                 complete_master_bio(mdev, &m);
232 }
233
234 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
235 {
236         struct drbd_request *req = container_of(w, struct drbd_request, w);
237
238         /* We should not detach for read io-error,
239          * but try to WRITE the P_DATA_REPLY to the failed location,
240          * to give the disk the chance to relocate that block */
241
242         spin_lock_irq(&mdev->req_lock);
243         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
244                 _req_mod(req, read_retry_remote_canceled);
245                 spin_unlock_irq(&mdev->req_lock);
246                 return 1;
247         }
248         spin_unlock_irq(&mdev->req_lock);
249
250         return w_send_read_req(mdev, w, 0);
251 }
252
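/* Compute a digest over all pages of an epoch entry with the given hash
 * transform.  All but the last page of the page chain are fully used;
 * the last one may be used only partially, depending on e->size. */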
253 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
254 {
255         struct hash_desc desc;
256         struct scatterlist sg;
257         struct page *page = e->pages;
258         struct page *tmp;
259         unsigned len;
260
261         desc.tfm = tfm;
262         desc.flags = 0;
263
264         sg_init_table(&sg, 1);
265         crypto_hash_init(&desc);
266
267         while ((tmp = page_chain_next(page))) {
268                 /* all but the last page will be fully used */
269                 sg_set_page(&sg, page, PAGE_SIZE, 0);
270                 crypto_hash_update(&desc, &sg, sg.length);
271                 page = tmp;
272         }
273         /* and now the last, possibly only partially used page */
274         len = e->size & (PAGE_SIZE - 1);
275         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
276         crypto_hash_update(&desc, &sg, sg.length);
277         crypto_hash_final(&desc, digest);
278 }
279
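/* Like drbd_csum_ee(), but for a bio: feed every segment of the bio
 * into the given hash transform and store the result in *digest. */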
280 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
281 {
282         struct hash_desc desc;
283         struct scatterlist sg;
284         struct bio_vec *bvec;
285         int i;
286
287         desc.tfm = tfm;
288         desc.flags = 0;
289
290         sg_init_table(&sg, 1);
291         crypto_hash_init(&desc);
292
293         __bio_for_each_segment(bvec, bio, i, 0) {
294                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
295                 crypto_hash_update(&desc, &sg, sg.length);
296         }
297         crypto_hash_final(&desc, digest);
298 }
299
300 /* TODO merge common code with w_e_end_ov_req */
301 int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
302 {
303         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
304         int digest_size;
305         void *digest;
306         int ok = 1;
307
308         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
309
310         if (unlikely(cancel))
311                 goto out;
312
313         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
314                 goto out;
315
316         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
317         digest = kmalloc(digest_size, GFP_NOIO);
318         if (digest) {
319                 sector_t sector = e->sector;
320                 unsigned int size = e->size;
321                 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
322                 /* Free e and pages before send.
323                  * In case we block on congestion, we could otherwise run into
324                  * some distributed deadlock, if the other side blocks on
325                  * congestion as well, because our receiver blocks in
326                  * drbd_pp_alloc due to pp_in_use > max_buffers. */
327                 drbd_free_ee(mdev, e);
328                 e = NULL;
329                 inc_rs_pending(mdev);
330                 ok = drbd_send_drequest_csum(mdev, sector, size,
331                                              digest, digest_size,
332                                              P_CSUM_RS_REQUEST);
333                 kfree(digest);
334         } else {
335                 dev_err(DEV, "kmalloc() of digest failed.\n");
336                 ok = 0;
337         }
338
339 out:
340         if (e)
341                 drbd_free_ee(mdev, e);
342
343         if (unlikely(!ok))
344                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
345         return ok;
346 }
347
348 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
349
350 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
351 {
352         struct drbd_epoch_entry *e;
353
354         if (!get_ldev(mdev))
355                 return -EIO;
356
357         if (drbd_rs_should_slow_down(mdev, sector))
358                 goto defer;
359
360         /* GFP_TRY, because if there is no memory available right now, this may
361          * be rescheduled for later. It is "only" background resync, after all. */
362         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
363         if (!e)
364                 goto defer;
365
366         e->w.cb = w_e_send_csum;
367         spin_lock_irq(&mdev->req_lock);
368         list_add(&e->w.list, &mdev->read_ee);
369         spin_unlock_irq(&mdev->req_lock);
370
371         atomic_add(size >> 9, &mdev->rs_sect_ev);
372         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
373                 return 0;
374
375         /* If it failed because of ENOMEM, retry should help.  If it failed
376          * because bio_add_page failed (probably broken lower level driver),
377          * retry may or may not help.
378          * If it does not, you may need to force disconnect. */
379         spin_lock_irq(&mdev->req_lock);
380         list_del(&e->w.list);
381         spin_unlock_irq(&mdev->req_lock);
382
383         drbd_free_ee(mdev, e);
384 defer:
385         put_ldev(mdev);
386         return -EAGAIN;
387 }
388
389 int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
390 {
391         switch (mdev->state.conn) {
392         case C_VERIFY_S:
393                 w_make_ov_request(mdev, w, cancel);
394                 break;
395         case C_SYNC_TARGET:
396                 w_make_resync_request(mdev, w, cancel);
397                 break;
398         }
399
400         return 1;
401 }
402
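/* resync timer callback: requeue the resync work on the data work queue,
 * unless it is still queued from the previous round. */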
403 void resync_timer_fn(unsigned long data)
404 {
405         struct drbd_conf *mdev = (struct drbd_conf *) data;
406
407         if (list_empty(&mdev->resync_work.list))
408                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
409 }
410
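/* Helpers for struct fifo_buffer, the fixed-size ring buffer that holds
 * the resync controller's plan (mdev->rs_plan_s). */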
411 static void fifo_set(struct fifo_buffer *fb, int value)
412 {
413         int i;
414
415         for (i = 0; i < fb->size; i++)
416                 fb->values[i] = value;
417 }
418
419 static int fifo_push(struct fifo_buffer *fb, int value)
420 {
421         int ov;
422
423         ov = fb->values[fb->head_index];
424         fb->values[fb->head_index++] = value;
425
426         if (fb->head_index >= fb->size)
427                 fb->head_index = 0;
428
429         return ov;
430 }
431
432 static void fifo_add_val(struct fifo_buffer *fb, int value)
433 {
434         int i;
435
436         for (i = 0; i < fb->size; i++)
437                 fb->values[i] += value;
438 }
439
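/* The dynamic resync rate controller.  Based on the number of sectors that
 * came in since the last invocation and on what is still in flight, plan a
 * correction spread over the next "steps" intervals and return the number
 * of sectors to request during the next SLEEP_TIME interval, limited by
 * c_max_rate. */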
440 static int drbd_rs_controller(struct drbd_conf *mdev)
441 {
442         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
443         unsigned int want;     /* The number of sectors we want in the proxy */
444         int req_sect; /* Number of sectors to request in this turn */
445         int correction; /* Number of sectors more we need in the proxy*/
446         int cps; /* correction per invocation of drbd_rs_controller() */
447         int steps; /* Number of time steps to plan ahead */
448         int curr_corr;
449         int max_sect;
450
451         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
452         mdev->rs_in_flight -= sect_in;
453
454         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
455
456         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
457
458         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
459                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
460         } else { /* normal path */
461                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
462                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
463         }
464
465         correction = want - mdev->rs_in_flight - mdev->rs_planed;
466
467         /* Plan ahead */
468         cps = correction / steps;
469         fifo_add_val(&mdev->rs_plan_s, cps);
470         mdev->rs_planed += cps * steps;
471
472         /* What we do in this step */
473         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
474         spin_unlock(&mdev->peer_seq_lock);
475         mdev->rs_planed -= curr_corr;
476
477         req_sect = sect_in + curr_corr;
478         if (req_sect < 0)
479                 req_sect = 0;
480
481         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
482         if (req_sect > max_sect)
483                 req_sect = max_sect;
484
485         /*
486         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
487                  sect_in, mdev->rs_in_flight, want, correction,
488                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
489         */
490
491         return req_sect;
492 }
493
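/* Turn the controller output (or the static sync_conf.rate, if no plan is
 * configured) into a number of BM_BLOCK_SIZE sized requests for this
 * interval, and record the effective rate in mdev->c_sync_rate. */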
494 static int drbd_rs_number_requests(struct drbd_conf *mdev)
495 {
496         int number;
497         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
498                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
499                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
500         } else {
501                 mdev->c_sync_rate = mdev->sync_conf.rate;
502                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
503         }
504
505         /* ignore the amount of pending requests, the resync controller should
506          * throttle down to incoming reply rate soon enough anyways. */
507         return number;
508 }
509
510 static int w_make_resync_request(struct drbd_conf *mdev,
511                                  struct drbd_work *w, int cancel)
512 {
513         unsigned long bit;
514         sector_t sector;
515         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
516         int max_bio_size;
517         int number, rollback_i, size;
518         int align, queued, sndbuf;
519         int i = 0;
520
521         if (unlikely(cancel))
522                 return 1;
523
524         if (mdev->rs_total == 0) {
525                 /* empty resync? */
526                 drbd_resync_finished(mdev);
527                 return 1;
528         }
529
530         if (!get_ldev(mdev)) {
531                 /* Since we only need to access mdev->rsync, a
532                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
533                    continuing the resync with a broken disk makes no sense at
534                    all */
535                 dev_err(DEV, "Disk broke down during resync!\n");
536                 return 1;
537         }
538
539         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
540         number = drbd_rs_number_requests(mdev);
541         if (number == 0)
542                 goto requeue;
543
544         for (i = 0; i < number; i++) {
545                 /* Stop generating RS requests when half of the send buffer is filled */
546                 mutex_lock(&mdev->data.mutex);
547                 if (mdev->data.socket) {
548                         queued = mdev->data.socket->sk->sk_wmem_queued;
549                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
550                 } else {
551                         queued = 1;
552                         sndbuf = 0;
553                 }
554                 mutex_unlock(&mdev->data.mutex);
555                 if (queued > sndbuf / 2)
556                         goto requeue;
557
558 next_sector:
559                 size = BM_BLOCK_SIZE;
560                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
561
562                 if (bit == DRBD_END_OF_BITMAP) {
563                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
564                         put_ldev(mdev);
565                         return 1;
566                 }
567
568                 sector = BM_BIT_TO_SECT(bit);
569
570                 if (drbd_rs_should_slow_down(mdev, sector) ||
571                     drbd_try_rs_begin_io(mdev, sector)) {
572                         mdev->bm_resync_fo = bit;
573                         goto requeue;
574                 }
575                 mdev->bm_resync_fo = bit + 1;
576
577                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
578                         drbd_rs_complete_io(mdev, sector);
579                         goto next_sector;
580                 }
581
582 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
583                 /* try to find some adjacent bits.
584                  * we stop if we already have the maximum req size.
585                  *
586                  * Additionally always align bigger requests, in order to
587                  * be prepared for all stripe sizes of software RAIDs.
588                  */
589                 align = 1;
590                 rollback_i = i;
591                 for (;;) {
592                         if (size + BM_BLOCK_SIZE > max_bio_size)
593                                 break;
594
595                         /* Be always aligned */
596                         if (sector & ((1<<(align+3))-1))
597                                 break;
598
599                         /* do not cross extent boundaries */
600                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
601                                 break;
602                         /* now, is it actually dirty, after all?
603                          * caution, drbd_bm_test_bit is tri-state for some
604                          * obscure reason; ( b == 0 ) would get the out-of-band
605                          * only accidentally right because of the "oddly sized"
606                          * adjustment below */
607                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
608                                 break;
609                         bit++;
610                         size += BM_BLOCK_SIZE;
611                         if ((BM_BLOCK_SIZE << align) <= size)
612                                 align++;
613                         i++;
614                 }
615                 /* if we merged some,
616                  * reset the offset to start the next drbd_bm_find_next from */
617                 if (size > BM_BLOCK_SIZE)
618                         mdev->bm_resync_fo = bit + 1;
619 #endif
620
621                 /* adjust very last sectors, in case we are oddly sized */
622                 if (sector + (size>>9) > capacity)
623                         size = (capacity-sector)<<9;
624                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
625                         switch (read_for_csum(mdev, sector, size)) {
626                         case -EIO: /* Disk failure */
627                                 put_ldev(mdev);
628                                 return 0;
629                         case -EAGAIN: /* allocation failed, or ldev busy */
630                                 drbd_rs_complete_io(mdev, sector);
631                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
632                                 i = rollback_i;
633                                 goto requeue;
634                         case 0:
635                                 /* everything ok */
636                                 break;
637                         default:
638                                 BUG();
639                         }
640                 } else {
641                         inc_rs_pending(mdev);
642                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
643                                                sector, size, ID_SYNCER)) {
644                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
645                                 dec_rs_pending(mdev);
646                                 put_ldev(mdev);
647                                 return 0;
648                         }
649                 }
650         }
651
652         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
653                 /* last syncer _request_ was sent,
654                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
655                  * next sync group will resume), as soon as we receive the last
656                  * resync data block, and the last bit is cleared.
657                  * until then resync "work" is "inactive" ...
658                  */
659                 put_ldev(mdev);
660                 return 1;
661         }
662
663  requeue:
664         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
665         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
666         put_ldev(mdev);
667         return 1;
668 }
669
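/* Online verify counterpart of w_make_resync_request(): send up to "number"
 * P_OV_REQUEST packets starting at mdev->ov_position, using the same
 * throttling as the resync path. */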
670 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
671 {
672         int number, i, size;
673         sector_t sector;
674         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
675
676         if (unlikely(cancel))
677                 return 1;
678
679         number = drbd_rs_number_requests(mdev);
680
681         sector = mdev->ov_position;
682         for (i = 0; i < number; i++) {
683                 if (sector >= capacity) {
684                         return 1;
685                 }
686
687                 size = BM_BLOCK_SIZE;
688
689                 if (drbd_rs_should_slow_down(mdev, sector) ||
690                     drbd_try_rs_begin_io(mdev, sector)) {
691                         mdev->ov_position = sector;
692                         goto requeue;
693                 }
694
695                 if (sector + (size>>9) > capacity)
696                         size = (capacity-sector)<<9;
697
698                 inc_rs_pending(mdev);
699                 if (!drbd_send_ov_request(mdev, sector, size)) {
700                         dec_rs_pending(mdev);
701                         return 0;
702                 }
703                 sector += BM_SECT_PER_BIT;
704         }
705         mdev->ov_position = sector;
706
707  requeue:
708         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
709         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
710         return 1;
711 }
712
713
714 void start_resync_timer_fn(unsigned long data)
715 {
716         struct drbd_conf *mdev = (struct drbd_conf *) data;
717
718         drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
719 }
720
721 int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
722 {
723         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
724                 dev_warn(DEV, "w_start_resync later...\n");
725                 mdev->start_resync_timer.expires = jiffies + HZ/10;
726                 add_timer(&mdev->start_resync_timer);
727                 return 1;
728         }
729
730         drbd_start_resync(mdev, C_SYNC_SOURCE);
731         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
732         return 1;
733 }
734
735 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
736 {
737         kfree(w);
738         ov_oos_print(mdev);
739         drbd_resync_finished(mdev);
740
741         return 1;
742 }
743
744 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
745 {
746         kfree(w);
747
748         drbd_resync_finished(mdev);
749
750         return 1;
751 }
752
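/* Request a ping and wait until the ping ack arrives,
 * or the connection is lost. */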
753 static void ping_peer(struct drbd_conf *mdev)
754 {
755         clear_bit(GOT_PING_ACK, &mdev->flags);
756         request_ping(mdev);
757         wait_event(mdev->misc_wait,
758                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
759 }
760
761 int drbd_resync_finished(struct drbd_conf *mdev)
762 {
763         unsigned long db, dt, dbdt;
764         unsigned long n_oos;
765         union drbd_state os, ns;
766         struct drbd_work *w;
767         char *khelper_cmd = NULL;
768         int verify_done = 0;
769
770         /* Remove all elements from the resync LRU. If future actions
771          * set bits in the (main) bitmap, the entries in the
772          * resync LRU would otherwise be wrong. */
773         if (drbd_rs_del_all(mdev)) {
774                 /* In case this is not possible now, most probably because
775                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
776                  * queue (or even the read operations for those packets
777                  * are not finished by now).  Retry in 100ms. */
778
779                 schedule_timeout_interruptible(HZ / 10);
780                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
781                 if (w) {
782                         w->cb = w_resync_finished;
783                         drbd_queue_work(&mdev->data.work, w);
784                         return 1;
785                 }
786                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
787         }
788
789         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
790         if (dt <= 0)
791                 dt = 1;
792         db = mdev->rs_total;
793         dbdt = Bit2KB(db/dt);
794         mdev->rs_paused /= HZ;
795
796         if (!get_ldev(mdev))
797                 goto out;
798
799         ping_peer(mdev);
800
801         spin_lock_irq(&mdev->req_lock);
802         os = mdev->state;
803
804         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
805
806         /* This protects us against multiple calls (that can happen in the presence
807            of application IO), and against connectivity loss just before we arrive here. */
808         if (os.conn <= C_CONNECTED)
809                 goto out_unlock;
810
811         ns = os;
812         ns.conn = C_CONNECTED;
813
814         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
815              verify_done ? "Online verify " : "Resync",
816              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
817
818         n_oos = drbd_bm_total_weight(mdev);
819
820         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
821                 if (n_oos) {
822                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
823                               n_oos, Bit2KB(1));
824                         khelper_cmd = "out-of-sync";
825                 }
826         } else {
827                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
828
829                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
830                         khelper_cmd = "after-resync-target";
831
832                 if (mdev->csums_tfm && mdev->rs_total) {
833                         const unsigned long s = mdev->rs_same_csum;
834                         const unsigned long t = mdev->rs_total;
835                         const int ratio =
836                                 (t == 0)     ? 0 :
837                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
838                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
839                              "transferred %luK total %luK\n",
840                              ratio,
841                              Bit2KB(mdev->rs_same_csum),
842                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
843                              Bit2KB(mdev->rs_total));
844                 }
845         }
846
847         if (mdev->rs_failed) {
848                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
849
850                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
851                         ns.disk = D_INCONSISTENT;
852                         ns.pdsk = D_UP_TO_DATE;
853                 } else {
854                         ns.disk = D_UP_TO_DATE;
855                         ns.pdsk = D_INCONSISTENT;
856                 }
857         } else {
858                 ns.disk = D_UP_TO_DATE;
859                 ns.pdsk = D_UP_TO_DATE;
860
861                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
862                         if (mdev->p_uuid) {
863                                 int i;
864                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
865                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
866                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
867                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
868                         } else {
869                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
870                         }
871                 }
872
873                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
874                         /* for verify runs, we don't update uuids here,
875                          * so there would be nothing to report. */
876                         drbd_uuid_set_bm(mdev, 0UL);
877                         drbd_print_uuids(mdev, "updated UUIDs");
878                         if (mdev->p_uuid) {
879                                 /* Now the two UUID sets are equal, update what we
880                                  * know of the peer. */
881                                 int i;
882                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
883                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
884                         }
885                 }
886         }
887
888         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
889 out_unlock:
890         spin_unlock_irq(&mdev->req_lock);
891         put_ldev(mdev);
892 out:
893         mdev->rs_total  = 0;
894         mdev->rs_failed = 0;
895         mdev->rs_paused = 0;
896         if (verify_done)
897                 mdev->ov_start_sector = 0;
898
899         drbd_md_sync(mdev);
900
901         if (khelper_cmd)
902                 drbd_khelper(mdev, khelper_cmd);
903
904         return 1;
905 }
906
907 /* helper: keep e on net_ee while sendpage() may still reference its pages, otherwise free it */
908 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
909 {
910         if (drbd_ee_has_active_page(e)) {
911                 /* This might happen if sendpage() has not finished */
912                 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
913                 atomic_add(i, &mdev->pp_in_use_by_net);
914                 atomic_sub(i, &mdev->pp_in_use);
915                 spin_lock_irq(&mdev->req_lock);
916                 list_add_tail(&e->w.list, &mdev->net_ee);
917                 spin_unlock_irq(&mdev->req_lock);
918                 wake_up(&drbd_pp_wait);
919         } else
920                 drbd_free_ee(mdev, e);
921 }
922
923 /**
924  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
925  * @mdev:       DRBD device.
926  * @w:          work object.
927  * @cancel:     The connection will be closed anyways
928  */
929 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
930 {
931         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
932         int ok;
933
934         if (unlikely(cancel)) {
935                 drbd_free_ee(mdev, e);
936                 dec_unacked(mdev);
937                 return 1;
938         }
939
940         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
941                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
942         } else {
943                 if (__ratelimit(&drbd_ratelimit_state))
944                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
945                             (unsigned long long)e->sector);
946
947                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
948         }
949
950         dec_unacked(mdev);
951
952         move_to_net_ee_or_free(mdev, e);
953
954         if (unlikely(!ok))
955                 dev_err(DEV, "drbd_send_block() failed\n");
956         return ok;
957 }
958
959 /**
960  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
961  * @mdev:       DRBD device.
962  * @w:          work object.
963  * @cancel:     The connection will be closed anyways
964  */
965 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
966 {
967         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
968         int ok;
969
970         if (unlikely(cancel)) {
971                 drbd_free_ee(mdev, e);
972                 dec_unacked(mdev);
973                 return 1;
974         }
975
976         if (get_ldev_if_state(mdev, D_FAILED)) {
977                 drbd_rs_complete_io(mdev, e->sector);
978                 put_ldev(mdev);
979         }
980
981         if (mdev->state.conn == C_AHEAD) {
982                 ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
983         } else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
984                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
985                         inc_rs_pending(mdev);
986                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
987                 } else {
988                         if (__ratelimit(&drbd_ratelimit_state))
989                                 dev_err(DEV, "Not sending RSDataReply, "
990                                     "partner DISKLESS!\n");
991                         ok = 1;
992                 }
993         } else {
994                 if (__ratelimit(&drbd_ratelimit_state))
995                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
996                             (unsigned long long)e->sector);
997
998                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
999
1000                 /* update resync data with failure */
1001                 drbd_rs_failed_io(mdev, e->sector, e->size);
1002         }
1003
1004         dec_unacked(mdev);
1005
1006         move_to_net_ee_or_free(mdev, e);
1007
1008         if (unlikely(!ok))
1009                 dev_err(DEV, "drbd_send_block() failed\n");
1010         return ok;
1011 }
1012
1013 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1014 {
1015         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1016         struct digest_info *di;
1017         int digest_size;
1018         void *digest = NULL;
1019         int ok, eq = 0;
1020
1021         if (unlikely(cancel)) {
1022                 drbd_free_ee(mdev, e);
1023                 dec_unacked(mdev);
1024                 return 1;
1025         }
1026
1027         if (get_ldev(mdev)) {
1028                 drbd_rs_complete_io(mdev, e->sector);
1029                 put_ldev(mdev);
1030         }
1031
1032         di = e->digest;
1033
1034         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1035                 /* quick hack to try to avoid a race against reconfiguration.
1036                  * a real fix would be much more involved,
1037                  * introducing more locking mechanisms */
1038                 if (mdev->csums_tfm) {
1039                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1040                         D_ASSERT(digest_size == di->digest_size);
1041                         digest = kmalloc(digest_size, GFP_NOIO);
1042                 }
1043                 if (digest) {
1044                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1045                         eq = !memcmp(digest, di->digest, digest_size);
1046                         kfree(digest);
1047                 }
1048
1049                 if (eq) {
1050                         drbd_set_in_sync(mdev, e->sector, e->size);
1051                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1052                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1053                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1054                 } else {
1055                         inc_rs_pending(mdev);
1056                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1057                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1058                         kfree(di);
1059                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1060                 }
1061         } else {
1062                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1063                 if (__ratelimit(&drbd_ratelimit_state))
1064                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1065         }
1066
1067         dec_unacked(mdev);
1068         move_to_net_ee_or_free(mdev, e);
1069
1070         if (unlikely(!ok))
1071                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1072         return ok;
1073 }
1074
1075 /* TODO merge common code with w_e_send_csum */
1076 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1077 {
1078         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1079         sector_t sector = e->sector;
1080         unsigned int size = e->size;
1081         int digest_size;
1082         void *digest;
1083         int ok = 1;
1084
1085         if (unlikely(cancel))
1086                 goto out;
1087
1088         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1089         digest = kmalloc(digest_size, GFP_NOIO);
1090         if (!digest) {
1091                 ok = 0; /* terminate the connection in case the allocation failed */
1092                 goto out;
1093         }
1094
1095         if (likely(!(e->flags & EE_WAS_ERROR)))
1096                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1097         else
1098                 memset(digest, 0, digest_size);
1099
1100         /* Free e and pages before send.
1101          * In case we block on congestion, we could otherwise run into
1102          * some distributed deadlock, if the other side blocks on
1103          * congestion as well, because our receiver blocks in
1104          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1105         drbd_free_ee(mdev, e);
1106         e = NULL;
1107         inc_rs_pending(mdev);
1108         ok = drbd_send_drequest_csum(mdev, sector, size,
1109                                      digest, digest_size,
1110                                      P_OV_REPLY);
1111         if (!ok)
1112                 dec_rs_pending(mdev);
1113         kfree(digest);
1114
1115 out:
1116         if (e)
1117                 drbd_free_ee(mdev, e);
1118         dec_unacked(mdev);
1119         return ok;
1120 }
1121
1122 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1123 {
1124         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1125                 mdev->ov_last_oos_size += size>>9;
1126         } else {
1127                 mdev->ov_last_oos_start = sector;
1128                 mdev->ov_last_oos_size = size>>9;
1129         }
1130         drbd_set_out_of_sync(mdev, sector, size);
1131 }
1132
1133 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1134 {
1135         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1136         struct digest_info *di;
1137         void *digest;
1138         sector_t sector = e->sector;
1139         unsigned int size = e->size;
1140         int digest_size;
1141         int ok, eq = 0;
1142
1143         if (unlikely(cancel)) {
1144                 drbd_free_ee(mdev, e);
1145                 dec_unacked(mdev);
1146                 return 1;
1147         }
1148
1149         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1150          * the resync lru has been cleaned up already */
1151         if (get_ldev(mdev)) {
1152                 drbd_rs_complete_io(mdev, e->sector);
1153                 put_ldev(mdev);
1154         }
1155
1156         di = e->digest;
1157
1158         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1159                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1160                 digest = kmalloc(digest_size, GFP_NOIO);
1161                 if (digest) {
1162                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1163
1164                         D_ASSERT(digest_size == di->digest_size);
1165                         eq = !memcmp(digest, di->digest, digest_size);
1166                         kfree(digest);
1167                 }
1168         }
1169
1170         /* Free e and pages before send.
1171          * In case we block on congestion, we could otherwise run into
1172          * some distributed deadlock, if the other side blocks on
1173          * congestion as well, because our receiver blocks in
1174          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1175         drbd_free_ee(mdev, e);
1176         if (!eq)
1177                 drbd_ov_oos_found(mdev, sector, size);
1178         else
1179                 ov_oos_print(mdev);
1180
1181         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1182                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1183
1184         dec_unacked(mdev);
1185
1186         --mdev->ov_left;
1187
1188         /* let's advance progress step marks only for every other megabyte */
1189         if ((mdev->ov_left & 0x200) == 0x200)
1190                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1191
1192         if (mdev->ov_left == 0) {
1193                 ov_oos_print(mdev);
1194                 drbd_resync_finished(mdev);
1195         }
1196
1197         return ok;
1198 }
1199
1200 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1201 {
1202         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1203         complete(&b->done);
1204         return 1;
1205 }
1206
1207 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1208 {
1209         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1210         struct p_barrier *p = &mdev->data.sbuf.barrier;
1211         int ok = 1;
1212
1213         /* really avoid racing with tl_clear.  w.cb may have been referenced
1214          * just before it was reassigned and re-queued, so double check that.
1215          * actually, this race was harmless, since we only try to send the
1216          * barrier packet here, and otherwise do nothing with the object.
1217          * but compare with the head of w_clear_epoch */
1218         spin_lock_irq(&mdev->req_lock);
1219         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1220                 cancel = 1;
1221         spin_unlock_irq(&mdev->req_lock);
1222         if (cancel)
1223                 return 1;
1224
1225         if (!drbd_get_data_sock(mdev))
1226                 return 0;
1227         p->barrier = b->br_number;
1228         /* inc_ap_pending was done where this was queued.
1229          * dec_ap_pending will be done in got_BarrierAck
1230          * or (on connection loss) in w_clear_epoch.  */
1231         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1232                                 (struct p_header80 *)p, sizeof(*p), 0);
1233         drbd_put_data_sock(mdev);
1234
1235         return ok;
1236 }
1237
1238 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1239 {
1240         if (cancel)
1241                 return 1;
1242         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1243 }
1244
1245 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1246 {
1247         struct drbd_request *req = container_of(w, struct drbd_request, w);
1248         int ok;
1249
1250         if (unlikely(cancel)) {
1251                 req_mod(req, send_canceled);
1252                 return 1;
1253         }
1254
1255         ok = drbd_send_oos(mdev, req);
1256         req_mod(req, oos_handed_to_network);
1257
1258         return ok;
1259 }
1260
1261 /**
1262  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1263  * @mdev:       DRBD device.
1264  * @w:          work object.
1265  * @cancel:     The connection will be closed anyways
1266  */
1267 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1268 {
1269         struct drbd_request *req = container_of(w, struct drbd_request, w);
1270         int ok;
1271
1272         if (unlikely(cancel)) {
1273                 req_mod(req, send_canceled);
1274                 return 1;
1275         }
1276
1277         ok = drbd_send_dblock(mdev, req);
1278         req_mod(req, ok ? handed_over_to_network : send_failed);
1279
1280         return ok;
1281 }
1282
1283 /**
1284  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1285  * @mdev:       DRBD device.
1286  * @w:          work object.
1287  * @cancel:     The connection will be closed anyways
1288  */
1289 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1290 {
1291         struct drbd_request *req = container_of(w, struct drbd_request, w);
1292         int ok;
1293
1294         if (unlikely(cancel)) {
1295                 req_mod(req, send_canceled);
1296                 return 1;
1297         }
1298
1299         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1300                                 (unsigned long)req);
1301
1302         if (!ok) {
1303                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1304                  * so this is probably redundant */
1305                 if (mdev->state.conn >= C_CONNECTED)
1306                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1307         }
1308         req_mod(req, ok ? handed_over_to_network : send_failed);
1309
1310         return ok;
1311 }
1312
1313 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1314 {
1315         struct drbd_request *req = container_of(w, struct drbd_request, w);
1316
1317         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1318                 drbd_al_begin_io(mdev, req->sector);
1319         /* Calling drbd_al_begin_io() out of the worker might deadlock
1320            theoretically. Practically it cannot deadlock, since this is
1321            only used when unfreezing IOs. All the extents of the requests
1322            that made it into the TL are already active */
1323
1324         drbd_req_make_private_bio(req, req->master_bio);
1325         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1326         generic_make_request(req->private_bio);
1327
1328         return 1;
1329 }
1330
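/* Walk the sync-after dependency chain.  Return 1 if none of the devices we
 * depend on is currently resyncing or paused, 0 otherwise.  All callers in
 * this file hold the global_state_lock. */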
1331 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1332 {
1333         struct drbd_conf *odev = mdev;
1334
1335         while (1) {
1336                 if (odev->sync_conf.after == -1)
1337                         return 1;
1338                 odev = minor_to_mdev(odev->sync_conf.after);
1339                 ERR_IF(!odev) return 1;
1340                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1341                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1342                     odev->state.aftr_isp || odev->state.peer_isp ||
1343                     odev->state.user_isp)
1344                         return 0;
1345         }
1346 }
1347
1348 /**
1349  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1350  * @mdev:       DRBD device.
1351  *
1352  * Called from process context only (admin command and after_state_ch).
1353  */
1354 static int _drbd_pause_after(struct drbd_conf *mdev)
1355 {
1356         struct drbd_conf *odev;
1357         int i, rv = 0;
1358
1359         for (i = 0; i < minor_count; i++) {
1360                 odev = minor_to_mdev(i);
1361                 if (!odev)
1362                         continue;
1363                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1364                         continue;
1365                 if (!_drbd_may_sync_now(odev))
1366                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1367                                != SS_NOTHING_TO_DO);
1368         }
1369
1370         return rv;
1371 }
1372
1373 /**
1374  * _drbd_resume_next() - Resume resync on all devices that may resync now
1375  * @mdev:       DRBD device.
1376  *
1377  * Called from process context only (admin command and worker).
1378  */
1379 static int _drbd_resume_next(struct drbd_conf *mdev)
1380 {
1381         struct drbd_conf *odev;
1382         int i, rv = 0;
1383
1384         for (i = 0; i < minor_count; i++) {
1385                 odev = minor_to_mdev(i);
1386                 if (!odev)
1387                         continue;
1388                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1389                         continue;
1390                 if (odev->state.aftr_isp) {
1391                         if (_drbd_may_sync_now(odev))
1392                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1393                                                         CS_HARD, NULL)
1394                                        != SS_NOTHING_TO_DO) ;
1395                 }
1396         }
1397         return rv;
1398 }
1399
1400 void resume_next_sg(struct drbd_conf *mdev)
1401 {
1402         write_lock_irq(&global_state_lock);
1403         _drbd_resume_next(mdev);
1404         write_unlock_irq(&global_state_lock);
1405 }
1406
1407 void suspend_other_sg(struct drbd_conf *mdev)
1408 {
1409         write_lock_irq(&global_state_lock);
1410         _drbd_pause_after(mdev);
1411         write_unlock_irq(&global_state_lock);
1412 }
1413
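/* Check whether minor o_minor is valid as a sync-after dependency:
 * reject unknown minors and dependency cycles. */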
1414 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1415 {
1416         struct drbd_conf *odev;
1417
1418         if (o_minor == -1)
1419                 return NO_ERROR;
1420         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1421                 return ERR_SYNC_AFTER;
1422
1423         /* check for loops */
1424         odev = minor_to_mdev(o_minor);
1425         while (1) {
1426                 if (odev == mdev)
1427                         return ERR_SYNC_AFTER_CYCLE;
1428
1429                 /* dependency chain ends here, no cycles. */
1430                 if (odev->sync_conf.after == -1)
1431                         return NO_ERROR;
1432
1433                 /* follow the dependency chain */
1434                 odev = minor_to_mdev(odev->sync_conf.after);
1435         }
1436 }
1437
1438 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1439 {
1440         int changes;
1441         int retcode;
1442
1443         write_lock_irq(&global_state_lock);
1444         retcode = sync_after_error(mdev, na);
1445         if (retcode == NO_ERROR) {
1446                 mdev->sync_conf.after = na;
1447                 do {
1448                         changes  = _drbd_pause_after(mdev);
1449                         changes |= _drbd_resume_next(mdev);
1450                 } while (changes);
1451         }
1452         write_unlock_irq(&global_state_lock);
1453         return retcode;
1454 }
1455
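/* Reset the resync controller's bookkeeping: in-flight and incoming sector
 * counters, the planned amount, and the plan fifo itself. */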
1456 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1457 {
1458         atomic_set(&mdev->rs_sect_in, 0);
1459         atomic_set(&mdev->rs_sect_ev, 0);
1460         mdev->rs_in_flight = 0;
1461         mdev->rs_planed = 0;
1462         spin_lock(&mdev->peer_seq_lock);
1463         fifo_set(&mdev->rs_plan_s, 0);
1464         spin_unlock(&mdev->peer_seq_lock);
1465 }
1466
1467 /**
1468  * drbd_start_resync() - Start the resync process
1469  * @mdev:       DRBD device.
1470  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1471  *
1472  * This function might bring you directly into one of the
1473  * C_PAUSED_SYNC_* states.
1474  */
1475 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1476 {
1477         union drbd_state ns;
1478         int r;
1479
1480         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1481                 dev_err(DEV, "Resync already running!\n");
1482                 return;
1483         }
1484
1485         if (mdev->state.conn < C_AHEAD) {
1486                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1487                 drbd_rs_cancel_all(mdev);
1488                 /* This should be done when we abort the resync. We definitely do not
1489                    want to have this for connections going back and forth between
1490                    Ahead/Behind and SyncSource/SyncTarget */
1491         }
1492
1493         if (side == C_SYNC_TARGET) {
1494                 /* Since application IO was locked out during C_WF_BITMAP_T and
1495                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1496                    we ask the before-resync-target handler whether we may make the data inconsistent. */
1497                 r = drbd_khelper(mdev, "before-resync-target");
1498                 r = (r >> 8) & 0xff;
1499                 if (r > 0) {
1500                         dev_info(DEV, "before-resync-target handler returned %d, "
1501                              "dropping connection.\n", r);
1502                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1503                         return;
1504                 }
1505         } else /* C_SYNC_SOURCE */ {
1506                 r = drbd_khelper(mdev, "before-resync-source");
1507                 r = (r >> 8) & 0xff;
1508                 if (r > 0) {
1509                         if (r == 3) {
1510                                 dev_info(DEV, "before-resync-source handler returned %d, "
1511                                          "ignoring. Old userland tools?\n", r);
1512                         } else {
1513                                 dev_info(DEV, "before-resync-source handler returned %d, "
1514                                          "dropping connection.\n", r);
1515                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1516                                 return;
1517                         }
1518                 }
1519         }
1520
1521         drbd_state_lock(mdev);
1522
1523         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1524                 drbd_state_unlock(mdev);
1525                 return;
1526         }
1527
1528         write_lock_irq(&global_state_lock);
1529         ns = mdev->state;
1530
1531         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1532
1533         ns.conn = side;
1534
1535         if (side == C_SYNC_TARGET)
1536                 ns.disk = D_INCONSISTENT;
1537         else /* side == C_SYNC_SOURCE */
1538                 ns.pdsk = D_INCONSISTENT;
1539
1540         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1541         ns = mdev->state;
1542
1543         if (ns.conn < C_CONNECTED)
1544                 r = SS_UNKNOWN_ERROR;
1545
1546         if (r == SS_SUCCESS) {
1547                 unsigned long tw = drbd_bm_total_weight(mdev);
1548                 unsigned long now = jiffies;
1549                 int i;
1550
1551                 mdev->rs_failed    = 0;
1552                 mdev->rs_paused    = 0;
1553                 mdev->rs_same_csum = 0;
1554                 mdev->rs_last_events = 0;
1555                 mdev->rs_last_sect_ev = 0;
1556                 mdev->rs_total     = tw;
1557                 mdev->rs_start     = now;
1558                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1559                         mdev->rs_mark_left[i] = tw;
1560                         mdev->rs_mark_time[i] = now;
1561                 }
1562                 _drbd_pause_after(mdev);
1563         }
1564         write_unlock_irq(&global_state_lock);
1565
1566         if (r == SS_SUCCESS) {
1567                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1568                      drbd_conn_str(ns.conn),
1569                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1570                      (unsigned long) mdev->rs_total);
1571                 if (side == C_SYNC_TARGET)
1572                         mdev->bm_resync_fo = 0;
1573
1574                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1575                  * with w_send_oos, or the sync target will get confused as to
1576                  * how many bits to resync.  We cannot always do that, because for an
1577                  * empty resync and protocol < 95, we need to do it here, as we call
1578                  * drbd_resync_finished from here in that case.
1579                  * We call drbd_gen_and_send_sync_uuid() here for protocol < 96,
1580                  * and from after_state_ch otherwise. */
1581                 if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1582                         drbd_gen_and_send_sync_uuid(mdev);
1583
1584                 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1585                         /* This still has a race (about when exactly the peers
1586                          * detect connection loss) that can lead to a full sync
1587                          * on next handshake. In 8.3.9 we fixed this with explicit
1588                          * resync-finished notifications, but the fix
1589                          * introduces a protocol change.  Sleeping for some
1590                          * time longer than the ping interval + timeout on the
1591                          * SyncSource, to give the SyncTarget the chance to
1592                          * detect connection loss, then waiting for a ping
1593                          * response (implicit in drbd_resync_finished) reduces
1594                          * the race considerably, but does not solve it. */
1595                         if (side == C_SYNC_SOURCE)
1596                                 schedule_timeout_interruptible(
1597                                         mdev->net_conf->ping_int * HZ +
1598                                         mdev->net_conf->ping_timeo*HZ/9);
1599                         drbd_resync_finished(mdev);
1600                 }
1601
1602                 drbd_rs_controller_reset(mdev);
1603                 /* ns.conn may already be != mdev->state.conn,
1604                  * we may have been paused in between, or become paused until
1605                  * the timer triggers.
1606                  * No matter, that is handled in resync_timer_fn() */
1607                 if (ns.conn == C_SYNC_TARGET)
1608                         mod_timer(&mdev->resync_timer, jiffies);
1609
1610                 drbd_md_sync(mdev);
1611         }
1612         put_ldev(mdev);
1613         drbd_state_unlock(mdev);
1614 }
1615
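/**
 * drbd_worker() - Main loop of the per-device worker thread
 * @thi:        The drbd_thread this worker runs as.
 *
 * Dequeues work items from mdev->data.work one at a time and runs their
 * callbacks.  While waiting for new work the data socket is uncorked, and
 * corked again once work arrives.  If a callback fails while we are still
 * connected, the connection is forced into C_NETWORK_FAILURE.  On shutdown
 * the remaining queue is drained with the cancel flag set, the receiver
 * thread is stopped, and the device is cleaned up via drbd_mdev_cleanup().
 */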
1616 int drbd_worker(struct drbd_thread *thi)
1617 {
1618         struct drbd_conf *mdev = thi->mdev;
1619         struct drbd_work *w = NULL;
1620         LIST_HEAD(work_list);
1621         int intr = 0, i;
1622
1623         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1624
1625         while (get_t_state(thi) == Running) {
1626                 drbd_thread_current_set_cpu(mdev);
1627
1628                 if (down_trylock(&mdev->data.work.s)) {
1629                         mutex_lock(&mdev->data.mutex);
1630                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1631                                 drbd_tcp_uncork(mdev->data.socket);
1632                         mutex_unlock(&mdev->data.mutex);
1633
1634                         intr = down_interruptible(&mdev->data.work.s);
1635
1636                         mutex_lock(&mdev->data.mutex);
1637                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1638                                 drbd_tcp_cork(mdev->data.socket);
1639                         mutex_unlock(&mdev->data.mutex);
1640                 }
1641
1642                 if (intr) {
1643                         D_ASSERT(intr == -EINTR);
1644                         flush_signals(current);
1645                         ERR_IF (get_t_state(thi) == Running)
1646                                 continue;
1647                         break;
1648                 }
1649
1650                 if (get_t_state(thi) != Running)
1651                         break;
1652                 /* With this break, we have done a down() but not consumed
1653                    the entry from the list. The cleanup code takes care of
1654                    this...   */
1655
1656                 w = NULL;
1657                 spin_lock_irq(&mdev->data.work.q_lock);
1658                 ERR_IF(list_empty(&mdev->data.work.q)) {
1659                         /* something terribly wrong in our logic.
1660                          * we were able to down() the semaphore,
1661                          * but the list is empty... doh.
1662                          *
1663                          * what is the best thing to do now?
1664                          * try again from scratch, restarting the receiver,
1665                          * asender, whatnot? That could break even more badly,
1666                          * e.g. when we are primary, but no good local data.
1667                          *
1668                          * I'll try to get away with just starting over this loop.
1669                          */
1670                         spin_unlock_irq(&mdev->data.work.q_lock);
1671                         continue;
1672                 }
1673                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1674                 list_del_init(&w->list);
1675                 spin_unlock_irq(&mdev->data.work.q_lock);
1676
1677                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1678                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1679                         if (mdev->state.conn >= C_CONNECTED)
1680                                 drbd_force_state(mdev,
1681                                                 NS(conn, C_NETWORK_FAILURE));
1682                 }
1683         }
1684         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1685         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1686
1687         spin_lock_irq(&mdev->data.work.q_lock);
1688         i = 0;
1689         while (!list_empty(&mdev->data.work.q)) {
1690                 list_splice_init(&mdev->data.work.q, &work_list);
1691                 spin_unlock_irq(&mdev->data.work.q_lock);
1692
1693                 while (!list_empty(&work_list)) {
1694                         w = list_entry(work_list.next, struct drbd_work, list);
1695                         list_del_init(&w->list);
1696                         w->cb(mdev, w, 1);
1697                         i++; /* dead debugging code */
1698                 }
1699
1700                 spin_lock_irq(&mdev->data.work.q_lock);
1701         }
1702         sema_init(&mdev->data.work.s, 0);
1703         /* DANGEROUS race: if someone queued work while holding the spinlock,
1704          * but called up() outside of it, we could get an up() on the
1705          * semaphore without a corresponding list entry.
1706          * So don't do that.
1707          */
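        /* Illustrative interleaving of the race described above (sketch):
         *
         *   queueing side                      worker (here)
         *   -------------                      -------------
         *   spin_lock(); list_add();
         *   spin_unlock();
         *                                      drain list, sema_init(&s, 0)
         *   up(&s);   <-- semaphore count is 1, but the list is already empty
         */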
1708         spin_unlock_irq(&mdev->data.work.q_lock);
1709
1710         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1711         /* _drbd_set_state only uses stop_nowait.
1712          * wait here for the Exiting receiver. */
1713         drbd_thread_stop(&mdev->receiver);
1714         drbd_mdev_cleanup(mdev);
1715
1716         dev_info(DEV, "worker terminated\n");
1717
1718         clear_bit(DEVICE_DYING, &mdev->flags);
1719         clear_bit(CONFIG_PENDING, &mdev->flags);
1720         wake_up(&mdev->state_wait);
1721
1722         return 0;
1723 }