drbd: New packet for Ahead/Behind mode: P_OUT_OF_SYNC
pandora-kernel.git: drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42
43
44
45 /* defined here:
46    drbd_md_io_complete
47    drbd_endio_sec
48    drbd_endio_pri
49
50  * more endio handlers:
51    atodb_endio in drbd_actlog.c
52    drbd_bm_async_io_complete in drbd_bitmap.c
53
54  * For all these callbacks, note the following:
55  * The callbacks will be called in irq context by the IDE drivers,
56  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
57  * Try to get the locking right :)
58  *
59  */
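/* Note: because these completions may run in hard irq or softirq context,
 * the endio handlers below take mdev->req_lock via spin_lock_irqsave() /
 * spin_unlock_irqrestore(), rather than the plain spin_lock_irq() used
 * from process context elsewhere in this file. */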
60
61
62 /* About the global_state_lock
63    Each state transition on a device holds a read lock. In case we have
64    to evaluate the sync after dependencies, we grab a write lock, because
65    we need stable states on all devices for that.  */
66 rwlock_t global_state_lock;
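/* A minimal usage sketch of the write-lock side, as used further down in
 * this file (resume_next_sg(), suspend_other_sg(), drbd_alter_sa() and
 * drbd_start_resync()); it needs stable states on all devices:
 *
 *	write_lock_irq(&global_state_lock);
 *	_drbd_pause_after(mdev);
 *	write_unlock_irq(&global_state_lock);
 *
 * The read-lock side around individual state transitions is taken outside
 * this file, presumably in the state handling code of drbd_main.c. */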
67
68 /* used for synchronous meta data and bitmap IO
69  * submitted by drbd_md_sync_page_io()
70  */
71 void drbd_md_io_complete(struct bio *bio, int error)
72 {
73         struct drbd_md_io *md_io;
74
75         md_io = (struct drbd_md_io *)bio->bi_private;
76         md_io->error = error;
77
78         complete(&md_io->event);
79 }
80
81 /* reads on behalf of the partner,
82  * "submitted" by the receiver
83  */
84 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
85 {
86         unsigned long flags = 0;
87         struct drbd_conf *mdev = e->mdev;
88
89         D_ASSERT(e->block_id != ID_VACANT);
90
91         spin_lock_irqsave(&mdev->req_lock, flags);
92         mdev->read_cnt += e->size >> 9;
93         list_del(&e->w.list);
94         if (list_empty(&mdev->read_ee))
95                 wake_up(&mdev->ee_wait);
96         if (test_bit(__EE_WAS_ERROR, &e->flags))
97                 __drbd_chk_io_error(mdev, FALSE);
98         spin_unlock_irqrestore(&mdev->req_lock, flags);
99
100         drbd_queue_work(&mdev->data.work, &e->w);
101         put_ldev(mdev);
102 }
103
104 /* writes on behalf of the partner, or resync writes,
105  * "submitted" by the receiver, final stage.  */
106 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
107 {
108         unsigned long flags = 0;
109         struct drbd_conf *mdev = e->mdev;
110         sector_t e_sector;
111         int do_wake;
112         int is_syncer_req;
113         int do_al_complete_io;
114
115         D_ASSERT(e->block_id != ID_VACANT);
116
117         /* after we moved e to done_ee,
118          * we may no longer access it,
119          * it may be freed/reused already!
120          * (as soon as we release the req_lock) */
121         e_sector = e->sector;
122         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
123         is_syncer_req = is_syncer_block_id(e->block_id);
124
125         spin_lock_irqsave(&mdev->req_lock, flags);
126         mdev->writ_cnt += e->size >> 9;
127         list_del(&e->w.list); /* has been on active_ee or sync_ee */
128         list_add_tail(&e->w.list, &mdev->done_ee);
129
130         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
131          * neither did we wake possibly waiting conflicting requests.
132          * done from "drbd_process_done_ee" within the appropriate w.cb
133          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
134
135         do_wake = is_syncer_req
136                 ? list_empty(&mdev->sync_ee)
137                 : list_empty(&mdev->active_ee);
138
139         if (test_bit(__EE_WAS_ERROR, &e->flags))
140                 __drbd_chk_io_error(mdev, FALSE);
141         spin_unlock_irqrestore(&mdev->req_lock, flags);
142
143         if (is_syncer_req)
144                 drbd_rs_complete_io(mdev, e_sector);
145
146         if (do_wake)
147                 wake_up(&mdev->ee_wait);
148
149         if (do_al_complete_io)
150                 drbd_al_complete_io(mdev, e_sector);
151
152         wake_asender(mdev);
153         put_ldev(mdev);
154 }
155
156 /* writes on behalf of the partner, or resync writes,
157  * "submitted" by the receiver.
158  */
159 void drbd_endio_sec(struct bio *bio, int error)
160 {
161         struct drbd_epoch_entry *e = bio->bi_private;
162         struct drbd_conf *mdev = e->mdev;
163         int uptodate = bio_flagged(bio, BIO_UPTODATE);
164         int is_write = bio_data_dir(bio) == WRITE;
165
166         if (error)
167                 dev_warn(DEV, "%s: error=%d s=%llus\n",
168                                 is_write ? "write" : "read", error,
169                                 (unsigned long long)e->sector);
170         if (!error && !uptodate) {
171                 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
172                                 is_write ? "write" : "read",
173                                 (unsigned long long)e->sector);
174                 /* strange behavior of some lower level drivers...
175                  * fail the request by clearing the uptodate flag,
176                  * but do not return any error?! */
177                 error = -EIO;
178         }
179
180         if (error)
181                 set_bit(__EE_WAS_ERROR, &e->flags);
182
183         bio_put(bio); /* no need for the bio anymore */
184         if (atomic_dec_and_test(&e->pending_bios)) {
185                 if (is_write)
186                         drbd_endio_write_sec_final(e);
187                 else
188                         drbd_endio_read_sec_final(e);
189         }
190 }
191
192 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
193  */
194 void drbd_endio_pri(struct bio *bio, int error)
195 {
196         unsigned long flags;
197         struct drbd_request *req = bio->bi_private;
198         struct drbd_conf *mdev = req->mdev;
199         struct bio_and_error m;
200         enum drbd_req_event what;
201         int uptodate = bio_flagged(bio, BIO_UPTODATE);
202
203         if (!error && !uptodate) {
204                 dev_warn(DEV, "p %s: setting error to -EIO\n",
205                          bio_data_dir(bio) == WRITE ? "write" : "read");
206                 /* strange behavior of some lower level drivers...
207                  * fail the request by clearing the uptodate flag,
208                  * but do not return any error?! */
209                 error = -EIO;
210         }
211
212         /* to avoid recursion in __req_mod */
213         if (unlikely(error)) {
214                 what = (bio_data_dir(bio) == WRITE)
215                         ? write_completed_with_error
216                         : (bio_rw(bio) == READ)
217                           ? read_completed_with_error
218                           : read_ahead_completed_with_error;
219         } else
220                 what = completed_ok;
221
222         bio_put(req->private_bio);
223         req->private_bio = ERR_PTR(error);
224
225         /* not req_mod(), we need irqsave here! */
226         spin_lock_irqsave(&mdev->req_lock, flags);
227         __req_mod(req, what, &m);
228         spin_unlock_irqrestore(&mdev->req_lock, flags);
229
230         if (m.bio)
231                 complete_master_bio(mdev, &m);
232 }
233
234 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
235 {
236         struct drbd_request *req = container_of(w, struct drbd_request, w);
237
238         /* We should not detach for read io-error,
239          * but try to WRITE the P_DATA_REPLY to the failed location,
240          * to give the disk the chance to relocate that block */
241
242         spin_lock_irq(&mdev->req_lock);
243         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
244                 _req_mod(req, read_retry_remote_canceled);
245                 spin_unlock_irq(&mdev->req_lock);
246                 return 1;
247         }
248         spin_unlock_irq(&mdev->req_lock);
249
250         return w_send_read_req(mdev, w, 0);
251 }
252
253 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
254 {
255         ERR_IF(cancel) return 1;
256         dev_err(DEV, "resync inactive, but callback triggered??\n");
257         return 1; /* Simply ignore this! */
258 }
259
260 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
261 {
262         struct hash_desc desc;
263         struct scatterlist sg;
264         struct page *page = e->pages;
265         struct page *tmp;
266         unsigned len;
267
268         desc.tfm = tfm;
269         desc.flags = 0;
270
271         sg_init_table(&sg, 1);
272         crypto_hash_init(&desc);
273
274         while ((tmp = page_chain_next(page))) {
275                 /* all but the last page will be fully used */
276                 sg_set_page(&sg, page, PAGE_SIZE, 0);
277                 crypto_hash_update(&desc, &sg, sg.length);
278                 page = tmp;
279         }
280         /* and now the last, possibly only partially used page */
281         len = e->size & (PAGE_SIZE - 1);
282         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
283         crypto_hash_update(&desc, &sg, sg.length);
284         crypto_hash_final(&desc, digest);
285 }
286
287 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
288 {
289         struct hash_desc desc;
290         struct scatterlist sg;
291         struct bio_vec *bvec;
292         int i;
293
294         desc.tfm = tfm;
295         desc.flags = 0;
296
297         sg_init_table(&sg, 1);
298         crypto_hash_init(&desc);
299
300         __bio_for_each_segment(bvec, bio, i, 0) {
301                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
302                 crypto_hash_update(&desc, &sg, sg.length);
303         }
304         crypto_hash_final(&desc, digest);
305 }
306
307 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
308 {
309         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
310         int digest_size;
311         void *digest;
312         int ok;
313
314         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
315
316         if (unlikely(cancel)) {
317                 drbd_free_ee(mdev, e);
318                 return 1;
319         }
320
321         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
322                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
323                 digest = kmalloc(digest_size, GFP_NOIO);
324                 if (digest) {
325                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
326
327                         inc_rs_pending(mdev);
328                         ok = drbd_send_drequest_csum(mdev,
329                                                      e->sector,
330                                                      e->size,
331                                                      digest,
332                                                      digest_size,
333                                                      P_CSUM_RS_REQUEST);
334                         kfree(digest);
335                 } else {
336                         dev_err(DEV, "kmalloc() of digest failed.\n");
337                         ok = 0;
338                 }
339         } else
340                 ok = 1;
341
342         drbd_free_ee(mdev, e);
343
344         if (unlikely(!ok))
345                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
346         return ok;
347 }
348
349 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
350
351 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
352 {
353         struct drbd_epoch_entry *e;
354
355         if (!get_ldev(mdev))
356                 return -EIO;
357
358         if (drbd_rs_should_slow_down(mdev))
359                 goto defer;
360
361         /* GFP_TRY, because if there is no memory available right now, this may
362          * be rescheduled for later. It is "only" background resync, after all. */
363         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
364         if (!e)
365                 goto defer;
366
367         e->w.cb = w_e_send_csum;
368         spin_lock_irq(&mdev->req_lock);
369         list_add(&e->w.list, &mdev->read_ee);
370         spin_unlock_irq(&mdev->req_lock);
371
372         atomic_add(size >> 9, &mdev->rs_sect_ev);
373         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
374                 return 0;
375
376         /* drbd_submit_ee currently fails for one reason only:
377          * not being able to allocate enough bios.
378          * Is dropping the connection going to help? */
379         spin_lock_irq(&mdev->req_lock);
380         list_del(&e->w.list);
381         spin_unlock_irq(&mdev->req_lock);
382
383         drbd_free_ee(mdev, e);
384 defer:
385         put_ldev(mdev);
386         return -EAGAIN;
387 }
388
389 void resync_timer_fn(unsigned long data)
390 {
391         struct drbd_conf *mdev = (struct drbd_conf *) data;
392         int queue;
393
394         queue = 1;
395         switch (mdev->state.conn) {
396         case C_VERIFY_S:
397                 mdev->resync_work.cb = w_make_ov_request;
398                 break;
399         case C_SYNC_TARGET:
400                 mdev->resync_work.cb = w_make_resync_request;
401                 break;
402         default:
403                 queue = 0;
404                 mdev->resync_work.cb = w_resync_inactive;
405         }
406
407         /* harmless race: list_empty outside data.work.q_lock */
408         if (list_empty(&mdev->resync_work.list) && queue)
409                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
410 }
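/* Note: this timer callback only queues the work item; the work callbacks
 * re-arm the timer themselves (see the mod_timer(&mdev->resync_timer,
 * jiffies + SLEEP_TIME) calls in the requeue paths of w_make_resync_request()
 * and w_make_ov_request() below), so resync proceeds in SLEEP_TIME steps. */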
411
412 static void fifo_set(struct fifo_buffer *fb, int value)
413 {
414         int i;
415
416         for (i = 0; i < fb->size; i++)
417                 fb->values[i] = value;
418 }
419
420 static int fifo_push(struct fifo_buffer *fb, int value)
421 {
422         int ov;
423
424         ov = fb->values[fb->head_index];
425         fb->values[fb->head_index++] = value;
426
427         if (fb->head_index >= fb->size)
428                 fb->head_index = 0;
429
430         return ov;
431 }
432
433 static void fifo_add_val(struct fifo_buffer *fb, int value)
434 {
435         int i;
436
437         for (i = 0; i < fb->size; i++)
438                 fb->values[i] += value;
439 }
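/* The fifo_* helpers above implement the fixed-size ring buffer behind
 * mdev->rs_plan_s: one slot per planned SLEEP_TIME step, each holding the
 * correction (in sectors) scheduled for that step.  fifo_push() pops the
 * slot for the current step and reuses it as the newest one, while
 * fifo_add_val() spreads a new correction evenly over all planned steps.
 * drbd_rs_controller() below consumes one slot per invocation, and
 * drbd_rs_controller_reset() clears the whole plan via fifo_set(). */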
440
441 int drbd_rs_controller(struct drbd_conf *mdev)
442 {
443         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
444         unsigned int want;     /* The number of sectors we want in the proxy */
445         int req_sect; /* Number of sectors to request in this turn */
446         int correction; /* Number of sectors more we need in the proxy*/
447         int cps; /* correction per invocation of drbd_rs_controller() */
448         int steps; /* Number of time steps to plan ahead */
449         int curr_corr;
450         int max_sect;
451
452         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
453         mdev->rs_in_flight -= sect_in;
454
455         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
456
457         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
458
459         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
460                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
461         } else { /* normal path */
462                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
463                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
464         }
465
466         correction = want - mdev->rs_in_flight - mdev->rs_planed;
467
468         /* Plan ahead */
469         cps = correction / steps;
470         fifo_add_val(&mdev->rs_plan_s, cps);
471         mdev->rs_planed += cps * steps;
472
473         /* What we do in this step */
474         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
475         spin_unlock(&mdev->peer_seq_lock);
476         mdev->rs_planed -= curr_corr;
477
478         req_sect = sect_in + curr_corr;
479         if (req_sect < 0)
480                 req_sect = 0;
481
482         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
483         if (req_sect > max_sect)
484                 req_sect = max_sect;
485
486         /*
487         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
488                  sect_in, mdev->rs_in_flight, want, correction,
489                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
490         */
491
492         return req_sect;
493 }
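/* Rough worked example of one controller step (numbers purely illustrative;
 * this assumes SLEEP_TIME is a tenth of a second, so HZ / (SLEEP_TIME * 10)
 * is 1 and want == sect_in * c_delay_target when no c_fill_target is set):
 *
 *   sect_in = 800 sectors acked since the last step, c_delay_target = 10
 *     -> want = 8000 sectors that should be "in the pipe"
 *   rs_in_flight = 5000, rs_planed = 1000
 *     -> correction = 8000 - 5000 - 1000 = 2000 sectors still missing
 *   steps = 10
 *     -> cps = 200 added to every planned slot
 *   curr_corr = whatever the slot for this step had accumulated so far
 *   req_sect = sect_in + curr_corr, clamped to 0 and to the c_max_rate
 *              equivalent in sectors per step
 */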
494
495 int drbd_rs_number_requests(struct drbd_conf *mdev)
496 {
497         int number;
498         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
499                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
500                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
501         } else {
502                 mdev->c_sync_rate = mdev->sync_conf.rate;
503                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
504         }
505
506         /* Throttle resync on lower level disk activity, which may also be
507          * caused by application IO on Primary/SyncTarget.
508          * Keep this after the call to drbd_rs_controller, as that assumes
509          * to be called as precisely as possible every SLEEP_TIME,
510          * and would be confused otherwise. */
511         if (number && drbd_rs_should_slow_down(mdev)) {
512                 mdev->c_sync_rate = 1;
513                 number = 0;
514         }
515
516         /* ignore the amount of pending requests, the resync controller should
517          * throttle down to incoming reply rate soon enough anyways. */
518         return number;
519 }
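/* Unit conversion note: drbd_rs_controller() works in 512-byte sectors,
 * while the request generators below work in bitmap blocks of BM_BLOCK_SIZE
 * (4 KiB if BM_BLOCK_SHIFT is 12), hence the ">> (BM_BLOCK_SHIFT - 9)",
 * i.e. a division by 8.  mdev->c_sync_rate is back-computed in KiB/s here,
 * apparently only for status reporting. */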
520
521 int w_make_resync_request(struct drbd_conf *mdev,
522                 struct drbd_work *w, int cancel)
523 {
524         unsigned long bit;
525         sector_t sector;
526         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
527         int max_bio_size;
528         int number, rollback_i, size;
529         int align, queued, sndbuf;
530         int i = 0;
531
532         if (unlikely(cancel))
533                 return 1;
534
535         if (unlikely(mdev->state.conn < C_CONNECTED)) {
536                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
537                 return 0;
538         }
539
540         if (mdev->state.conn != C_SYNC_TARGET)
541                 dev_err(DEV, "%s in w_make_resync_request\n",
542                         drbd_conn_str(mdev->state.conn));
543
544         if (mdev->rs_total == 0) {
545                 /* empty resync? */
546                 drbd_resync_finished(mdev);
547                 return 1;
548         }
549
550         if (!get_ldev(mdev)) {
551                 /* Since we only need to access mdev->rsync, a
552                    get_ldev_if_state(mdev, D_FAILED) would be sufficient; but
553                    continuing resync with a broken disk makes no sense at
554                    all */
555                 dev_err(DEV, "Disk broke down during resync!\n");
556                 mdev->resync_work.cb = w_resync_inactive;
557                 return 1;
558         }
559
560         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
561          * if it should be necessary */
562         max_bio_size =
563                 mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
564                 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
565
566         number = drbd_rs_number_requests(mdev);
567         if (number == 0)
568                 goto requeue;
569
570         for (i = 0; i < number; i++) {
571                 /* Stop generating RS requests, when half of the send buffer is filled */
572                 mutex_lock(&mdev->data.mutex);
573                 if (mdev->data.socket) {
574                         queued = mdev->data.socket->sk->sk_wmem_queued;
575                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
576                 } else {
577                         queued = 1;
578                         sndbuf = 0;
579                 }
580                 mutex_unlock(&mdev->data.mutex);
581                 if (queued > sndbuf / 2)
582                         goto requeue;
583
584 next_sector:
585                 size = BM_BLOCK_SIZE;
586                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
587
588                 if (bit == -1UL) {
589                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
590                         mdev->resync_work.cb = w_resync_inactive;
591                         put_ldev(mdev);
592                         return 1;
593                 }
594
595                 sector = BM_BIT_TO_SECT(bit);
596
597                 if (drbd_try_rs_begin_io(mdev, sector)) {
598                         mdev->bm_resync_fo = bit;
599                         goto requeue;
600                 }
601                 mdev->bm_resync_fo = bit + 1;
602
603                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
604                         drbd_rs_complete_io(mdev, sector);
605                         goto next_sector;
606                 }
607
608 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
609                 /* try to find some adjacent bits.
610                  * we stop if we have already the maximum req size.
611                  *
612                  * Additionally always align bigger requests, in order to
613                  * be prepared for all stripe sizes of software RAIDs.
614                  */
615                 align = 1;
616                 rollback_i = i;
617                 for (;;) {
618                         if (size + BM_BLOCK_SIZE > max_bio_size)
619                                 break;
620
621                         /* Be always aligned */
622                         if (sector & ((1<<(align+3))-1))
623                                 break;
624
625                         /* do not cross extent boundaries */
626                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
627                                 break;
628                         /* now, is it actually dirty, after all?
629                          * caution, drbd_bm_test_bit is tri-state for some
630                          * obscure reason; ( b == 0 ) would get the out-of-band
631                          * only accidentally right because of the "oddly sized"
632                          * adjustment below */
633                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
634                                 break;
635                         bit++;
636                         size += BM_BLOCK_SIZE;
637                         if ((BM_BLOCK_SIZE << align) <= size)
638                                 align++;
639                         i++;
640                 }
641                 /* if we merged some,
642                  * reset the offset to start the next drbd_bm_find_next from */
643                 if (size > BM_BLOCK_SIZE)
644                         mdev->bm_resync_fo = bit + 1;
645 #endif
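                /* Example of the alignment rule above: with align == 1 the
                 * start sector must be 16-sector (8 KiB) aligned before a
                 * second bitmap block is merged at all; each time the request
                 * grows past BM_BLOCK_SIZE << align, align is bumped and
                 * further growth requires the start to sit on the next larger
                 * power-of-two boundary.  Bigger requests therefore only form
                 * on bigger alignment boundaries, which is what the "stripe
                 * sizes of software RAIDs" remark above is about. */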
646
647                 /* adjust very last sectors, in case we are oddly sized */
648                 if (sector + (size>>9) > capacity)
649                         size = (capacity-sector)<<9;
650                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
651                         switch (read_for_csum(mdev, sector, size)) {
652                         case -EIO: /* Disk failure */
653                                 put_ldev(mdev);
654                                 return 0;
655                         case -EAGAIN: /* allocation failed, or ldev busy */
656                                 drbd_rs_complete_io(mdev, sector);
657                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
658                                 i = rollback_i;
659                                 goto requeue;
660                         case 0:
661                                 /* everything ok */
662                                 break;
663                         default:
664                                 BUG();
665                         }
666                 } else {
667                         inc_rs_pending(mdev);
668                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
669                                                sector, size, ID_SYNCER)) {
670                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
671                                 dec_rs_pending(mdev);
672                                 put_ldev(mdev);
673                                 return 0;
674                         }
675                 }
676         }
677
678         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
679                 /* last syncer _request_ was sent,
680                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
681                  * next sync group will resume), as soon as we receive the last
682                  * resync data block, and the last bit is cleared.
683                  * until then resync "work" is "inactive" ...
684                  */
685                 mdev->resync_work.cb = w_resync_inactive;
686                 put_ldev(mdev);
687                 return 1;
688         }
689
690  requeue:
691         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
692         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
693         put_ldev(mdev);
694         return 1;
695 }
696
697 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
698 {
699         int number, i, size;
700         sector_t sector;
701         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
702
703         if (unlikely(cancel))
704                 return 1;
705
706         if (unlikely(mdev->state.conn < C_CONNECTED)) {
707                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
708                 return 0;
709         }
710
711         number = drbd_rs_number_requests(mdev);
712
713         sector = mdev->ov_position;
714         for (i = 0; i < number; i++) {
715                 if (sector >= capacity) {
716                         mdev->resync_work.cb = w_resync_inactive;
717                         return 1;
718                 }
719
720                 size = BM_BLOCK_SIZE;
721
722                 if (drbd_try_rs_begin_io(mdev, sector)) {
723                         mdev->ov_position = sector;
724                         goto requeue;
725                 }
726
727                 if (sector + (size>>9) > capacity)
728                         size = (capacity-sector)<<9;
729
730                 inc_rs_pending(mdev);
731                 if (!drbd_send_ov_request(mdev, sector, size)) {
732                         dec_rs_pending(mdev);
733                         return 0;
734                 }
735                 sector += BM_SECT_PER_BIT;
736         }
737         mdev->ov_position = sector;
738
739  requeue:
740         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
741         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
742         return 1;
743 }
744
745 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
746 {
747         kfree(w);
748         ov_oos_print(mdev);
749         drbd_resync_finished(mdev);
750
751         return 1;
752 }
753
754 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
755 {
756         kfree(w);
757
758         drbd_resync_finished(mdev);
759
760         return 1;
761 }
762
763 static void ping_peer(struct drbd_conf *mdev)
764 {
765         clear_bit(GOT_PING_ACK, &mdev->flags);
766         request_ping(mdev);
767         wait_event(mdev->misc_wait,
768                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
769 }
770
771 int drbd_resync_finished(struct drbd_conf *mdev)
772 {
773         unsigned long db, dt, dbdt;
774         unsigned long n_oos;
775         union drbd_state os, ns;
776         struct drbd_work *w;
777         char *khelper_cmd = NULL;
778         int verify_done = 0;
779
780         /* Remove all elements from the resync LRU. Future actions
781          * might set bits in the (main) bitmap, and the entries in the
782          * resync LRU would then be wrong. */
783         if (drbd_rs_del_all(mdev)) {
784                 /* In case this is not possible now, most probably because
785                  * there are P_RS_DATA_REPLY packets lingering on the worker's
786                  * queue (or even the read operations for those packets
787                  * are not finished by now).   Retry in 100ms. */
788
789                 __set_current_state(TASK_INTERRUPTIBLE);
790                 schedule_timeout(HZ / 10);
791                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
792                 if (w) {
793                         w->cb = w_resync_finished;
794                         drbd_queue_work(&mdev->data.work, w);
795                         return 1;
796                 }
797                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
798         }
799
800         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
801         if (dt <= 0)
802                 dt = 1;
803         db = mdev->rs_total;
804         dbdt = Bit2KB(db/dt);
805         mdev->rs_paused /= HZ;
806
807         if (!get_ldev(mdev))
808                 goto out;
809
810         ping_peer(mdev);
811
812         spin_lock_irq(&mdev->req_lock);
813         os = mdev->state;
814
815         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
816
817         /* This protects us against multiple calls (that can happen in the presence
818            of application IO), and against connectivity loss just before we arrive here. */
819         if (os.conn <= C_CONNECTED)
820                 goto out_unlock;
821
822         ns = os;
823         ns.conn = C_CONNECTED;
824
825         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
826              verify_done ? "Online verify " : "Resync",
827              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
828
829         n_oos = drbd_bm_total_weight(mdev);
830
831         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
832                 if (n_oos) {
833                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
834                               n_oos, Bit2KB(1));
835                         khelper_cmd = "out-of-sync";
836                 }
837         } else {
838                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
839
840                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
841                         khelper_cmd = "after-resync-target";
842
843                 if (mdev->csums_tfm && mdev->rs_total) {
844                         const unsigned long s = mdev->rs_same_csum;
845                         const unsigned long t = mdev->rs_total;
846                         const int ratio =
847                                 (t == 0)     ? 0 :
848                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
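                        /* The two-branch ratio computation above presumably
                         * keeps everything within range: for small totals
                         * (t < 100000) s*100 cannot overflow, and for large
                         * totals dividing t by 100 first avoids the
                         * multiplication while still giving a usable
                         * percentage. */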
849                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
850                              "transferred %luK total %luK\n",
851                              ratio,
852                              Bit2KB(mdev->rs_same_csum),
853                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
854                              Bit2KB(mdev->rs_total));
855                 }
856         }
857
858         if (mdev->rs_failed) {
859                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
860
861                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
862                         ns.disk = D_INCONSISTENT;
863                         ns.pdsk = D_UP_TO_DATE;
864                 } else {
865                         ns.disk = D_UP_TO_DATE;
866                         ns.pdsk = D_INCONSISTENT;
867                 }
868         } else {
869                 ns.disk = D_UP_TO_DATE;
870                 ns.pdsk = D_UP_TO_DATE;
871
872                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
873                         if (mdev->p_uuid) {
874                                 int i;
875                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
876                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
877                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
878                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
879                         } else {
880                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
881                         }
882                 }
883
884                 drbd_uuid_set_bm(mdev, 0UL);
885
886                 if (mdev->p_uuid) {
887                         /* Now the two UUID sets are equal, update what we
888                          * know of the peer. */
889                         int i;
890                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
891                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
892                 }
893         }
894
895         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
896 out_unlock:
897         spin_unlock_irq(&mdev->req_lock);
898         put_ldev(mdev);
899 out:
900         mdev->rs_total  = 0;
901         mdev->rs_failed = 0;
902         mdev->rs_paused = 0;
903         if (verify_done)
904                 mdev->ov_start_sector = 0;
905
906         drbd_md_sync(mdev);
907
908         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
909                 dev_info(DEV, "Writing the whole bitmap\n");
910                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
911         }
912
913         if (khelper_cmd)
914                 drbd_khelper(mdev, khelper_cmd);
915
916         return 1;
917 }
918
919 /* helper */
920 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
921 {
922         if (drbd_ee_has_active_page(e)) {
923                 /* This might happen if sendpage() has not finished */
924                 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
925                 atomic_add(i, &mdev->pp_in_use_by_net);
926                 atomic_sub(i, &mdev->pp_in_use);
927                 spin_lock_irq(&mdev->req_lock);
928                 list_add_tail(&e->w.list, &mdev->net_ee);
929                 spin_unlock_irq(&mdev->req_lock);
930                 wake_up(&drbd_pp_wait);
931         } else
932                 drbd_free_ee(mdev, e);
933 }
934
935 /**
936  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
937  * @mdev:       DRBD device.
938  * @w:          work object.
939  * @cancel:     The connection will be closed anyways
940  */
941 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
942 {
943         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
944         int ok;
945
946         if (unlikely(cancel)) {
947                 drbd_free_ee(mdev, e);
948                 dec_unacked(mdev);
949                 return 1;
950         }
951
952         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
953                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
954         } else {
955                 if (__ratelimit(&drbd_ratelimit_state))
956                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
957                             (unsigned long long)e->sector);
958
959                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
960         }
961
962         dec_unacked(mdev);
963
964         move_to_net_ee_or_free(mdev, e);
965
966         if (unlikely(!ok))
967                 dev_err(DEV, "drbd_send_block() failed\n");
968         return ok;
969 }
970
971 /**
972  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
973  * @mdev:       DRBD device.
974  * @w:          work object.
975  * @cancel:     The connection will be closed anyways
976  */
977 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
978 {
979         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
980         int ok;
981
982         if (unlikely(cancel)) {
983                 drbd_free_ee(mdev, e);
984                 dec_unacked(mdev);
985                 return 1;
986         }
987
988         if (get_ldev_if_state(mdev, D_FAILED)) {
989                 drbd_rs_complete_io(mdev, e->sector);
990                 put_ldev(mdev);
991         }
992
993         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
994                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
995                         inc_rs_pending(mdev);
996                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
997                 } else {
998                         if (__ratelimit(&drbd_ratelimit_state))
999                                 dev_err(DEV, "Not sending RSDataReply, "
1000                                     "partner DISKLESS!\n");
1001                         ok = 1;
1002                 }
1003         } else {
1004                 if (__ratelimit(&drbd_ratelimit_state))
1005                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1006                             (unsigned long long)e->sector);
1007
1008                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1009
1010                 /* update resync data with failure */
1011                 drbd_rs_failed_io(mdev, e->sector, e->size);
1012         }
1013
1014         dec_unacked(mdev);
1015
1016         move_to_net_ee_or_free(mdev, e);
1017
1018         if (unlikely(!ok))
1019                 dev_err(DEV, "drbd_send_block() failed\n");
1020         return ok;
1021 }
1022
1023 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1024 {
1025         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1026         struct digest_info *di;
1027         int digest_size;
1028         void *digest = NULL;
1029         int ok, eq = 0;
1030
1031         if (unlikely(cancel)) {
1032                 drbd_free_ee(mdev, e);
1033                 dec_unacked(mdev);
1034                 return 1;
1035         }
1036
1037         if (get_ldev(mdev)) {
1038                 drbd_rs_complete_io(mdev, e->sector);
1039                 put_ldev(mdev);
1040         }
1041
1042         di = e->digest;
1043
1044         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1045                 /* quick hack to try to avoid a race against reconfiguration.
1046                  * a real fix would be much more involved,
1047                  * introducing more locking mechanisms */
1048                 if (mdev->csums_tfm) {
1049                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1050                         D_ASSERT(digest_size == di->digest_size);
1051                         digest = kmalloc(digest_size, GFP_NOIO);
1052                 }
1053                 if (digest) {
1054                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1055                         eq = !memcmp(digest, di->digest, digest_size);
1056                         kfree(digest);
1057                 }
1058
1059                 if (eq) {
1060                         drbd_set_in_sync(mdev, e->sector, e->size);
1061                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1062                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1063                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1064                 } else {
1065                         inc_rs_pending(mdev);
1066                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1067                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1068                         kfree(di);
1069                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1070                 }
1071         } else {
1072                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1073                 if (__ratelimit(&drbd_ratelimit_state))
1074                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1075         }
1076
1077         dec_unacked(mdev);
1078         move_to_net_ee_or_free(mdev, e);
1079
1080         if (unlikely(!ok))
1081                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1082         return ok;
1083 }
1084
1085 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1086 {
1087         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1088         int digest_size;
1089         void *digest;
1090         int ok = 1;
1091
1092         if (unlikely(cancel))
1093                 goto out;
1094
1095         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1096                 goto out;
1097
1098         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1099         /* FIXME if this allocation fails, online verify will not terminate! */
1100         digest = kmalloc(digest_size, GFP_NOIO);
1101         if (digest) {
1102                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1103                 inc_rs_pending(mdev);
1104                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1105                                              digest, digest_size, P_OV_REPLY);
1106                 if (!ok)
1107                         dec_rs_pending(mdev);
1108                 kfree(digest);
1109         }
1110
1111 out:
1112         drbd_free_ee(mdev, e);
1113
1114         dec_unacked(mdev);
1115
1116         return ok;
1117 }
1118
1119 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1120 {
1121         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1122                 mdev->ov_last_oos_size += size>>9;
1123         } else {
1124                 mdev->ov_last_oos_start = sector;
1125                 mdev->ov_last_oos_size = size>>9;
1126         }
1127         drbd_set_out_of_sync(mdev, sector, size);
1128         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1129 }
1130
1131 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1132 {
1133         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1134         struct digest_info *di;
1135         int digest_size;
1136         void *digest;
1137         int ok, eq = 0;
1138
1139         if (unlikely(cancel)) {
1140                 drbd_free_ee(mdev, e);
1141                 dec_unacked(mdev);
1142                 return 1;
1143         }
1144
1145         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1146          * the resync lru has been cleaned up already */
1147         if (get_ldev(mdev)) {
1148                 drbd_rs_complete_io(mdev, e->sector);
1149                 put_ldev(mdev);
1150         }
1151
1152         di = e->digest;
1153
1154         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1155                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1156                 digest = kmalloc(digest_size, GFP_NOIO);
1157                 if (digest) {
1158                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1159
1160                         D_ASSERT(digest_size == di->digest_size);
1161                         eq = !memcmp(digest, di->digest, digest_size);
1162                         kfree(digest);
1163                 }
1164         } else {
1165                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1166                 if (__ratelimit(&drbd_ratelimit_state))
1167                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1168         }
1169
1170         dec_unacked(mdev);
1171         if (!eq)
1172                 drbd_ov_oos_found(mdev, e->sector, e->size);
1173         else
1174                 ov_oos_print(mdev);
1175
1176         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1177                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1178
1179         drbd_free_ee(mdev, e);
1180
1181         --mdev->ov_left;
1182
1183         /* let's advance progress step marks only for every other megabyte */
1184         if ((mdev->ov_left & 0x200) == 0x200)
1185                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1186
1187         if (mdev->ov_left == 0) {
1188                 ov_oos_print(mdev);
1189                 drbd_resync_finished(mdev);
1190         }
1191
1192         return ok;
1193 }
1194
1195 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1196 {
1197         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1198         complete(&b->done);
1199         return 1;
1200 }
1201
1202 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1203 {
1204         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1205         struct p_barrier *p = &mdev->data.sbuf.barrier;
1206         int ok = 1;
1207
1208         /* really avoid racing with tl_clear.  w.cb may have been referenced
1209          * just before it was reassigned and re-queued, so double check that.
1210          * actually, this race was harmless, since we only try to send the
1211          * barrier packet here, and otherwise do nothing with the object.
1212          * but compare with the head of w_clear_epoch */
1213         spin_lock_irq(&mdev->req_lock);
1214         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1215                 cancel = 1;
1216         spin_unlock_irq(&mdev->req_lock);
1217         if (cancel)
1218                 return 1;
1219
1220         if (!drbd_get_data_sock(mdev))
1221                 return 0;
1222         p->barrier = b->br_number;
1223         /* inc_ap_pending was done where this was queued.
1224          * dec_ap_pending will be done in got_BarrierAck
1225          * or (on connection loss) in w_clear_epoch.  */
1226         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1227                                 (struct p_header80 *)p, sizeof(*p), 0);
1228         drbd_put_data_sock(mdev);
1229
1230         return ok;
1231 }
1232
1233 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1234 {
1235         if (cancel)
1236                 return 1;
1237         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1238 }
1239
1240 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1241 {
1242         struct drbd_request *req = container_of(w, struct drbd_request, w);
1243         int ok;
1244
1245         if (unlikely(cancel)) {
1246                 req_mod(req, send_canceled);
1247                 return 1;
1248         }
1249
1250         ok = drbd_send_oos(mdev, req);
1251         req_mod(req, oos_handed_to_network);
1252
1253         return ok;
1254 }
1255
1256 /**
1257  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1258  * @mdev:       DRBD device.
1259  * @w:          work object.
1260  * @cancel:     The connection will be closed anyways
1261  */
1262 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1263 {
1264         struct drbd_request *req = container_of(w, struct drbd_request, w);
1265         int ok;
1266
1267         if (unlikely(cancel)) {
1268                 req_mod(req, send_canceled);
1269                 return 1;
1270         }
1271
1272         ok = drbd_send_dblock(mdev, req);
1273         req_mod(req, ok ? handed_over_to_network : send_failed);
1274
1275         return ok;
1276 }
1277
1278 /**
1279  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1280  * @mdev:       DRBD device.
1281  * @w:          work object.
1282  * @cancel:     The connection will be closed anyways
1283  */
1284 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1285 {
1286         struct drbd_request *req = container_of(w, struct drbd_request, w);
1287         int ok;
1288
1289         if (unlikely(cancel)) {
1290                 req_mod(req, send_canceled);
1291                 return 1;
1292         }
1293
1294         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1295                                 (unsigned long)req);
1296
1297         if (!ok) {
1298                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1299                  * so this is probably redundant */
1300                 if (mdev->state.conn >= C_CONNECTED)
1301                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1302         }
1303         req_mod(req, ok ? handed_over_to_network : send_failed);
1304
1305         return ok;
1306 }
1307
1308 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1309 {
1310         struct drbd_request *req = container_of(w, struct drbd_request, w);
1311
1312         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1313                 drbd_al_begin_io(mdev, req->sector);
1314         /* Calling drbd_al_begin_io() out of the worker might deadlock
1315            in theory. In practice it cannot deadlock, since this is
1316            only used when unfreezing IOs. All the extents of the requests
1317            that made it into the TL are already active */
1318
1319         drbd_req_make_private_bio(req, req->master_bio);
1320         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1321         generic_make_request(req->private_bio);
1322
1323         return 1;
1324 }
1325
1326 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1327 {
1328         struct drbd_conf *odev = mdev;
1329
1330         while (1) {
1331                 if (odev->sync_conf.after == -1)
1332                         return 1;
1333                 odev = minor_to_mdev(odev->sync_conf.after);
1334                 ERR_IF(!odev) return 1;
1335                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1336                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1337                     odev->state.aftr_isp || odev->state.peer_isp ||
1338                     odev->state.user_isp)
1339                         return 0;
1340         }
1341 }
1342
1343 /**
1344  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1345  * @mdev:       DRBD device.
1346  *
1347  * Called from process context only (admin command and after_state_ch).
1348  */
1349 static int _drbd_pause_after(struct drbd_conf *mdev)
1350 {
1351         struct drbd_conf *odev;
1352         int i, rv = 0;
1353
1354         for (i = 0; i < minor_count; i++) {
1355                 odev = minor_to_mdev(i);
1356                 if (!odev)
1357                         continue;
1358                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1359                         continue;
1360                 if (!_drbd_may_sync_now(odev))
1361                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1362                                != SS_NOTHING_TO_DO);
1363         }
1364
1365         return rv;
1366 }
1367
1368 /**
1369  * _drbd_resume_next() - Resume resync on all devices that may resync now
1370  * @mdev:       DRBD device.
1371  *
1372  * Called from process context only (admin command and worker).
1373  */
1374 static int _drbd_resume_next(struct drbd_conf *mdev)
1375 {
1376         struct drbd_conf *odev;
1377         int i, rv = 0;
1378
1379         for (i = 0; i < minor_count; i++) {
1380                 odev = minor_to_mdev(i);
1381                 if (!odev)
1382                         continue;
1383                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1384                         continue;
1385                 if (odev->state.aftr_isp) {
1386                         if (_drbd_may_sync_now(odev))
1387                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1388                                                         CS_HARD, NULL)
1389                                        != SS_NOTHING_TO_DO) ;
1390                 }
1391         }
1392         return rv;
1393 }
1394
1395 void resume_next_sg(struct drbd_conf *mdev)
1396 {
1397         write_lock_irq(&global_state_lock);
1398         _drbd_resume_next(mdev);
1399         write_unlock_irq(&global_state_lock);
1400 }
1401
1402 void suspend_other_sg(struct drbd_conf *mdev)
1403 {
1404         write_lock_irq(&global_state_lock);
1405         _drbd_pause_after(mdev);
1406         write_unlock_irq(&global_state_lock);
1407 }
1408
1409 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1410 {
1411         struct drbd_conf *odev;
1412
1413         if (o_minor == -1)
1414                 return NO_ERROR;
1415         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1416                 return ERR_SYNC_AFTER;
1417
1418         /* check for loops */
1419         odev = minor_to_mdev(o_minor);
1420         while (1) {
1421                 if (odev == mdev)
1422                         return ERR_SYNC_AFTER_CYCLE;
1423
1424                 /* dependency chain ends here, no cycles. */
1425                 if (odev->sync_conf.after == -1)
1426                         return NO_ERROR;
1427
1428                 /* follow the dependency chain */
1429                 odev = minor_to_mdev(odev->sync_conf.after);
1430         }
1431 }
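/* Example: with sync-after configured as  minor 2 -> minor 1 -> minor 0,
 * sync_after_error() called for minor 2 with o_minor 1 walks 1 -> 0 and
 * returns NO_ERROR (the chain ends at after == -1), while trying to point
 * minor 0 back at minor 2 would walk 2 -> 1 -> 0, hit mdev itself and
 * return ERR_SYNC_AFTER_CYCLE. */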
1432
1433 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1434 {
1435         int changes;
1436         int retcode;
1437
1438         write_lock_irq(&global_state_lock);
1439         retcode = sync_after_error(mdev, na);
1440         if (retcode == NO_ERROR) {
1441                 mdev->sync_conf.after = na;
1442                 do {
1443                         changes  = _drbd_pause_after(mdev);
1444                         changes |= _drbd_resume_next(mdev);
1445                 } while (changes);
1446         }
1447         write_unlock_irq(&global_state_lock);
1448         return retcode;
1449 }
1450
1451 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1452 {
1453         atomic_set(&mdev->rs_sect_in, 0);
1454         atomic_set(&mdev->rs_sect_ev, 0);
1455         mdev->rs_in_flight = 0;
1456         mdev->rs_planed = 0;
1457         spin_lock(&mdev->peer_seq_lock);
1458         fifo_set(&mdev->rs_plan_s, 0);
1459         spin_unlock(&mdev->peer_seq_lock);
1460 }
1461
1462 /**
1463  * drbd_start_resync() - Start the resync process
1464  * @mdev:       DRBD device.
1465  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1466  *
1467  * This function might bring you directly into one of the
1468  * C_PAUSED_SYNC_* states.
1469  */
1470 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1471 {
1472         union drbd_state ns;
1473         int r;
1474
1475         if (mdev->state.conn >= C_SYNC_SOURCE) {
1476                 dev_err(DEV, "Resync already running!\n");
1477                 return;
1478         }
1479
1480         /* Cancel leftover resync requests in case a previous resync run was aborted by an IO error/detach on the peer. */
1481         drbd_rs_cancel_all(mdev);
1482
1483         if (side == C_SYNC_TARGET) {
1484                 /* Since application IO was locked out during C_WF_BITMAP_T and
1485                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1486                    ask the before-resync-target handler whether we may make the data inconsistent. */
1487                 r = drbd_khelper(mdev, "before-resync-target");
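                /* the userland helper's exit code is encoded in the second
                 * byte of the return value (wait(2)-style) */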
1488                 r = (r >> 8) & 0xff;
1489                 if (r > 0) {
1490                         dev_info(DEV, "before-resync-target handler returned %d, "
1491                              "dropping connection.\n", r);
1492                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1493                         return;
1494                 }
1495         }
1496
1497         drbd_state_lock(mdev);
1498
1499         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1500                 drbd_state_unlock(mdev);
1501                 return;
1502         }
1503
1504         if (side == C_SYNC_TARGET) {
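                /* start scanning the bitmap for out-of-sync blocks at the beginning */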
1505                 mdev->bm_resync_fo = 0;
1506         } else /* side == C_SYNC_SOURCE */ {
1507                 u64 uuid;
1508
1509                 get_random_bytes(&uuid, sizeof(u64));
1510                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1511                 drbd_send_sync_uuid(mdev, uuid);
1512
1513                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1514         }
1515
1516         write_lock_irq(&global_state_lock);
1517         ns = mdev->state;
1518
1519         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1520
1521         ns.conn = side;
1522
1523         if (side == C_SYNC_TARGET)
1524                 ns.disk = D_INCONSISTENT;
1525         else /* side == C_SYNC_SOURCE */
1526                 ns.pdsk = D_INCONSISTENT;
1527
1528         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1529         ns = mdev->state;
1530
1531         if (ns.conn < C_CONNECTED)
1532                 r = SS_UNKNOWN_ERROR;
1533
1534         if (r == SS_SUCCESS) {
1535                 unsigned long tw = drbd_bm_total_weight(mdev);
1536                 unsigned long now = jiffies;
1537                 int i;
1538
1539                 mdev->rs_failed    = 0;
1540                 mdev->rs_paused    = 0;
1541                 mdev->rs_same_csum = 0;
1542                 mdev->rs_last_events = 0;
1543                 mdev->rs_last_sect_ev = 0;
1544                 mdev->rs_total     = tw;
1545                 mdev->rs_start     = now;
1546                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1547                         mdev->rs_mark_left[i] = tw;
1548                         mdev->rs_mark_time[i] = now;
1549                 }
1550                 _drbd_pause_after(mdev);
1551         }
1552         write_unlock_irq(&global_state_lock);
1553         put_ldev(mdev);
1554
1555         if (r == SS_SUCCESS) {
1556                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1557                      drbd_conn_str(ns.conn),
1558                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1559                      (unsigned long) mdev->rs_total);
1560
1561                 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1562                         /* This still has a race (about when exactly the peers
1563                          * detect connection loss) that can lead to a full sync
1564                          * on next handshake. In 8.3.9 we fixed this with explicit
1565                          * resync-finished notifications, but the fix
1566                          * introduces a protocol change.  Sleeping for some
1567                          * time longer than the ping interval + timeout on the
1568                          * SyncSource, to give the SyncTarget the chance to
1569                          * detect connection loss, then waiting for a ping
1570                          * response (implicit in drbd_resync_finished) reduces
1571                          * the race considerably, but does not solve it. */
1572                         if (side == C_SYNC_SOURCE)
1573                                 schedule_timeout_interruptible(
1574                                         mdev->net_conf->ping_int * HZ +
1575                                         mdev->net_conf->ping_timeo*HZ/9);
1576                         drbd_resync_finished(mdev);
1577                 }
1578
1579                 drbd_rs_controller_reset(mdev);
1580                 /* ns.conn may already be != mdev->state.conn,
1581                  * we may have been paused in between, or become paused until
1582                  * the timer triggers.
1583                  * No matter, that is handled in resync_timer_fn() */
1584                 if (ns.conn == C_SYNC_TARGET)
1585                         mod_timer(&mdev->resync_timer, jiffies);
1586
1587                 drbd_md_sync(mdev);
1588         }
1589         drbd_state_unlock(mdev);
1590 }
1591
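/* Main loop of the per-device worker thread: dequeue work items from
 * mdev->data.work and run their callbacks until asked to stop; on the way
 * out, run whatever is still queued with the cancel flag set and clean up
 * the device. */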
1592 int drbd_worker(struct drbd_thread *thi)
1593 {
1594         struct drbd_conf *mdev = thi->mdev;
1595         struct drbd_work *w = NULL;
1596         LIST_HEAD(work_list);
1597         int intr = 0, i;
1598
1599         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1600
1601         while (get_t_state(thi) == Running) {
1602                 drbd_thread_current_set_cpu(mdev);
1603
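                /* if no work is pending, uncork the data socket so corked
                 * packets go out while we sleep waiting for new work, then
                 * cork it again to batch the next burst */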
1604                 if (down_trylock(&mdev->data.work.s)) {
1605                         mutex_lock(&mdev->data.mutex);
1606                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1607                                 drbd_tcp_uncork(mdev->data.socket);
1608                         mutex_unlock(&mdev->data.mutex);
1609
1610                         intr = down_interruptible(&mdev->data.work.s);
1611
1612                         mutex_lock(&mdev->data.mutex);
1613                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1614                                 drbd_tcp_cork(mdev->data.socket);
1615                         mutex_unlock(&mdev->data.mutex);
1616                 }
1617
1618                 if (intr) {
1619                         D_ASSERT(intr == -EINTR);
1620                         flush_signals(current);
1621                         ERR_IF (get_t_state(thi) == Running)
1622                                 continue;
1623                         break;
1624                 }
1625
1626                 if (get_t_state(thi) != Running)
1627                         break;
1628                 /* With this break, we have done a down() but not consumed
1629                    the entry from the list. The cleanup code takes care of
1630                    this...   */
1631
1632                 w = NULL;
1633                 spin_lock_irq(&mdev->data.work.q_lock);
1634                 ERR_IF(list_empty(&mdev->data.work.q)) {
1635                         /* something terribly wrong in our logic.
1636                          * we were able to down() the semaphore,
1637                          * but the list is empty... doh.
1638                          *
1639                          * what is the best thing to do now?
1640                          * try again from scratch, restarting the receiver,
1641                          * asender, whatnot? could break even more ugly,
1642                          * e.g. when we are primary, but no good local data.
1643                          *
1644                          * I'll try to get away just starting over this loop.
1645                          */
1646                         spin_unlock_irq(&mdev->data.work.q_lock);
1647                         continue;
1648                 }
1649                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1650                 list_del_init(&w->list);
1651                 spin_unlock_irq(&mdev->data.work.q_lock);
1652
1653                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1654                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1655                         if (mdev->state.conn >= C_CONNECTED)
1656                                 drbd_force_state(mdev,
1657                                                 NS(conn, C_NETWORK_FAILURE));
1658                 }
1659         }
1660         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1661         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1662
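        /* drain the work queue: run every callback still queued, with the
         * cancel flag set, so it can clean up after itself */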
1663         spin_lock_irq(&mdev->data.work.q_lock);
1664         i = 0;
1665         while (!list_empty(&mdev->data.work.q)) {
1666                 list_splice_init(&mdev->data.work.q, &work_list);
1667                 spin_unlock_irq(&mdev->data.work.q_lock);
1668
1669                 while (!list_empty(&work_list)) {
1670                         w = list_entry(work_list.next, struct drbd_work, list);
1671                         list_del_init(&w->list);
1672                         w->cb(mdev, w, 1);
1673                         i++; /* dead debugging code */
1674                 }
1675
1676                 spin_lock_irq(&mdev->data.work.q_lock);
1677         }
1678         sema_init(&mdev->data.work.s, 0);
1679         /* DANGEROUS race: if someone queued their work while holding the spinlock,
1680          * but called up() outside of it, we could get an up() on the
1681          * semaphore without a corresponding list entry.
1682          * So don't do that.
1683          */
1684         spin_unlock_irq(&mdev->data.work.q_lock);
1685
1686         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1687         /* _drbd_set_state only uses stop_nowait.
1688          * wait here for the Exiting receiver. */
1689         drbd_thread_stop(&mdev->receiver);
1690         drbd_mdev_cleanup(mdev);
1691
1692         dev_info(DEV, "worker terminated\n");
1693
1694         clear_bit(DEVICE_DYING, &mdev->flags);
1695         clear_bit(CONFIG_PENDING, &mdev->flags);
1696         wake_up(&mdev->state_wait);
1697
1698         return 0;
1699 }