drbd: properly use max_hw_sectors to limit our bio size
[pandora-kernel.git] drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42
43
44
45 /* defined here:
46    drbd_md_io_complete
47    drbd_endio_sec
48    drbd_endio_pri
49
50  * more endio handlers:
51    atodb_endio in drbd_actlog.c
52    drbd_bm_async_io_complete in drbd_bitmap.c
53
54  * For all these callbacks, note the following:
55  * The callbacks will be called in irq context by the IDE drivers,
56  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
57  * Try to get the locking right :)
58  *
59  */
60
61
62 /* About the global_state_lock
63    Each state transition on a device holds a read lock. In case we have
64    to evaluate the sync-after dependencies, we grab a write lock, because
65    we need stable states on all devices for that.  */
66 rwlock_t global_state_lock;
67
68 /* used for synchronous meta data and bitmap IO
69  * submitted by drbd_md_sync_page_io()
70  */
71 void drbd_md_io_complete(struct bio *bio, int error)
72 {
73         struct drbd_md_io *md_io;
74
75         md_io = (struct drbd_md_io *)bio->bi_private;
76         md_io->error = error;
77
78         complete(&md_io->event);
79 }
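/*
 * Editorial sketch, not part of the original file: roughly how a caller such
 * as drbd_md_sync_page_io() pairs with drbd_md_io_complete() above -- fill in
 * a struct drbd_md_io, point the bio at the callback, submit, and sleep on
 * the completion until the callback fires.  Names, sizes and error handling
 * are simplified; treat it as an illustration of the pattern only.
 */
#if 0
static int md_sync_page_io_sketch(struct block_device *bdev,
				  struct page *page, sector_t sector, int rw)
{
	struct drbd_md_io md_io;
	struct bio *bio = bio_alloc(GFP_NOIO, 1);

	md_io.error = 0;
	init_completion(&md_io.event);

	bio->bi_bdev   = bdev;
	bio->bi_sector = sector;
	bio_add_page(bio, page, PAGE_SIZE, 0);
	bio->bi_private = &md_io;
	bio->bi_end_io  = drbd_md_io_complete;

	submit_bio(rw, bio);
	wait_for_completion(&md_io.event);	/* woken by complete() in the callback */
	bio_put(bio);

	return md_io.error;
}
#endif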
80
81 /* reads on behalf of the partner,
82  * "submitted" by the receiver
83  */
84 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
85 {
86         unsigned long flags = 0;
87         struct drbd_conf *mdev = e->mdev;
88
89         D_ASSERT(e->block_id != ID_VACANT);
90
91         spin_lock_irqsave(&mdev->req_lock, flags);
92         mdev->read_cnt += e->size >> 9;
93         list_del(&e->w.list);
94         if (list_empty(&mdev->read_ee))
95                 wake_up(&mdev->ee_wait);
96         if (test_bit(__EE_WAS_ERROR, &e->flags))
97                 __drbd_chk_io_error(mdev, FALSE);
98         spin_unlock_irqrestore(&mdev->req_lock, flags);
99
100         drbd_queue_work(&mdev->data.work, &e->w);
101         put_ldev(mdev);
102 }
103
104 /* writes on behalf of the partner, or resync writes,
105  * "submitted" by the receiver, final stage.  */
106 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
107 {
108         unsigned long flags = 0;
109         struct drbd_conf *mdev = e->mdev;
110         sector_t e_sector;
111         int do_wake;
112         int is_syncer_req;
113         int do_al_complete_io;
114
115         D_ASSERT(e->block_id != ID_VACANT);
116
117         /* after we moved e to done_ee,
118          * we may no longer access it,
119          * it may be freed/reused already!
120          * (as soon as we release the req_lock) */
121         e_sector = e->sector;
122         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
123         is_syncer_req = is_syncer_block_id(e->block_id);
124
125         spin_lock_irqsave(&mdev->req_lock, flags);
126         mdev->writ_cnt += e->size >> 9;
127         list_del(&e->w.list); /* has been on active_ee or sync_ee */
128         list_add_tail(&e->w.list, &mdev->done_ee);
129
130         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
131          * neither did we wake possibly waiting conflicting requests.
132          * done from "drbd_process_done_ee" within the appropriate w.cb
133          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
134
135         do_wake = is_syncer_req
136                 ? list_empty(&mdev->sync_ee)
137                 : list_empty(&mdev->active_ee);
138
139         if (test_bit(__EE_WAS_ERROR, &e->flags))
140                 __drbd_chk_io_error(mdev, FALSE);
141         spin_unlock_irqrestore(&mdev->req_lock, flags);
142
143         if (is_syncer_req)
144                 drbd_rs_complete_io(mdev, e_sector);
145
146         if (do_wake)
147                 wake_up(&mdev->ee_wait);
148
149         if (do_al_complete_io)
150                 drbd_al_complete_io(mdev, e_sector);
151
152         wake_asender(mdev);
153         put_ldev(mdev);
154 }
155
156 /* writes on behalf of the partner, or resync writes,
157  * "submitted" by the receiver.
158  */
159 void drbd_endio_sec(struct bio *bio, int error)
160 {
161         struct drbd_epoch_entry *e = bio->bi_private;
162         struct drbd_conf *mdev = e->mdev;
163         int uptodate = bio_flagged(bio, BIO_UPTODATE);
164         int is_write = bio_data_dir(bio) == WRITE;
165
166         if (error)
167                 dev_warn(DEV, "%s: error=%d s=%llus\n",
168                                 is_write ? "write" : "read", error,
169                                 (unsigned long long)e->sector);
170         if (!error && !uptodate) {
171                 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
172                                 is_write ? "write" : "read",
173                                 (unsigned long long)e->sector);
174                 /* strange behavior of some lower level drivers...
175                  * fail the request by clearing the uptodate flag,
176                  * but do not return any error?! */
177                 error = -EIO;
178         }
179
180         if (error)
181                 set_bit(__EE_WAS_ERROR, &e->flags);
182
183         bio_put(bio); /* no need for the bio anymore */
184         if (atomic_dec_and_test(&e->pending_bios)) {
185                 if (is_write)
186                         drbd_endio_write_sec_final(e);
187                 else
188                         drbd_endio_read_sec_final(e);
189         }
190 }
191
192 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
193  */
194 void drbd_endio_pri(struct bio *bio, int error)
195 {
196         unsigned long flags;
197         struct drbd_request *req = bio->bi_private;
198         struct drbd_conf *mdev = req->mdev;
199         struct bio_and_error m;
200         enum drbd_req_event what;
201         int uptodate = bio_flagged(bio, BIO_UPTODATE);
202
203         if (!error && !uptodate) {
204                 dev_warn(DEV, "p %s: setting error to -EIO\n",
205                          bio_data_dir(bio) == WRITE ? "write" : "read");
206                 /* strange behavior of some lower level drivers...
207                  * fail the request by clearing the uptodate flag,
208                  * but do not return any error?! */
209                 error = -EIO;
210         }
211
212         /* to avoid recursion in __req_mod */
213         if (unlikely(error)) {
214                 what = (bio_data_dir(bio) == WRITE)
215                         ? write_completed_with_error
216                         : (bio_rw(bio) == READ)
217                           ? read_completed_with_error
218                           : read_ahead_completed_with_error;
219         } else
220                 what = completed_ok;
221
222         bio_put(req->private_bio);
223         req->private_bio = ERR_PTR(error);
224
225         /* not req_mod(), we need irqsave here! */
226         spin_lock_irqsave(&mdev->req_lock, flags);
227         __req_mod(req, what, &m);
228         spin_unlock_irqrestore(&mdev->req_lock, flags);
229
230         if (m.bio)
231                 complete_master_bio(mdev, &m);
232 }
233
234 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
235 {
236         struct drbd_request *req = container_of(w, struct drbd_request, w);
237
238         /* We should not detach for read io-error,
239          * but try to WRITE the P_DATA_REPLY to the failed location,
240          * to give the disk the chance to relocate that block */
241
242         spin_lock_irq(&mdev->req_lock);
243         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
244                 _req_mod(req, read_retry_remote_canceled);
245                 spin_unlock_irq(&mdev->req_lock);
246                 return 1;
247         }
248         spin_unlock_irq(&mdev->req_lock);
249
250         return w_send_read_req(mdev, w, 0);
251 }
252
253 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
254 {
255         ERR_IF(cancel) return 1;
256         dev_err(DEV, "resync inactive, but callback triggered??\n");
257         return 1; /* Simply ignore this! */
258 }
259
260 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
261 {
262         struct hash_desc desc;
263         struct scatterlist sg;
264         struct page *page = e->pages;
265         struct page *tmp;
266         unsigned len;
267
268         desc.tfm = tfm;
269         desc.flags = 0;
270
271         sg_init_table(&sg, 1);
272         crypto_hash_init(&desc);
273
274         while ((tmp = page_chain_next(page))) {
275                 /* all but the last page will be fully used */
276                 sg_set_page(&sg, page, PAGE_SIZE, 0);
277                 crypto_hash_update(&desc, &sg, sg.length);
278                 page = tmp;
279         }
280         /* and now the last, possibly only partially used page */
281         len = e->size & (PAGE_SIZE - 1);
282         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
283         crypto_hash_update(&desc, &sg, sg.length);
284         crypto_hash_final(&desc, digest);
285 }
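/*
 * Editorial note, not part of the original file: a small worked example of
 * the last-page handling above.  With PAGE_SIZE == 4096 and e->size == 9216
 * (two full pages plus 1 KiB), len = 9216 & 4095 = 1024, so only the first
 * 1 KiB of the final page is hashed.  With e->size == 8192 the remainder is
 * 0 and the "len ?: PAGE_SIZE" fallback hashes the whole last page instead.
 */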
286
287 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
288 {
289         struct hash_desc desc;
290         struct scatterlist sg;
291         struct bio_vec *bvec;
292         int i;
293
294         desc.tfm = tfm;
295         desc.flags = 0;
296
297         sg_init_table(&sg, 1);
298         crypto_hash_init(&desc);
299
300         __bio_for_each_segment(bvec, bio, i, 0) {
301                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
302                 crypto_hash_update(&desc, &sg, sg.length);
303         }
304         crypto_hash_final(&desc, digest);
305 }
306
307 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
308 {
309         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
310         int digest_size;
311         void *digest;
312         int ok;
313
314         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
315
316         if (unlikely(cancel)) {
317                 drbd_free_ee(mdev, e);
318                 return 1;
319         }
320
321         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
322                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
323                 digest = kmalloc(digest_size, GFP_NOIO);
324                 if (digest) {
325                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
326
327                         inc_rs_pending(mdev);
328                         ok = drbd_send_drequest_csum(mdev,
329                                                      e->sector,
330                                                      e->size,
331                                                      digest,
332                                                      digest_size,
333                                                      P_CSUM_RS_REQUEST);
334                         kfree(digest);
335                 } else {
336                         dev_err(DEV, "kmalloc() of digest failed.\n");
337                         ok = 0;
338                 }
339         } else
340                 ok = 1;
341
342         drbd_free_ee(mdev, e);
343
344         if (unlikely(!ok))
345                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
346         return ok;
347 }
348
349 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
350
351 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
352 {
353         struct drbd_epoch_entry *e;
354
355         if (!get_ldev(mdev))
356                 return -EIO;
357
358         if (drbd_rs_should_slow_down(mdev))
359                 goto defer;
360
361         /* GFP_TRY, because if there is no memory available right now, this may
362          * be rescheduled for later. It is "only" background resync, after all. */
363         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
364         if (!e)
365                 goto defer;
366
367         e->w.cb = w_e_send_csum;
368         spin_lock_irq(&mdev->req_lock);
369         list_add(&e->w.list, &mdev->read_ee);
370         spin_unlock_irq(&mdev->req_lock);
371
372         atomic_add(size >> 9, &mdev->rs_sect_ev);
373         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
374                 return 0;
375
376         /* drbd_submit_ee currently fails for one reason only:
377          * not being able to allocate enough bios.
378          * Is dropping the connection going to help? */
379         spin_lock_irq(&mdev->req_lock);
380         list_del(&e->w.list);
381         spin_unlock_irq(&mdev->req_lock);
382
383         drbd_free_ee(mdev, e);
384 defer:
385         put_ldev(mdev);
386         return -EAGAIN;
387 }
388
389 void resync_timer_fn(unsigned long data)
390 {
391         struct drbd_conf *mdev = (struct drbd_conf *) data;
392         int queue;
393
394         queue = 1;
395         switch (mdev->state.conn) {
396         case C_VERIFY_S:
397                 mdev->resync_work.cb = w_make_ov_request;
398                 break;
399         case C_SYNC_TARGET:
400                 mdev->resync_work.cb = w_make_resync_request;
401                 break;
402         default:
403                 queue = 0;
404                 mdev->resync_work.cb = w_resync_inactive;
405         }
406
407         /* harmless race: list_empty outside data.work.q_lock */
408         if (list_empty(&mdev->resync_work.list) && queue)
409                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
410 }
411
412 static void fifo_set(struct fifo_buffer *fb, int value)
413 {
414         int i;
415
416         for (i = 0; i < fb->size; i++)
417                 fb->values[i] = value;
418 }
419
420 static int fifo_push(struct fifo_buffer *fb, int value)
421 {
422         int ov;
423
424         ov = fb->values[fb->head_index];
425         fb->values[fb->head_index++] = value;
426
427         if (fb->head_index >= fb->size)
428                 fb->head_index = 0;
429
430         return ov;
431 }
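/*
 * Editorial note, not part of the original file: fifo_push() behaves as a
 * fixed-length delay line -- it stores the new value and hands back the value
 * that was pushed fb->size calls earlier.  For example, with size == 3 and an
 * all-zero buffer:
 *
 *	fifo_push(&fb, 10) returns 0
 *	fifo_push(&fb, 20) returns 0
 *	fifo_push(&fb, 30) returns 0
 *	fifo_push(&fb, 40) returns 10	(the value from three calls ago)
 *
 * drbd_rs_controller() below relies on this to spread a planned correction
 * over the whole planning horizon, while fifo_add_val() adjusts all values
 * that are still queued.
 */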
432
433 static void fifo_add_val(struct fifo_buffer *fb, int value)
434 {
435         int i;
436
437         for (i = 0; i < fb->size; i++)
438                 fb->values[i] += value;
439 }
440
441 int drbd_rs_controller(struct drbd_conf *mdev)
442 {
443         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
444         unsigned int want;     /* The number of sectors we want in the proxy */
445         int req_sect; /* Number of sectors to request in this turn */
446         int correction; /* Number of sectors more we need in the proxy*/
447         int cps; /* correction per invocation of drbd_rs_controller() */
448         int steps; /* Number of time steps to plan ahead */
449         int curr_corr;
450         int max_sect;
451
452         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
453         mdev->rs_in_flight -= sect_in;
454
455         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
456
457         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
458
459         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
460                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
461         } else { /* normal path */
462                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
463                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
464         }
465
466         correction = want - mdev->rs_in_flight - mdev->rs_planed;
467
468         /* Plan ahead */
469         cps = correction / steps;
470         fifo_add_val(&mdev->rs_plan_s, cps);
471         mdev->rs_planed += cps * steps;
472
473         /* What we do in this step */
474         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
475         spin_unlock(&mdev->peer_seq_lock);
476         mdev->rs_planed -= curr_corr;
477
478         req_sect = sect_in + curr_corr;
479         if (req_sect < 0)
480                 req_sect = 0;
481
482         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
483         if (req_sect > max_sect)
484                 req_sect = max_sect;
485
486         /*
487         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
488                  sect_in, mdev->rs_in_flight, want, correction,
489                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
490         */
491
492         return req_sect;
493 }
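/*
 * Editorial note, not part of the original file: a worked example of a single
 * controller step, using made-up numbers.  Assume SLEEP_TIME == HZ/10,
 * steps == 20 (a two second plan-ahead), c_fill_target == 0, c_delay_target
 * == 10 (i.e. one second, counted in tenths of a second), and that during the
 * last 100 ms sect_in == 2000 sectors arrived while rs_in_flight == 15000 and
 * rs_planed == 4000.  Then:
 *
 *	want       = 2000 * 10 * HZ / ((HZ/10) * 10) = 20000 sectors
 *	correction = 20000 - 15000 - 4000            =  1000 sectors
 *	cps        = 1000 / 20                       =    50 sectors per step
 *
 * The 50 sectors are added to every slot of rs_plan_s; whatever value then
 * drops out of the FIFO (say 100) becomes curr_corr, so this cycle requests
 * req_sect = 2000 + 100 = 2100 sectors, capped at
 * c_max_rate * 2 * SLEEP_TIME / HZ.
 */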
494
495 int drbd_rs_number_requests(struct drbd_conf *mdev)
496 {
497         int number;
498         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
499                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
500                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
501         } else {
502                 mdev->c_sync_rate = mdev->sync_conf.rate;
503                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
504         }
505
506         /* Throttle resync on lower level disk activity, which may also be
507          * caused by application IO on Primary/SyncTarget.
508          * Keep this after the call to drbd_rs_controller, as that assumes
509          * it is called as precisely as possible every SLEEP_TIME,
510          * and would be confused otherwise. */
511         if (number && drbd_rs_should_slow_down(mdev)) {
512                 mdev->c_sync_rate = 1;
513                 number = 0;
514         }
515
516         /* ignore the amount of pending requests; the resync controller should
517          * throttle down to the incoming reply rate soon enough anyway. */
518         return number;
519 }
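/*
 * Editorial note, not part of the original file: the unit conversions above,
 * with made-up numbers.  BM_BLOCK_SIZE is 4 KiB (8 sectors), so the shift by
 * (BM_BLOCK_SHIFT - 9) turns sectors into resync requests.  Continuing the
 * 2100-sector example from drbd_rs_controller(): number = 2100 >> 3 = 262
 * requests per SLEEP_TIME, and with SLEEP_TIME == HZ/10 the derived rate is
 * c_sync_rate = 262 * HZ * 4 / (HZ/10) = 10480 KiB/s, roughly 10 MiB/s.
 * The fixed-rate branch runs the same formula backwards: a configured rate
 * of 10240 KiB/s yields (HZ/10) * 10240 / (4 * HZ) = 256 requests per
 * 100 ms cycle.
 */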
520
521 int w_make_resync_request(struct drbd_conf *mdev,
522                 struct drbd_work *w, int cancel)
523 {
524         unsigned long bit;
525         sector_t sector;
526         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
527         int max_bio_size;
528         int number, rollback_i, size;
529         int align, queued, sndbuf;
530         int i = 0;
531
532         if (unlikely(cancel))
533                 return 1;
534
535         if (unlikely(mdev->state.conn < C_CONNECTED)) {
536                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
537                 return 0;
538         }
539
540         if (mdev->state.conn != C_SYNC_TARGET)
541                 dev_err(DEV, "%s in w_make_resync_request\n",
542                         drbd_conn_str(mdev->state.conn));
543
544         if (mdev->rs_total == 0) {
545                 /* empty resync? */
546                 drbd_resync_finished(mdev);
547                 return 1;
548         }
549
550         if (!get_ldev(mdev)) {
551                 /* Since we only need to access mdev->rsync, a
552                    get_ldev_if_state(mdev, D_FAILED) would be sufficient,
553                    but continuing resync with a broken disk makes no
554                    sense at all */
555                 dev_err(DEV, "Disk broke down during resync!\n");
556                 mdev->resync_work.cb = w_resync_inactive;
557                 return 1;
558         }
559
560         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
561          * if it should be necessary */
562         max_bio_size =
563                 mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
564                 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
565
566         number = drbd_rs_number_requests(mdev);
567         if (number == 0)
568                 goto requeue;
569
570         for (i = 0; i < number; i++) {
571                 /* Stop generating RS requests when half of the send buffer is filled */
572                 mutex_lock(&mdev->data.mutex);
573                 if (mdev->data.socket) {
574                         queued = mdev->data.socket->sk->sk_wmem_queued;
575                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
576                 } else {
577                         queued = 1;
578                         sndbuf = 0;
579                 }
580                 mutex_unlock(&mdev->data.mutex);
581                 if (queued > sndbuf / 2)
582                         goto requeue;
583
584 next_sector:
585                 size = BM_BLOCK_SIZE;
586                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
587
588                 if (bit == -1UL) {
589                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
590                         mdev->resync_work.cb = w_resync_inactive;
591                         put_ldev(mdev);
592                         return 1;
593                 }
594
595                 sector = BM_BIT_TO_SECT(bit);
596
597                 if (drbd_try_rs_begin_io(mdev, sector)) {
598                         mdev->bm_resync_fo = bit;
599                         goto requeue;
600                 }
601                 mdev->bm_resync_fo = bit + 1;
602
603                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
604                         drbd_rs_complete_io(mdev, sector);
605                         goto next_sector;
606                 }
607
608 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
609                 /* try to find some adjacent bits.
610                  * we stop if we already have the maximum req size.
611                  *
612                  * Additionally always align bigger requests, in order to
613                  * be prepared for all stripe sizes of software RAIDs.
614                  */
615                 align = 1;
616                 rollback_i = i;
617                 for (;;) {
618                         if (size + BM_BLOCK_SIZE > max_bio_size)
619                                 break;
620
621                         /* Be always aligned */
622                         if (sector & ((1<<(align+3))-1))
623                                 break;
624
625                         /* do not cross extent boundaries */
626                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
627                                 break;
628                         /* now, is it actually dirty, after all?
629                          * caution, drbd_bm_test_bit is tri-state for some
630                          * obscure reason; ( b == 0 ) would get the out-of-band
631                          * only accidentally right because of the "oddly sized"
632                          * adjustment below */
633                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
634                                 break;
635                         bit++;
636                         size += BM_BLOCK_SIZE;
637                         if ((BM_BLOCK_SIZE << align) <= size)
638                                 align++;
639                         i++;
640                 }
641                 /* if we merged some,
642                  * reset the offset to start the next drbd_bm_find_next from */
643                 if (size > BM_BLOCK_SIZE)
644                         mdev->bm_resync_fo = bit + 1;
645 #endif
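		/*
		 * Editorial note, not part of the original file: the net effect of
		 * the alignment test above is that a request may grow up to, but not
		 * beyond, the alignment of its starting sector (and never beyond
		 * max_bio_size or an extent boundary).  For example, a start sector
		 * that is a multiple of 16 (8 KiB) allows merging up to 8 KiB, a
		 * multiple of 32 up to 16 KiB, a multiple of 64 up to 32 KiB, and so
		 * on -- which keeps large resync requests naturally aligned to
		 * common software RAID stripe sizes.
		 */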
646
647                 /* adjust very last sectors, in case we are oddly sized */
648                 if (sector + (size>>9) > capacity)
649                         size = (capacity-sector)<<9;
650                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
651                         switch (read_for_csum(mdev, sector, size)) {
652                         case -EIO: /* Disk failure */
653                                 put_ldev(mdev);
654                                 return 0;
655                         case -EAGAIN: /* allocation failed, or ldev busy */
656                                 drbd_rs_complete_io(mdev, sector);
657                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
658                                 i = rollback_i;
659                                 goto requeue;
660                         case 0:
661                                 /* everything ok */
662                                 break;
663                         default:
664                                 BUG();
665                         }
666                 } else {
667                         inc_rs_pending(mdev);
668                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
669                                                sector, size, ID_SYNCER)) {
670                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
671                                 dec_rs_pending(mdev);
672                                 put_ldev(mdev);
673                                 return 0;
674                         }
675                 }
676         }
677
678         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
679                 /* last syncer _request_ was sent,
680                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
681                  * next sync group will resume), as soon as we receive the last
682                  * resync data block, and the last bit is cleared.
683                  * until then resync "work" is "inactive" ...
684                  */
685                 mdev->resync_work.cb = w_resync_inactive;
686                 put_ldev(mdev);
687                 return 1;
688         }
689
690  requeue:
691         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
692         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
693         put_ldev(mdev);
694         return 1;
695 }
696
697 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
698 {
699         int number, i, size;
700         sector_t sector;
701         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
702
703         if (unlikely(cancel))
704                 return 1;
705
706         if (unlikely(mdev->state.conn < C_CONNECTED)) {
707                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
708                 return 0;
709         }
710
711         number = drbd_rs_number_requests(mdev);
712
713         sector = mdev->ov_position;
714         for (i = 0; i < number; i++) {
715                 if (sector >= capacity) {
716                         mdev->resync_work.cb = w_resync_inactive;
717                         return 1;
718                 }
719
720                 size = BM_BLOCK_SIZE;
721
722                 if (drbd_try_rs_begin_io(mdev, sector)) {
723                         mdev->ov_position = sector;
724                         goto requeue;
725                 }
726
727                 if (sector + (size>>9) > capacity)
728                         size = (capacity-sector)<<9;
729
730                 inc_rs_pending(mdev);
731                 if (!drbd_send_ov_request(mdev, sector, size)) {
732                         dec_rs_pending(mdev);
733                         return 0;
734                 }
735                 sector += BM_SECT_PER_BIT;
736         }
737         mdev->ov_position = sector;
738
739  requeue:
740         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
741         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
742         return 1;
743 }
744
745 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
746 {
747         kfree(w);
748         ov_oos_print(mdev);
749         drbd_resync_finished(mdev);
750
751         return 1;
752 }
753
754 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
755 {
756         kfree(w);
757
758         drbd_resync_finished(mdev);
759
760         return 1;
761 }
762
763 static void ping_peer(struct drbd_conf *mdev)
764 {
765         clear_bit(GOT_PING_ACK, &mdev->flags);
766         request_ping(mdev);
767         wait_event(mdev->misc_wait,
768                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
769 }
770
771 int drbd_resync_finished(struct drbd_conf *mdev)
772 {
773         unsigned long db, dt, dbdt;
774         unsigned long n_oos;
775         union drbd_state os, ns;
776         struct drbd_work *w;
777         char *khelper_cmd = NULL;
778         int verify_done = 0;
779
780         /* Remove all elements from the resync LRU. Since future actions
781          * might set bits in the (main) bitmap, the entries in the
782          * resync LRU would otherwise be wrong. */
783         if (drbd_rs_del_all(mdev)) {
784                 /* In case this is not possible now, most probably because
785                  * there are P_RS_DATA_REPLY packets lingering on the worker's
786                  * queue (or even the read operations for those packets
787                  * are not finished by now).   Retry in 100ms. */
788
789                 __set_current_state(TASK_INTERRUPTIBLE);
790                 schedule_timeout(HZ / 10);
791                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
792                 if (w) {
793                         w->cb = w_resync_finished;
794                         drbd_queue_work(&mdev->data.work, w);
795                         return 1;
796                 }
797                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
798         }
799
800         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
801         if (dt <= 0)
802                 dt = 1;
803         db = mdev->rs_total;
804         dbdt = Bit2KB(db/dt);
805         mdev->rs_paused /= HZ;
806
807         if (!get_ldev(mdev))
808                 goto out;
809
810         ping_peer(mdev);
811
812         spin_lock_irq(&mdev->req_lock);
813         os = mdev->state;
814
815         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
816
817         /* This protects us against multiple calls (that can happen in the presence
818            of application IO), and against connectivity loss just before we arrive here. */
819         if (os.conn <= C_CONNECTED)
820                 goto out_unlock;
821
822         ns = os;
823         ns.conn = C_CONNECTED;
824
825         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
826              verify_done ? "Online verify " : "Resync",
827              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
828
829         n_oos = drbd_bm_total_weight(mdev);
830
831         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
832                 if (n_oos) {
833                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
834                               n_oos, Bit2KB(1));
835                         khelper_cmd = "out-of-sync";
836                 }
837         } else {
838                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
839
840                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
841                         khelper_cmd = "after-resync-target";
842
843                 if (mdev->csums_tfm && mdev->rs_total) {
844                         const unsigned long s = mdev->rs_same_csum;
845                         const unsigned long t = mdev->rs_total;
846                         const int ratio =
847                                 (t == 0)     ? 0 :
848                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
849                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
850                              "transferred %luK total %luK\n",
851                              ratio,
852                              Bit2KB(mdev->rs_same_csum),
853                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
854                              Bit2KB(mdev->rs_total));
855                 }
856         }
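	/*
	 * Editorial note, not part of the original file: the ratio above is
	 * plain integer math with an overflow guard.  With made-up numbers,
	 * s = 40000 equal checksums out of t = 50000 blocks gives
	 * 40000 * 100 / 50000 = 80 percent.  For t >= 100000 the expression
	 * switches to s / (t / 100), apparently so that s * 100 cannot
	 * overflow a 32-bit unsigned long on very large resyncs.
	 */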
857
858         if (mdev->rs_failed) {
859                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
860
861                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
862                         ns.disk = D_INCONSISTENT;
863                         ns.pdsk = D_UP_TO_DATE;
864                 } else {
865                         ns.disk = D_UP_TO_DATE;
866                         ns.pdsk = D_INCONSISTENT;
867                 }
868         } else {
869                 ns.disk = D_UP_TO_DATE;
870                 ns.pdsk = D_UP_TO_DATE;
871
872                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
873                         if (mdev->p_uuid) {
874                                 int i;
875                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
876                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
877                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
878                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
879                         } else {
880                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
881                         }
882                 }
883
884                 drbd_uuid_set_bm(mdev, 0UL);
885
886                 if (mdev->p_uuid) {
887                         /* Now the two UUID sets are equal, update what we
888                          * know of the peer. */
889                         int i;
890                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
891                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
892                 }
893         }
894
895         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
896 out_unlock:
897         spin_unlock_irq(&mdev->req_lock);
898         put_ldev(mdev);
899 out:
900         mdev->rs_total  = 0;
901         mdev->rs_failed = 0;
902         mdev->rs_paused = 0;
903         if (verify_done)
904                 mdev->ov_start_sector = 0;
905
906         drbd_md_sync(mdev);
907
908         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
909                 dev_info(DEV, "Writing the whole bitmap\n");
910                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
911         }
912
913         if (khelper_cmd)
914                 drbd_khelper(mdev, khelper_cmd);
915
916         return 1;
917 }
918
919 /* helper */
920 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
921 {
922         if (drbd_ee_has_active_page(e)) {
923                 /* This might happen if sendpage() has not finished */
924                 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
925                 atomic_add(i, &mdev->pp_in_use_by_net);
926                 atomic_sub(i, &mdev->pp_in_use);
927                 spin_lock_irq(&mdev->req_lock);
928                 list_add_tail(&e->w.list, &mdev->net_ee);
929                 spin_unlock_irq(&mdev->req_lock);
930                 wake_up(&drbd_pp_wait);
931         } else
932                 drbd_free_ee(mdev, e);
933 }
934
935 /**
936  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
937  * @mdev:       DRBD device.
938  * @w:          work object.
939  * @cancel:     The connection will be closed anyways
940  */
941 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
942 {
943         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
944         int ok;
945
946         if (unlikely(cancel)) {
947                 drbd_free_ee(mdev, e);
948                 dec_unacked(mdev);
949                 return 1;
950         }
951
952         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
953                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
954         } else {
955                 if (__ratelimit(&drbd_ratelimit_state))
956                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
957                             (unsigned long long)e->sector);
958
959                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
960         }
961
962         dec_unacked(mdev);
963
964         move_to_net_ee_or_free(mdev, e);
965
966         if (unlikely(!ok))
967                 dev_err(DEV, "drbd_send_block() failed\n");
968         return ok;
969 }
970
971 /**
972  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
973  * @mdev:       DRBD device.
974  * @w:          work object.
975  * @cancel:     The connection will be closed anyways
976  */
977 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
978 {
979         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
980         int ok;
981
982         if (unlikely(cancel)) {
983                 drbd_free_ee(mdev, e);
984                 dec_unacked(mdev);
985                 return 1;
986         }
987
988         if (get_ldev_if_state(mdev, D_FAILED)) {
989                 drbd_rs_complete_io(mdev, e->sector);
990                 put_ldev(mdev);
991         }
992
993         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
994                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
995                         inc_rs_pending(mdev);
996                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
997                 } else {
998                         if (__ratelimit(&drbd_ratelimit_state))
999                                 dev_err(DEV, "Not sending RSDataReply, "
1000                                     "partner DISKLESS!\n");
1001                         ok = 1;
1002                 }
1003         } else {
1004                 if (__ratelimit(&drbd_ratelimit_state))
1005                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1006                             (unsigned long long)e->sector);
1007
1008                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1009
1010                 /* update resync data with failure */
1011                 drbd_rs_failed_io(mdev, e->sector, e->size);
1012         }
1013
1014         dec_unacked(mdev);
1015
1016         move_to_net_ee_or_free(mdev, e);
1017
1018         if (unlikely(!ok))
1019                 dev_err(DEV, "drbd_send_block() failed\n");
1020         return ok;
1021 }
1022
1023 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1024 {
1025         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1026         struct digest_info *di;
1027         int digest_size;
1028         void *digest = NULL;
1029         int ok, eq = 0;
1030
1031         if (unlikely(cancel)) {
1032                 drbd_free_ee(mdev, e);
1033                 dec_unacked(mdev);
1034                 return 1;
1035         }
1036
1037         if (get_ldev(mdev)) {
1038                 drbd_rs_complete_io(mdev, e->sector);
1039                 put_ldev(mdev);
1040         }
1041
1042         di = e->digest;
1043
1044         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1045                 /* quick hack to try to avoid a race against reconfiguration.
1046                  * a real fix would be much more involved,
1047                  * introducing more locking mechanisms */
1048                 if (mdev->csums_tfm) {
1049                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1050                         D_ASSERT(digest_size == di->digest_size);
1051                         digest = kmalloc(digest_size, GFP_NOIO);
1052                 }
1053                 if (digest) {
1054                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1055                         eq = !memcmp(digest, di->digest, digest_size);
1056                         kfree(digest);
1057                 }
1058
1059                 if (eq) {
1060                         drbd_set_in_sync(mdev, e->sector, e->size);
1061                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1062                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1063                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1064                 } else {
1065                         inc_rs_pending(mdev);
1066                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1067                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1068                         kfree(di);
1069                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1070                 }
1071         } else {
1072                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1073                 if (__ratelimit(&drbd_ratelimit_state))
1074                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1075         }
1076
1077         dec_unacked(mdev);
1078         move_to_net_ee_or_free(mdev, e);
1079
1080         if (unlikely(!ok))
1081                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1082         return ok;
1083 }
1084
1085 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1086 {
1087         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1088         int digest_size;
1089         void *digest;
1090         int ok = 1;
1091
1092         if (unlikely(cancel))
1093                 goto out;
1094
1095         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1096                 goto out;
1097
1098         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1099         /* FIXME if this allocation fails, online verify will not terminate! */
1100         digest = kmalloc(digest_size, GFP_NOIO);
1101         if (digest) {
1102                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1103                 inc_rs_pending(mdev);
1104                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1105                                              digest, digest_size, P_OV_REPLY);
1106                 if (!ok)
1107                         dec_rs_pending(mdev);
1108                 kfree(digest);
1109         }
1110
1111 out:
1112         drbd_free_ee(mdev, e);
1113
1114         dec_unacked(mdev);
1115
1116         return ok;
1117 }
1118
1119 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1120 {
1121         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1122                 mdev->ov_last_oos_size += size>>9;
1123         } else {
1124                 mdev->ov_last_oos_start = sector;
1125                 mdev->ov_last_oos_size = size>>9;
1126         }
1127         drbd_set_out_of_sync(mdev, sector, size);
1128         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1129 }
1130
1131 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1132 {
1133         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1134         struct digest_info *di;
1135         int digest_size;
1136         void *digest;
1137         int ok, eq = 0;
1138
1139         if (unlikely(cancel)) {
1140                 drbd_free_ee(mdev, e);
1141                 dec_unacked(mdev);
1142                 return 1;
1143         }
1144
1145         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1146          * the resync lru has been cleaned up already */
1147         if (get_ldev(mdev)) {
1148                 drbd_rs_complete_io(mdev, e->sector);
1149                 put_ldev(mdev);
1150         }
1151
1152         di = e->digest;
1153
1154         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1155                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1156                 digest = kmalloc(digest_size, GFP_NOIO);
1157                 if (digest) {
1158                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1159
1160                         D_ASSERT(digest_size == di->digest_size);
1161                         eq = !memcmp(digest, di->digest, digest_size);
1162                         kfree(digest);
1163                 }
1164         } else {
1165                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1166                 if (__ratelimit(&drbd_ratelimit_state))
1167                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1168         }
1169
1170         dec_unacked(mdev);
1171         if (!eq)
1172                 drbd_ov_oos_found(mdev, e->sector, e->size);
1173         else
1174                 ov_oos_print(mdev);
1175
1176         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1177                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1178
1179         drbd_free_ee(mdev, e);
1180
1181         --mdev->ov_left;
1182
1183         /* let's advance progress step marks only for every other megabyte */
1184         if ((mdev->ov_left & 0x200) == 0x200)
1185                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1186
1187         if (mdev->ov_left == 0) {
1188                 ov_oos_print(mdev);
1189                 drbd_resync_finished(mdev);
1190         }
1191
1192         return ok;
1193 }
1194
1195 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1196 {
1197         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1198         complete(&b->done);
1199         return 1;
1200 }
1201
1202 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1203 {
1204         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1205         struct p_barrier *p = &mdev->data.sbuf.barrier;
1206         int ok = 1;
1207
1208         /* really avoid racing with tl_clear.  w.cb may have been referenced
1209          * just before it was reassigned and re-queued, so double check that.
1210          * actually, this race was harmless, since we only try to send the
1211          * barrier packet here, and otherwise do nothing with the object.
1212          * but compare with the head of w_clear_epoch */
1213         spin_lock_irq(&mdev->req_lock);
1214         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1215                 cancel = 1;
1216         spin_unlock_irq(&mdev->req_lock);
1217         if (cancel)
1218                 return 1;
1219
1220         if (!drbd_get_data_sock(mdev))
1221                 return 0;
1222         p->barrier = b->br_number;
1223         /* inc_ap_pending was done where this was queued.
1224          * dec_ap_pending will be done in got_BarrierAck
1225          * or (on connection loss) in w_clear_epoch.  */
1226         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1227                                 (struct p_header80 *)p, sizeof(*p), 0);
1228         drbd_put_data_sock(mdev);
1229
1230         return ok;
1231 }
1232
1233 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1234 {
1235         if (cancel)
1236                 return 1;
1237         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1238 }
1239
1240 /**
1241  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1242  * @mdev:       DRBD device.
1243  * @w:          work object.
1244  * @cancel:     The connection will be closed anyways
1245  */
1246 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1247 {
1248         struct drbd_request *req = container_of(w, struct drbd_request, w);
1249         int ok;
1250
1251         if (unlikely(cancel)) {
1252                 req_mod(req, send_canceled);
1253                 return 1;
1254         }
1255
1256         ok = drbd_send_dblock(mdev, req);
1257         req_mod(req, ok ? handed_over_to_network : send_failed);
1258
1259         return ok;
1260 }
1261
1262 /**
1263  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1264  * @mdev:       DRBD device.
1265  * @w:          work object.
1266  * @cancel:     The connection will be closed anyways
1267  */
1268 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1269 {
1270         struct drbd_request *req = container_of(w, struct drbd_request, w);
1271         int ok;
1272
1273         if (unlikely(cancel)) {
1274                 req_mod(req, send_canceled);
1275                 return 1;
1276         }
1277
1278         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1279                                 (unsigned long)req);
1280
1281         if (!ok) {
1282                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1283                  * so this is probably redundant */
1284                 if (mdev->state.conn >= C_CONNECTED)
1285                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1286         }
1287         req_mod(req, ok ? handed_over_to_network : send_failed);
1288
1289         return ok;
1290 }
1291
1292 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1293 {
1294         struct drbd_request *req = container_of(w, struct drbd_request, w);
1295
1296         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1297                 drbd_al_begin_io(mdev, req->sector);
1298         /* Calling drbd_al_begin_io() out of the worker might deadlock
1299            theoretically. Practically it cannot deadlock, since this is
1300            only used when unfreezing IOs. All the extents of the requests
1301            that made it into the TL are already active */
1302
1303         drbd_req_make_private_bio(req, req->master_bio);
1304         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1305         generic_make_request(req->private_bio);
1306
1307         return 1;
1308 }
1309
1310 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1311 {
1312         struct drbd_conf *odev = mdev;
1313
1314         while (1) {
1315                 if (odev->sync_conf.after == -1)
1316                         return 1;
1317                 odev = minor_to_mdev(odev->sync_conf.after);
1318                 ERR_IF(!odev) return 1;
1319                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1320                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1321                     odev->state.aftr_isp || odev->state.peer_isp ||
1322                     odev->state.user_isp)
1323                         return 0;
1324         }
1325 }
1326
1327 /**
1328  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1329  * @mdev:       DRBD device.
1330  *
1331  * Called from process context only (admin command and after_state_ch).
1332  */
1333 static int _drbd_pause_after(struct drbd_conf *mdev)
1334 {
1335         struct drbd_conf *odev;
1336         int i, rv = 0;
1337
1338         for (i = 0; i < minor_count; i++) {
1339                 odev = minor_to_mdev(i);
1340                 if (!odev)
1341                         continue;
1342                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1343                         continue;
1344                 if (!_drbd_may_sync_now(odev))
1345                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1346                                != SS_NOTHING_TO_DO);
1347         }
1348
1349         return rv;
1350 }
1351
1352 /**
1353  * _drbd_resume_next() - Resume resync on all devices that may resync now
1354  * @mdev:       DRBD device.
1355  *
1356  * Called from process context only (admin command and worker).
1357  */
1358 static int _drbd_resume_next(struct drbd_conf *mdev)
1359 {
1360         struct drbd_conf *odev;
1361         int i, rv = 0;
1362
1363         for (i = 0; i < minor_count; i++) {
1364                 odev = minor_to_mdev(i);
1365                 if (!odev)
1366                         continue;
1367                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1368                         continue;
1369                 if (odev->state.aftr_isp) {
1370                         if (_drbd_may_sync_now(odev))
1371                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1372                                                         CS_HARD, NULL)
1373                                        != SS_NOTHING_TO_DO) ;
1374                 }
1375         }
1376         return rv;
1377 }
1378
1379 void resume_next_sg(struct drbd_conf *mdev)
1380 {
1381         write_lock_irq(&global_state_lock);
1382         _drbd_resume_next(mdev);
1383         write_unlock_irq(&global_state_lock);
1384 }
1385
1386 void suspend_other_sg(struct drbd_conf *mdev)
1387 {
1388         write_lock_irq(&global_state_lock);
1389         _drbd_pause_after(mdev);
1390         write_unlock_irq(&global_state_lock);
1391 }
1392
1393 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1394 {
1395         struct drbd_conf *odev;
1396
1397         if (o_minor == -1)
1398                 return NO_ERROR;
1399         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1400                 return ERR_SYNC_AFTER;
1401
1402         /* check for loops */
1403         odev = minor_to_mdev(o_minor);
1404         while (1) {
1405                 if (odev == mdev)
1406                         return ERR_SYNC_AFTER_CYCLE;
1407
1408                 /* dependency chain ends here, no cycles. */
1409                 if (odev->sync_conf.after == -1)
1410                         return NO_ERROR;
1411
1412                 /* follow the dependency chain */
1413                 odev = minor_to_mdev(odev->sync_conf.after);
1414         }
1415 }
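/*
 * Editorial note, not part of the original file: a small example of the
 * dependency walk above and in _drbd_may_sync_now().  Suppose, hypothetically,
 * that minor 3 has sync_conf.after == 1, minor 1 has sync_conf.after == 0 and
 * minor 0 uses -1 (no dependency).  Checking whether minor 3 may sync after
 * minor 1 follows the chain 1 -> 0 -> -1 without meeting minor 3 again and
 * returns NO_ERROR; trying to make minor 0 sync after minor 3 instead would
 * close the loop and be rejected with ERR_SYNC_AFTER_CYCLE.  Likewise,
 * _drbd_may_sync_now() walks the same chain and returns 0 as long as any
 * device it depends on is still resyncing or has a paused-sync flag set.
 */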
1416
1417 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1418 {
1419         int changes;
1420         int retcode;
1421
1422         write_lock_irq(&global_state_lock);
1423         retcode = sync_after_error(mdev, na);
1424         if (retcode == NO_ERROR) {
1425                 mdev->sync_conf.after = na;
1426                 do {
1427                         changes  = _drbd_pause_after(mdev);
1428                         changes |= _drbd_resume_next(mdev);
1429                 } while (changes);
1430         }
1431         write_unlock_irq(&global_state_lock);
1432         return retcode;
1433 }
1434
1435 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1436 {
1437         atomic_set(&mdev->rs_sect_in, 0);
1438         atomic_set(&mdev->rs_sect_ev, 0);
1439         mdev->rs_in_flight = 0;
1440         mdev->rs_planed = 0;
1441         spin_lock(&mdev->peer_seq_lock);
1442         fifo_set(&mdev->rs_plan_s, 0);
1443         spin_unlock(&mdev->peer_seq_lock);
1444 }
1445
1446 /**
1447  * drbd_start_resync() - Start the resync process
1448  * @mdev:       DRBD device.
1449  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1450  *
1451  * This function might bring you directly into one of the
1452  * C_PAUSED_SYNC_* states.
1453  */
1454 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1455 {
1456         union drbd_state ns;
1457         int r;
1458
1459         if (mdev->state.conn >= C_SYNC_SOURCE) {
1460                 dev_err(DEV, "Resync already running!\n");
1461                 return;
1462         }
1463
1464         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1465         drbd_rs_cancel_all(mdev);
1466
1467         if (side == C_SYNC_TARGET) {
1468                 /* Since application IO was locked out during C_WF_BITMAP_T and
1469                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1470                    ask the before-resync-target handler whether we may make the data inconsistent. */
1471                 r = drbd_khelper(mdev, "before-resync-target");
1472                 r = (r >> 8) & 0xff;
1473                 if (r > 0) {
1474                         dev_info(DEV, "before-resync-target handler returned %d, "
1475                              "dropping connection.\n", r);
1476                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1477                         return;
1478                 }
1479         }
1480
1481         drbd_state_lock(mdev);
1482
1483         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1484                 drbd_state_unlock(mdev);
1485                 return;
1486         }
1487
1488         if (side == C_SYNC_TARGET) {
1489                 mdev->bm_resync_fo = 0;
1490         } else /* side == C_SYNC_SOURCE */ {
1491                 u64 uuid;
1492
1493                 get_random_bytes(&uuid, sizeof(u64));
1494                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1495                 drbd_send_sync_uuid(mdev, uuid);
1496
1497                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1498         }
1499
1500         write_lock_irq(&global_state_lock);
1501         ns = mdev->state;
1502
1503         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1504
1505         ns.conn = side;
1506
1507         if (side == C_SYNC_TARGET)
1508                 ns.disk = D_INCONSISTENT;
1509         else /* side == C_SYNC_SOURCE */
1510                 ns.pdsk = D_INCONSISTENT;
1511
1512         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1513         ns = mdev->state;
1514
1515         if (ns.conn < C_CONNECTED)
1516                 r = SS_UNKNOWN_ERROR;
1517
1518         if (r == SS_SUCCESS) {
1519                 unsigned long tw = drbd_bm_total_weight(mdev);
1520                 unsigned long now = jiffies;
1521                 int i;
1522
1523                 mdev->rs_failed    = 0;
1524                 mdev->rs_paused    = 0;
1525                 mdev->rs_same_csum = 0;
1526                 mdev->rs_last_events = 0;
1527                 mdev->rs_last_sect_ev = 0;
1528                 mdev->rs_total     = tw;
1529                 mdev->rs_start     = now;
1530                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1531                         mdev->rs_mark_left[i] = tw;
1532                         mdev->rs_mark_time[i] = now;
1533                 }
1534                 _drbd_pause_after(mdev);
1535         }
1536         write_unlock_irq(&global_state_lock);
1537         put_ldev(mdev);
1538
1539         if (r == SS_SUCCESS) {
1540                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1541                      drbd_conn_str(ns.conn),
1542                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1543                      (unsigned long) mdev->rs_total);
1544
1545                 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1546                         /* This still has a race (about when exactly the peers
1547                          * detect connection loss) that can lead to a full sync
1548                          * on next handshake. In 8.3.9 we fixed this with explicit
1549                          * resync-finished notifications, but the fix
1550                          * introduces a protocol change.  Sleeping for some
1551                          * time longer than the ping interval + timeout on the
1552                          * SyncSource, to give the SyncTarget the chance to
1553                          * detect connection loss, then waiting for a ping
1554                          * response (implicit in drbd_resync_finished) reduces
1555                          * the race considerably, but does not solve it. */
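                        /* ping_int is configured in seconds; ping_timeo is
                         * (apparently) in tenths of a second, so HZ/9 rather
                         * than HZ/10 errs on the longer side. */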
1556                         if (side == C_SYNC_SOURCE)
1557                                 schedule_timeout_interruptible(
1558                                         mdev->net_conf->ping_int * HZ +
1559                                         mdev->net_conf->ping_timeo*HZ/9);
1560                         drbd_resync_finished(mdev);
1561                 }
1562
1563                 drbd_rs_controller_reset(mdev);
1564                 /* ns.conn may already be != mdev->state.conn:
1565                  * we may have been paused in between, or become paused until
1566                  * the timer triggers.
1567                  * Either way, that is handled in resync_timer_fn(). */
1568                 if (ns.conn == C_SYNC_TARGET)
1569                         mod_timer(&mdev->resync_timer, jiffies);
1570
1571                 drbd_md_sync(mdev);
1572         }
1573         drbd_state_unlock(mdev);
1574 }
1575
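/**
 * drbd_worker() - Main loop of the per-device worker thread
 * @thi:        DRBD thread object of this worker.
 *
 * Pulls work items off mdev->data.work and runs their callbacks until the
 * thread is asked to stop; then drains the remaining queue with the cancel
 * flag set, waits for the receiver thread to exit and cleans up the device.
 */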
1576 int drbd_worker(struct drbd_thread *thi)
1577 {
1578         struct drbd_conf *mdev = thi->mdev;
1579         struct drbd_work *w = NULL;
1580         LIST_HEAD(work_list);
1581         int intr = 0, i;
1582
1583         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1584
1585         while (get_t_state(thi) == Running) {
1586                 drbd_thread_current_set_cpu(mdev);
1587
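                /* If no work is immediately available, uncork the data socket
                 * so anything still buffered is sent while we sleep, and cork
                 * it again once we are woken up with new work to batch. */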
1588                 if (down_trylock(&mdev->data.work.s)) {
1589                         mutex_lock(&mdev->data.mutex);
1590                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1591                                 drbd_tcp_uncork(mdev->data.socket);
1592                         mutex_unlock(&mdev->data.mutex);
1593
1594                         intr = down_interruptible(&mdev->data.work.s);
1595
1596                         mutex_lock(&mdev->data.mutex);
1597                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1598                                 drbd_tcp_cork(mdev->data.socket);
1599                         mutex_unlock(&mdev->data.mutex);
1600                 }
1601
1602                 if (intr) {
1603                         D_ASSERT(intr == -EINTR);
1604                         flush_signals(current);
1605                         ERR_IF (get_t_state(thi) == Running)
1606                                 continue;
1607                         break;
1608                 }
1609
1610                 if (get_t_state(thi) != Running)
1611                         break;
1612                 /* With this break, we have done a down() but not consumed
1613                    the entry from the list. The cleanup code takes care of
1614                    this...   */
1615
1616                 w = NULL;
1617                 spin_lock_irq(&mdev->data.work.q_lock);
1618                 ERR_IF(list_empty(&mdev->data.work.q)) {
1619                         /* Something is terribly wrong in our logic:
1620                          * we were able to down() the semaphore,
1621                          * but the list is empty.
1622                          *
1623                          * What is the best thing to do now?
1624                          * Trying again from scratch (restarting the receiver,
1625                          * asender, ...) could break things even more,
1626                          * e.g. when we are primary but have no good local data.
1627                          *
1628                          * For now, just start over this loop.
1629                          */
1630                         spin_unlock_irq(&mdev->data.work.q_lock);
1631                         continue;
1632                 }
1633                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1634                 list_del_init(&w->list);
1635                 spin_unlock_irq(&mdev->data.work.q_lock);
1636
1637                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1638                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1639                         if (mdev->state.conn >= C_CONNECTED)
1640                                 drbd_force_state(mdev,
1641                                                 NS(conn, C_NETWORK_FAILURE));
1642                 }
1643         }
1644         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1645         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1646
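        /* Drain whatever is still queued: splice the list out under the lock,
         * run each callback with the cancel flag set, and repeat in case new
         * work was queued in the meantime. */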
1647         spin_lock_irq(&mdev->data.work.q_lock);
1648         i = 0;
1649         while (!list_empty(&mdev->data.work.q)) {
1650                 list_splice_init(&mdev->data.work.q, &work_list);
1651                 spin_unlock_irq(&mdev->data.work.q_lock);
1652
1653                 while (!list_empty(&work_list)) {
1654                         w = list_entry(work_list.next, struct drbd_work, list);
1655                         list_del_init(&w->list);
1656                         w->cb(mdev, w, 1);
1657                         i++; /* dead debugging code */
1658                 }
1659
1660                 spin_lock_irq(&mdev->data.work.q_lock);
1661         }
1662         sema_init(&mdev->data.work.s, 0);
1663         /* DANGEROUS race: if someone queued work while holding the spinlock,
1664          * but called up() only after dropping it, we could see an up() on the
1665          * semaphore without a corresponding list entry.
1666          * So don't do that.
1667          */
1668         spin_unlock_irq(&mdev->data.work.q_lock);
1669
1670         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1671         /* _drbd_set_state only uses stop_nowait.
1672          * wait here for the Exiting receiver. */
1673         drbd_thread_stop(&mdev->receiver);
1674         drbd_mdev_cleanup(mdev);
1675
1676         dev_info(DEV, "worker terminated\n");
1677
1678         clear_bit(DEVICE_DYING, &mdev->flags);
1679         clear_bit(CONFIG_PENDING, &mdev->flags);
1680         wake_up(&mdev->state_wait);
1681
1682         return 0;
1683 }