[pandora-kernel.git] drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41
42 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
43
44
45
46 /* defined here:
47    drbd_md_io_complete
48    drbd_endio_sec
49    drbd_endio_pri
50
51  * more endio handlers:
52    atodb_endio in drbd_actlog.c
53    drbd_bm_async_io_complete in drbd_bitmap.c
54
55  * For all these callbacks, note the following:
56  * The callbacks will be called in irq context by the IDE drivers,
57  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
58  * Try to get the locking right :)
59  *
60  */
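/* Because of the context note above, the final-stage handlers below
 * (drbd_endio_read_sec_final/drbd_endio_write_sec_final) take
 * mdev->req_lock with spin_lock_irqsave()/spin_unlock_irqrestore(),
 * while the worker-context callbacks further down can use plain
 * spin_lock_irq(). */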
61
62
63 /* About the global_state_lock
64    Each state transition on a device holds a read lock. In case we have
65    to evaluate the sync after dependencies, we grab a write lock, because
66    we need stable states on all devices for that.  */
67 rwlock_t global_state_lock;
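/* Rough usage sketch (not a complete list of call sites): per-device state
 * transitions take the read side, as the comment above says, while the
 * sync-after handling below (resume_next_sg(), suspend_other_sg(),
 * drbd_alter_sa(), drbd_start_resync()) takes the write side:
 *
 *	write_lock_irq(&global_state_lock);
 *	_drbd_pause_after(mdev);	// needs a stable view of all devices
 *	write_unlock_irq(&global_state_lock);
 */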
68
69 /* used for synchronous meta data and bitmap IO
70  * submitted by drbd_md_sync_page_io()
71  */
72 void drbd_md_io_complete(struct bio *bio, int error)
73 {
74         struct drbd_md_io *md_io;
75
76         md_io = (struct drbd_md_io *)bio->bi_private;
77         md_io->error = error;
78
79         complete(&md_io->event);
80 }
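/* A rough sketch of how this completion is typically consumed by
 * drbd_md_sync_page_io() (in drbd_actlog.c); details may differ, the point
 * is only the submit-then-wait pattern:
 *
 *	init_completion(&mdev->md_io.event);
 *	bio->bi_private = &mdev->md_io;
 *	bio->bi_end_io  = drbd_md_io_complete;
 *	submit_bio(rw, bio);
 *	wait_for_completion(&mdev->md_io.event);
 *	// mdev->md_io.error now holds the result
 */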
81
82 /* reads on behalf of the partner,
83  * "submitted" by the receiver
84  */
85 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
86 {
87         unsigned long flags = 0;
88         struct drbd_conf *mdev = e->mdev;
89
90         D_ASSERT(e->block_id != ID_VACANT);
91
92         spin_lock_irqsave(&mdev->req_lock, flags);
93         mdev->read_cnt += e->size >> 9;
94         list_del(&e->w.list);
95         if (list_empty(&mdev->read_ee))
96                 wake_up(&mdev->ee_wait);
97         if (test_bit(__EE_WAS_ERROR, &e->flags))
98                 __drbd_chk_io_error(mdev, FALSE);
99         spin_unlock_irqrestore(&mdev->req_lock, flags);
100
101         drbd_queue_work(&mdev->data.work, &e->w);
102         put_ldev(mdev);
103 }
104
105 static int is_failed_barrier(int ee_flags)
106 {
107         return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
108                         == (EE_IS_BARRIER|EE_WAS_ERROR);
109 }
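/* Note that EE_RESUBMITTED is part of the mask above: once
 * drbd_endio_write_sec_final() has rescheduled a failed barrier request
 * (setting EE_RESUBMITTED and clearing EE_WAS_ERROR), a later completion
 * of the same entry is no longer treated as a failed barrier. */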
110
111 /* writes on behalf of the partner, or resync writes,
112  * "submitted" by the receiver, final stage.  */
113 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
114 {
115         unsigned long flags = 0;
116         struct drbd_conf *mdev = e->mdev;
117         sector_t e_sector;
118         int do_wake;
119         int is_syncer_req;
120         int do_al_complete_io;
121
122         /* if this is a failed barrier request, disable use of barriers,
123          * and schedule for resubmission */
124         if (is_failed_barrier(e->flags)) {
125                 drbd_bump_write_ordering(mdev, WO_bdev_flush);
126                 spin_lock_irqsave(&mdev->req_lock, flags);
127                 list_del(&e->w.list);
128                 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
129                 e->w.cb = w_e_reissue;
130                 /* put_ldev actually happens below, once we come here again. */
131                 __release(local);
132                 spin_unlock_irqrestore(&mdev->req_lock, flags);
133                 drbd_queue_work(&mdev->data.work, &e->w);
134                 return;
135         }
136
137         D_ASSERT(e->block_id != ID_VACANT);
138
139         /* after we moved e to done_ee,
140          * we may no longer access it,
141          * it may be freed/reused already!
142          * (as soon as we release the req_lock) */
143         e_sector = e->sector;
144         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
145         is_syncer_req = is_syncer_block_id(e->block_id);
146
147         spin_lock_irqsave(&mdev->req_lock, flags);
148         mdev->writ_cnt += e->size >> 9;
149         list_del(&e->w.list); /* has been on active_ee or sync_ee */
150         list_add_tail(&e->w.list, &mdev->done_ee);
151
152         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
153          * neither did we wake possibly waiting conflicting requests.
154          * done from "drbd_process_done_ee" within the appropriate w.cb
155          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
156
157         do_wake = is_syncer_req
158                 ? list_empty(&mdev->sync_ee)
159                 : list_empty(&mdev->active_ee);
160
161         if (test_bit(__EE_WAS_ERROR, &e->flags))
162                 __drbd_chk_io_error(mdev, FALSE);
163         spin_unlock_irqrestore(&mdev->req_lock, flags);
164
165         if (is_syncer_req)
166                 drbd_rs_complete_io(mdev, e_sector);
167
168         if (do_wake)
169                 wake_up(&mdev->ee_wait);
170
171         if (do_al_complete_io)
172                 drbd_al_complete_io(mdev, e_sector);
173
174         wake_asender(mdev);
175         put_ldev(mdev);
176 }
177
178 /* writes on behalf of the partner, or resync writes,
179  * "submitted" by the receiver.
180  */
181 void drbd_endio_sec(struct bio *bio, int error)
182 {
183         struct drbd_epoch_entry *e = bio->bi_private;
184         struct drbd_conf *mdev = e->mdev;
185         int uptodate = bio_flagged(bio, BIO_UPTODATE);
186         int is_write = bio_data_dir(bio) == WRITE;
187
188         if (error)
189                 dev_warn(DEV, "%s: error=%d s=%llus\n",
190                                 is_write ? "write" : "read", error,
191                                 (unsigned long long)e->sector);
192         if (!error && !uptodate) {
193                 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
194                                 is_write ? "write" : "read",
195                                 (unsigned long long)e->sector);
196                 /* strange behavior of some lower level drivers...
197                  * fail the request by clearing the uptodate flag,
198                  * but do not return any error?! */
199                 error = -EIO;
200         }
201
202         if (error)
203                 set_bit(__EE_WAS_ERROR, &e->flags);
204
205         bio_put(bio); /* no need for the bio anymore */
206         if (atomic_dec_and_test(&e->pending_bios)) {
207                 if (is_write)
208                         drbd_endio_write_sec_final(e);
209                 else
210                         drbd_endio_read_sec_final(e);
211         }
212 }
213
214 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
215  */
216 void drbd_endio_pri(struct bio *bio, int error)
217 {
218         struct drbd_request *req = bio->bi_private;
219         struct drbd_conf *mdev = req->mdev;
220         enum drbd_req_event what;
221         int uptodate = bio_flagged(bio, BIO_UPTODATE);
222
223         if (!error && !uptodate) {
224                 dev_warn(DEV, "p %s: setting error to -EIO\n",
225                          bio_data_dir(bio) == WRITE ? "write" : "read");
226                 /* strange behavior of some lower level drivers...
227                  * fail the request by clearing the uptodate flag,
228                  * but do not return any error?! */
229                 error = -EIO;
230         }
231
232         /* to avoid recursion in __req_mod */
233         if (unlikely(error)) {
234                 what = (bio_data_dir(bio) == WRITE)
235                         ? write_completed_with_error
236                         : (bio_rw(bio) == READ)
237                           ? read_completed_with_error
238                           : read_ahead_completed_with_error;
239         } else
240                 what = completed_ok;
241
242         bio_put(req->private_bio);
243         req->private_bio = ERR_PTR(error);
244
245         req_mod(req, what);
246 }
247
248 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
249 {
250         struct drbd_request *req = container_of(w, struct drbd_request, w);
251
252         /* We should not detach for read io-error,
253          * but try to WRITE the P_DATA_REPLY to the failed location,
254          * to give the disk the chance to relocate that block */
255
256         spin_lock_irq(&mdev->req_lock);
257         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
258                 _req_mod(req, read_retry_remote_canceled);
259                 spin_unlock_irq(&mdev->req_lock);
260                 return 1;
261         }
262         spin_unlock_irq(&mdev->req_lock);
263
264         return w_send_read_req(mdev, w, 0);
265 }
266
267 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
268 {
269         ERR_IF(cancel) return 1;
270         dev_err(DEV, "resync inactive, but callback triggered??\n");
271         return 1; /* Simply ignore this! */
272 }
273
274 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
275 {
276         struct hash_desc desc;
277         struct scatterlist sg;
278         struct page *page = e->pages;
279         struct page *tmp;
280         unsigned len;
281
282         desc.tfm = tfm;
283         desc.flags = 0;
284
285         sg_init_table(&sg, 1);
286         crypto_hash_init(&desc);
287
288         while ((tmp = page_chain_next(page))) {
289                 /* all but the last page will be fully used */
290                 sg_set_page(&sg, page, PAGE_SIZE, 0);
291                 crypto_hash_update(&desc, &sg, sg.length);
292                 page = tmp;
293         }
294         /* and now the last, possibly only partially used page */
295         len = e->size & (PAGE_SIZE - 1);
296         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
297         crypto_hash_update(&desc, &sg, sg.length);
298         crypto_hash_final(&desc, digest);
299 }
300
301 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
302 {
303         struct hash_desc desc;
304         struct scatterlist sg;
305         struct bio_vec *bvec;
306         int i;
307
308         desc.tfm = tfm;
309         desc.flags = 0;
310
311         sg_init_table(&sg, 1);
312         crypto_hash_init(&desc);
313
314         __bio_for_each_segment(bvec, bio, i, 0) {
315                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
316                 crypto_hash_update(&desc, &sg, sg.length);
317         }
318         crypto_hash_final(&desc, digest);
319 }
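/* Both helpers feed the same hash transform; they only differ in how the
 * data is reached: drbd_csum_ee() walks the page chain of an epoch entry
 * (all pages full except possibly the last, bounded by e->size), while
 * drbd_csum_bio() walks the bio_vec segments of a bio.  A caller sizes the
 * digest buffer with crypto_hash_digestsize(), as w_e_send_csum() below
 * does:
 *
 *	digest = kmalloc(crypto_hash_digestsize(tfm), GFP_NOIO);
 *	if (digest)
 *		drbd_csum_ee(mdev, tfm, e, digest);
 */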
320
321 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
322 {
323         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
324         int digest_size;
325         void *digest;
326         int ok;
327
328         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
329
330         if (unlikely(cancel)) {
331                 drbd_free_ee(mdev, e);
332                 return 1;
333         }
334
335         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
336                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
337                 digest = kmalloc(digest_size, GFP_NOIO);
338                 if (digest) {
339                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
340
341                         inc_rs_pending(mdev);
342                         ok = drbd_send_drequest_csum(mdev,
343                                                      e->sector,
344                                                      e->size,
345                                                      digest,
346                                                      digest_size,
347                                                      P_CSUM_RS_REQUEST);
348                         kfree(digest);
349                 } else {
350                         dev_err(DEV, "kmalloc() of digest failed.\n");
351                         ok = 0;
352                 }
353         } else
354                 ok = 1;
355
356         drbd_free_ee(mdev, e);
357
358         if (unlikely(!ok))
359                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
360         return ok;
361 }
362
363 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
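/* __GFP_HIGHMEM allows highmem pages, __GFP_NOWARN suppresses the
 * allocation-failure warning, and the absence of __GFP_WAIT means the
 * allocator will not sleep or reclaim: the allocation may simply fail,
 * which callers like read_for_csum() below treat as "defer and retry
 * later". */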
364
365 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
366 {
367         struct drbd_epoch_entry *e;
368
369         if (!get_ldev(mdev))
370                 return -EIO;
371
372         if (drbd_rs_should_slow_down(mdev))
373                 goto defer;
374
375         /* GFP_TRY, because if there is no memory available right now, this may
376          * be rescheduled for later. It is "only" background resync, after all. */
377         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
378         if (!e)
379                 goto defer;
380
381         e->w.cb = w_e_send_csum;
382         spin_lock_irq(&mdev->req_lock);
383         list_add(&e->w.list, &mdev->read_ee);
384         spin_unlock_irq(&mdev->req_lock);
385
386         atomic_add(size >> 9, &mdev->rs_sect_ev);
387         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
388                 return 0;
389
390         /* drbd_submit_ee currently fails for one reason only:
391          * not being able to allocate enough bios.
392          * Is dropping the connection going to help? */
393         spin_lock_irq(&mdev->req_lock);
394         list_del(&e->w.list);
395         spin_unlock_irq(&mdev->req_lock);
396
397         drbd_free_ee(mdev, e);
398 defer:
399         put_ldev(mdev);
400         return -EAGAIN;
401 }
402
403 void resync_timer_fn(unsigned long data)
404 {
405         struct drbd_conf *mdev = (struct drbd_conf *) data;
406         int queue;
407
408         queue = 1;
409         switch (mdev->state.conn) {
410         case C_VERIFY_S:
411                 mdev->resync_work.cb = w_make_ov_request;
412                 break;
413         case C_SYNC_TARGET:
414                 mdev->resync_work.cb = w_make_resync_request;
415                 break;
416         default:
417                 queue = 0;
418                 mdev->resync_work.cb = w_resync_inactive;
419         }
420
421         /* harmless race: list_empty outside data.work.q_lock */
422         if (list_empty(&mdev->resync_work.list) && queue)
423                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
424 }
425
426 static void fifo_set(struct fifo_buffer *fb, int value)
427 {
428         int i;
429
430         for (i = 0; i < fb->size; i++)
431                 fb->values[i] = value;
432 }
433
434 static int fifo_push(struct fifo_buffer *fb, int value)
435 {
436         int ov;
437
438         ov = fb->values[fb->head_index];
439         fb->values[fb->head_index++] = value;
440
441         if (fb->head_index >= fb->size)
442                 fb->head_index = 0;
443
444         return ov;
445 }
446
447 static void fifo_add_val(struct fifo_buffer *fb, int value)
448 {
449         int i;
450
451         for (i = 0; i < fb->size; i++)
452                 fb->values[i] += value;
453 }
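/* The fifo is a fixed-size ring of planned correction values, one slot per
 * future SLEEP_TIME step: fifo_push() returns the value planned for the
 * current step while storing a new one in its slot, and fifo_add_val()
 * spreads an additional correction evenly over all pending steps.  A small
 * hypothetical example with fb.size == 4:
 *
 *	fifo_set(&fb, 0);		// values = {0, 0, 0, 0}
 *	fifo_add_val(&fb, 5);		// values = {5, 5, 5, 5}
 *	cur = fifo_push(&fb, 0);	// cur == 5, that slot now holds 0
 */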
454
455 int drbd_rs_controller(struct drbd_conf *mdev)
456 {
457         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
458         unsigned int want;     /* The number of sectors we want in the proxy */
459         int req_sect; /* Number of sectors to request in this turn */
460         int correction; /* Number of sectors more we need in the proxy*/
461         int cps; /* correction per invocation of drbd_rs_controller() */
462         int steps; /* Number of time steps to plan ahead */
463         int curr_corr;
464         int max_sect;
465
466         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
467         mdev->rs_in_flight -= sect_in;
468
469         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
470
471         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
472
473         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
474                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
475         } else { /* normal path */
476                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
477                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
478         }
479
480         correction = want - mdev->rs_in_flight - mdev->rs_planed;
481
482         /* Plan ahead */
483         cps = correction / steps;
484         fifo_add_val(&mdev->rs_plan_s, cps);
485         mdev->rs_planed += cps * steps;
486
487         /* What we do in this step */
488         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
489         spin_unlock(&mdev->peer_seq_lock);
490         mdev->rs_planed -= curr_corr;
491
492         req_sect = sect_in + curr_corr;
493         if (req_sect < 0)
494                 req_sect = 0;
495
496         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
497         if (req_sect > max_sect)
498                 req_sect = max_sect;
499
500         /*
501         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
502                  sect_in, mdev->rs_in_flight, want, correction,
503                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
504         */
505
506         return req_sect;
507 }
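/* A hypothetical worked example (numbers invented for illustration only):
 * with steps == 10, c_fill_target == 1000 sectors, rs_in_flight == 600
 * (after the subtraction above), rs_planed == 300 and sect_in == 400:
 *
 *	want       = 1000
 *	correction = 1000 - 600 - 300 = 100
 *	cps        = 100 / 10 = 10	// added to every fifo slot
 *	curr_corr  = fifo_push(...)	// whatever was planned for this step
 *	req_sect   = sect_in + curr_corr, clamped to [0, max_sect]
 *
 * so the controller requests roughly what came in during the last step,
 * plus this step's share of the planned correction. */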
508
509 int w_make_resync_request(struct drbd_conf *mdev,
510                 struct drbd_work *w, int cancel)
511 {
512         unsigned long bit;
513         sector_t sector;
514         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
515         int max_segment_size;
516         int number, rollback_i, size, pe, mx;
517         int align, queued, sndbuf;
518         int i = 0;
519
520         if (unlikely(cancel))
521                 return 1;
522
523         if (unlikely(mdev->state.conn < C_CONNECTED)) {
524                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
525                 return 0;
526         }
527
528         if (mdev->state.conn != C_SYNC_TARGET)
529                 dev_err(DEV, "%s in w_make_resync_request\n",
530                         drbd_conn_str(mdev->state.conn));
531
532         if (mdev->rs_total == 0) {
533                 /* empty resync? */
534                 drbd_resync_finished(mdev);
535                 return 1;
536         }
537
538         if (!get_ldev(mdev)) {
539                 /* Since we only need to access the resync bitmap,
540                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
541                    continuing resync with a broken disk makes no sense at
542                    all */
543                 dev_err(DEV, "Disk broke down during resync!\n");
544                 mdev->resync_work.cb = w_resync_inactive;
545                 return 1;
546         }
547
548         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
549          * if it should be necessary */
550         max_segment_size =
551                 mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
552                 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
553
554         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
555                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
556                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
557         } else {
558                 mdev->c_sync_rate = mdev->sync_conf.rate;
559                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
560         }
561
562         /* Throttle resync on lower level disk activity, which may also be
563          * caused by application IO on Primary/SyncTarget.
564          * Keep this after the call to drbd_rs_controller, as it assumes
565          * it is called as precisely as possible every SLEEP_TIME,
566          * and would be confused otherwise. */
567         if (drbd_rs_should_slow_down(mdev))
568                 goto requeue;
569
570         mutex_lock(&mdev->data.mutex);
571         if (mdev->data.socket)
572                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
573         else
574                 mx = 1;
575         mutex_unlock(&mdev->data.mutex);
576
577         /* For resync rates >160MB/sec, allow more pending RS requests */
578         if (number > mx)
579                 mx = number;
580
581         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
582         pe = atomic_read(&mdev->rs_pending_cnt);
583         if ((pe + number) > mx) {
584                 number = mx - pe;
585         }
586
587         for (i = 0; i < number; i++) {
588                 /* Stop generating RS requests, when half of the send buffer is filled */
589                 mutex_lock(&mdev->data.mutex);
590                 if (mdev->data.socket) {
591                         queued = mdev->data.socket->sk->sk_wmem_queued;
592                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
593                 } else {
594                         queued = 1;
595                         sndbuf = 0;
596                 }
597                 mutex_unlock(&mdev->data.mutex);
598                 if (queued > sndbuf / 2)
599                         goto requeue;
600
601 next_sector:
602                 size = BM_BLOCK_SIZE;
603                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
604
605                 if (bit == -1UL) {
606                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
607                         mdev->resync_work.cb = w_resync_inactive;
608                         put_ldev(mdev);
609                         return 1;
610                 }
611
612                 sector = BM_BIT_TO_SECT(bit);
613
614                 if (drbd_try_rs_begin_io(mdev, sector)) {
615                         mdev->bm_resync_fo = bit;
616                         goto requeue;
617                 }
618                 mdev->bm_resync_fo = bit + 1;
619
620                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
621                         drbd_rs_complete_io(mdev, sector);
622                         goto next_sector;
623                 }
624
625 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
626                 /* try to find some adjacent bits.
627                  * we stop once we already have the maximum request size.
628                  *
629                  * Additionally always align bigger requests, in order to
630                  * be prepared for all stripe sizes of software RAIDs.
631                  */
632                 align = 1;
633                 rollback_i = i;
634                 for (;;) {
635                         if (size + BM_BLOCK_SIZE > max_segment_size)
636                                 break;
637
638                         /* Be always aligned */
639                         if (sector & ((1<<(align+3))-1))
640                                 break;
641
642                         /* do not cross extent boundaries */
643                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
644                                 break;
645                         /* now, is it actually dirty, after all?
646                          * caution, drbd_bm_test_bit is tri-state for some
647                          * obscure reason; ( b == 0 ) would get the out-of-band
648                          * only accidentally right because of the "oddly sized"
649                          * adjustment below */
650                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
651                                 break;
652                         bit++;
653                         size += BM_BLOCK_SIZE;
654                         if ((BM_BLOCK_SIZE << align) <= size)
655                                 align++;
656                         i++;
657                 }
658                 /* if we merged some,
659                  * reset the offset to start the next drbd_bm_find_next from */
660                 if (size > BM_BLOCK_SIZE)
661                         mdev->bm_resync_fo = bit + 1;
662 #endif
663
664                 /* adjust very last sectors, in case we are oddly sized */
665                 if (sector + (size>>9) > capacity)
666                         size = (capacity-sector)<<9;
667                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
668                         switch (read_for_csum(mdev, sector, size)) {
669                         case -EIO: /* Disk failure */
670                                 put_ldev(mdev);
671                                 return 0;
672                         case -EAGAIN: /* allocation failed, or ldev busy */
673                                 drbd_rs_complete_io(mdev, sector);
674                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
675                                 i = rollback_i;
676                                 goto requeue;
677                         case 0:
678                                 /* everything ok */
679                                 break;
680                         default:
681                                 BUG();
682                         }
683                 } else {
684                         inc_rs_pending(mdev);
685                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
686                                                sector, size, ID_SYNCER)) {
687                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
688                                 dec_rs_pending(mdev);
689                                 put_ldev(mdev);
690                                 return 0;
691                         }
692                 }
693         }
694
695         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
696                 /* last syncer _request_ was sent,
697                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
698                  * next sync group will resume), as soon as we receive the last
699                  * resync data block, and the last bit is cleared.
700                  * until then resync "work" is "inactive" ...
701                  */
702                 mdev->resync_work.cb = w_resync_inactive;
703                 put_ldev(mdev);
704                 return 1;
705         }
706
707  requeue:
708         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
709         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
710         put_ldev(mdev);
711         return 1;
712 }
713
714 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
715 {
716         int number, i, size;
717         sector_t sector;
718         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
719
720         if (unlikely(cancel))
721                 return 1;
722
723         if (unlikely(mdev->state.conn < C_CONNECTED)) {
724                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
725                 return 0;
726         }
727
728         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
729         if (atomic_read(&mdev->rs_pending_cnt) > number)
730                 goto requeue;
731
732         number -= atomic_read(&mdev->rs_pending_cnt);
733
734         sector = mdev->ov_position;
735         for (i = 0; i < number; i++) {
736                 if (sector >= capacity) {
737                         mdev->resync_work.cb = w_resync_inactive;
738                         return 1;
739                 }
740
741                 size = BM_BLOCK_SIZE;
742
743                 if (drbd_try_rs_begin_io(mdev, sector)) {
744                         mdev->ov_position = sector;
745                         goto requeue;
746                 }
747
748                 if (sector + (size>>9) > capacity)
749                         size = (capacity-sector)<<9;
750
751                 inc_rs_pending(mdev);
752                 if (!drbd_send_ov_request(mdev, sector, size)) {
753                         dec_rs_pending(mdev);
754                         return 0;
755                 }
756                 sector += BM_SECT_PER_BIT;
757         }
758         mdev->ov_position = sector;
759
760  requeue:
761         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
762         return 1;
763 }
764
765
766 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
767 {
768         kfree(w);
769         ov_oos_print(mdev);
770         drbd_resync_finished(mdev);
771
772         return 1;
773 }
774
775 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
776 {
777         kfree(w);
778
779         drbd_resync_finished(mdev);
780
781         return 1;
782 }
783
784 static void ping_peer(struct drbd_conf *mdev)
785 {
786         clear_bit(GOT_PING_ACK, &mdev->flags);
787         request_ping(mdev);
788         wait_event(mdev->misc_wait,
789                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
790 }
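/* The wait above terminates either when the ping ack arrives or when the
 * connection drops below C_CONNECTED, so drbd_resync_finished() does not
 * block here forever on a dead peer. */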
791
792 int drbd_resync_finished(struct drbd_conf *mdev)
793 {
794         unsigned long db, dt, dbdt;
795         unsigned long n_oos;
796         union drbd_state os, ns;
797         struct drbd_work *w;
798         char *khelper_cmd = NULL;
799
800         /* Remove all elements from the resync LRU. Since future actions
801          * might set bits in the (main) bitmap, the entries in the
802          * resync LRU would otherwise be wrong. */
803         if (drbd_rs_del_all(mdev)) {
804                 /* In case this is not possible now, most probably because
805                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
806                  * queue (or even the read operations for those packets
807                  * are not finished by now).   Retry in 100ms. */
808
809                 drbd_kick_lo(mdev);
810                 __set_current_state(TASK_INTERRUPTIBLE);
811                 schedule_timeout(HZ / 10);
812                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
813                 if (w) {
814                         w->cb = w_resync_finished;
815                         drbd_queue_work(&mdev->data.work, w);
816                         return 1;
817                 }
818                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
819         }
820
821         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
822         if (dt <= 0)
823                 dt = 1;
824         db = mdev->rs_total;
825         dbdt = Bit2KB(db/dt);
826         mdev->rs_paused /= HZ;
827
828         if (!get_ldev(mdev))
829                 goto out;
830
831         ping_peer(mdev);
832
833         spin_lock_irq(&mdev->req_lock);
834         os = mdev->state;
835
836         /* This protects us against multiple calls (that can happen in the presence
837            of application IO), and against connectivity loss just before we arrive here. */
838         if (os.conn <= C_CONNECTED)
839                 goto out_unlock;
840
841         ns = os;
842         ns.conn = C_CONNECTED;
843
844         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
845              (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
846              "Online verify " : "Resync",
847              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
848
849         n_oos = drbd_bm_total_weight(mdev);
850
851         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
852                 if (n_oos) {
853                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
854                               n_oos, Bit2KB(1));
855                         khelper_cmd = "out-of-sync";
856                 }
857         } else {
858                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
859
860                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
861                         khelper_cmd = "after-resync-target";
862
863                 if (mdev->csums_tfm && mdev->rs_total) {
864                         const unsigned long s = mdev->rs_same_csum;
865                         const unsigned long t = mdev->rs_total;
866                         const int ratio =
867                                 (t == 0)     ? 0 :
868                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
869                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
870                              "transferred %luK total %luK\n",
871                              ratio,
872                              Bit2KB(mdev->rs_same_csum),
873                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
874                              Bit2KB(mdev->rs_total));
875                 }
876         }
877
878         if (mdev->rs_failed) {
879                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
880
881                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
882                         ns.disk = D_INCONSISTENT;
883                         ns.pdsk = D_UP_TO_DATE;
884                 } else {
885                         ns.disk = D_UP_TO_DATE;
886                         ns.pdsk = D_INCONSISTENT;
887                 }
888         } else {
889                 ns.disk = D_UP_TO_DATE;
890                 ns.pdsk = D_UP_TO_DATE;
891
892                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
893                         if (mdev->p_uuid) {
894                                 int i;
895                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
896                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
897                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
898                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
899                         } else {
900                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
901                         }
902                 }
903
904                 drbd_uuid_set_bm(mdev, 0UL);
905
906                 if (mdev->p_uuid) {
907                         /* Now the two UUID sets are equal, update what we
908                          * know of the peer. */
909                         int i;
910                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
911                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
912                 }
913         }
914
915         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
916 out_unlock:
917         spin_unlock_irq(&mdev->req_lock);
918         put_ldev(mdev);
919 out:
920         mdev->rs_total  = 0;
921         mdev->rs_failed = 0;
922         mdev->rs_paused = 0;
923         mdev->ov_start_sector = 0;
924
925         drbd_md_sync(mdev);
926
927         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
928                 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
929                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
930         }
931
932         if (khelper_cmd)
933                 drbd_khelper(mdev, khelper_cmd);
934
935         return 1;
936 }
937
938 /* helper */
939 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
940 {
941         if (drbd_ee_has_active_page(e)) {
942                 /* This might happen if sendpage() has not finished */
943                 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
944                 atomic_add(i, &mdev->pp_in_use_by_net);
945                 atomic_sub(i, &mdev->pp_in_use);
946                 spin_lock_irq(&mdev->req_lock);
947                 list_add_tail(&e->w.list, &mdev->net_ee);
948                 spin_unlock_irq(&mdev->req_lock);
949                 wake_up(&drbd_pp_wait);
950         } else
951                 drbd_free_ee(mdev, e);
952 }
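/* Pages of an epoch entry may still be referenced by the network stack
 * while a sendpage() is in flight; such entries are parked on net_ee, with
 * their pages accounted in pp_in_use_by_net instead of pp_in_use, and are
 * reclaimed later once the pages are no longer in use. */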
953
954 /**
955  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
956  * @mdev:       DRBD device.
957  * @w:          work object.
958  * @cancel:     The connection will be closed anyway
959  */
960 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
961 {
962         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
963         int ok;
964
965         if (unlikely(cancel)) {
966                 drbd_free_ee(mdev, e);
967                 dec_unacked(mdev);
968                 return 1;
969         }
970
971         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
972                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
973         } else {
974                 if (__ratelimit(&drbd_ratelimit_state))
975                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
976                             (unsigned long long)e->sector);
977
978                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
979         }
980
981         dec_unacked(mdev);
982
983         move_to_net_ee_or_free(mdev, e);
984
985         if (unlikely(!ok))
986                 dev_err(DEV, "drbd_send_block() failed\n");
987         return ok;
988 }
989
990 /**
991  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
992  * @mdev:       DRBD device.
993  * @w:          work object.
994  * @cancel:     The connection will be closed anyway
995  */
996 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
997 {
998         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
999         int ok;
1000
1001         if (unlikely(cancel)) {
1002                 drbd_free_ee(mdev, e);
1003                 dec_unacked(mdev);
1004                 return 1;
1005         }
1006
1007         if (get_ldev_if_state(mdev, D_FAILED)) {
1008                 drbd_rs_complete_io(mdev, e->sector);
1009                 put_ldev(mdev);
1010         }
1011
1012         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1013                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1014                         inc_rs_pending(mdev);
1015                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1016                 } else {
1017                         if (__ratelimit(&drbd_ratelimit_state))
1018                                 dev_err(DEV, "Not sending RSDataReply, "
1019                                     "partner DISKLESS!\n");
1020                         ok = 1;
1021                 }
1022         } else {
1023                 if (__ratelimit(&drbd_ratelimit_state))
1024                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1025                             (unsigned long long)e->sector);
1026
1027                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1028
1029                 /* update resync data with failure */
1030                 drbd_rs_failed_io(mdev, e->sector, e->size);
1031         }
1032
1033         dec_unacked(mdev);
1034
1035         move_to_net_ee_or_free(mdev, e);
1036
1037         if (unlikely(!ok))
1038                 dev_err(DEV, "drbd_send_block() failed\n");
1039         return ok;
1040 }
1041
1042 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1043 {
1044         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1045         struct digest_info *di;
1046         int digest_size;
1047         void *digest = NULL;
1048         int ok, eq = 0;
1049
1050         if (unlikely(cancel)) {
1051                 drbd_free_ee(mdev, e);
1052                 dec_unacked(mdev);
1053                 return 1;
1054         }
1055
1056         if (get_ldev(mdev)) {
1057                 drbd_rs_complete_io(mdev, e->sector);
1058                 put_ldev(mdev);
1059         }
1060
1061         di = e->digest;
1062
1063         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1064                 /* quick hack to try to avoid a race against reconfiguration.
1065                  * a real fix would be much more involved,
1066                  * introducing more locking mechanisms */
1067                 if (mdev->csums_tfm) {
1068                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1069                         D_ASSERT(digest_size == di->digest_size);
1070                         digest = kmalloc(digest_size, GFP_NOIO);
1071                 }
1072                 if (digest) {
1073                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1074                         eq = !memcmp(digest, di->digest, digest_size);
1075                         kfree(digest);
1076                 }
1077
1078                 if (eq) {
1079                         drbd_set_in_sync(mdev, e->sector, e->size);
1080                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1081                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1082                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1083                 } else {
1084                         inc_rs_pending(mdev);
1085                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1086                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1087                         kfree(di);
1088                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1089                 }
1090         } else {
1091                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1092                 if (__ratelimit(&drbd_ratelimit_state))
1093                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1094         }
1095
1096         dec_unacked(mdev);
1097         move_to_net_ee_or_free(mdev, e);
1098
1099         if (unlikely(!ok))
1100                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1101         return ok;
1102 }
1103
1104 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1105 {
1106         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1107         int digest_size;
1108         void *digest;
1109         int ok = 1;
1110
1111         if (unlikely(cancel))
1112                 goto out;
1113
1114         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1115                 goto out;
1116
1117         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1118         /* FIXME if this allocation fails, online verify will not terminate! */
1119         digest = kmalloc(digest_size, GFP_NOIO);
1120         if (digest) {
1121                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1122                 inc_rs_pending(mdev);
1123                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1124                                              digest, digest_size, P_OV_REPLY);
1125                 if (!ok)
1126                         dec_rs_pending(mdev);
1127                 kfree(digest);
1128         }
1129
1130 out:
1131         drbd_free_ee(mdev, e);
1132
1133         dec_unacked(mdev);
1134
1135         return ok;
1136 }
1137
1138 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1139 {
1140         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1141                 mdev->ov_last_oos_size += size>>9;
1142         } else {
1143                 mdev->ov_last_oos_start = sector;
1144                 mdev->ov_last_oos_size = size>>9;
1145         }
1146         drbd_set_out_of_sync(mdev, sector, size);
1147         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1148 }
1149
1150 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1151 {
1152         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1153         struct digest_info *di;
1154         int digest_size;
1155         void *digest;
1156         int ok, eq = 0;
1157
1158         if (unlikely(cancel)) {
1159                 drbd_free_ee(mdev, e);
1160                 dec_unacked(mdev);
1161                 return 1;
1162         }
1163
1164         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1165          * the resync lru has been cleaned up already */
1166         if (get_ldev(mdev)) {
1167                 drbd_rs_complete_io(mdev, e->sector);
1168                 put_ldev(mdev);
1169         }
1170
1171         di = e->digest;
1172
1173         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1174                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1175                 digest = kmalloc(digest_size, GFP_NOIO);
1176                 if (digest) {
1177                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1178
1179                         D_ASSERT(digest_size == di->digest_size);
1180                         eq = !memcmp(digest, di->digest, digest_size);
1181                         kfree(digest);
1182                 }
1183         } else {
1184                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1185                 if (__ratelimit(&drbd_ratelimit_state))
1186                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1187         }
1188
1189         dec_unacked(mdev);
1190         if (!eq)
1191                 drbd_ov_oos_found(mdev, e->sector, e->size);
1192         else
1193                 ov_oos_print(mdev);
1194
1195         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1196                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1197
1198         drbd_free_ee(mdev, e);
1199
1200         if (--mdev->ov_left == 0) {
1201                 ov_oos_print(mdev);
1202                 drbd_resync_finished(mdev);
1203         }
1204
1205         return ok;
1206 }
1207
1208 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1209 {
1210         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1211         complete(&b->done);
1212         return 1;
1213 }
1214
1215 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1216 {
1217         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1218         struct p_barrier *p = &mdev->data.sbuf.barrier;
1219         int ok = 1;
1220
1221         /* really avoid racing with tl_clear.  w.cb may have been referenced
1222          * just before it was reassigned and re-queued, so double check that.
1223          * actually, this race was harmless, since we only try to send the
1224          * barrier packet here, and otherwise do nothing with the object.
1225          * but compare with the head of w_clear_epoch */
1226         spin_lock_irq(&mdev->req_lock);
1227         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1228                 cancel = 1;
1229         spin_unlock_irq(&mdev->req_lock);
1230         if (cancel)
1231                 return 1;
1232
1233         if (!drbd_get_data_sock(mdev))
1234                 return 0;
1235         p->barrier = b->br_number;
1236         /* inc_ap_pending was done where this was queued.
1237          * dec_ap_pending will be done in got_BarrierAck
1238          * or (on connection loss) in w_clear_epoch.  */
1239         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1240                                 (struct p_header80 *)p, sizeof(*p), 0);
1241         drbd_put_data_sock(mdev);
1242
1243         return ok;
1244 }
1245
1246 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1247 {
1248         if (cancel)
1249                 return 1;
1250         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1251 }
1252
1253 /**
1254  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1255  * @mdev:       DRBD device.
1256  * @w:          work object.
1257  * @cancel:     The connection will be closed anyway
1258  */
1259 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1260 {
1261         struct drbd_request *req = container_of(w, struct drbd_request, w);
1262         int ok;
1263
1264         if (unlikely(cancel)) {
1265                 req_mod(req, send_canceled);
1266                 return 1;
1267         }
1268
1269         ok = drbd_send_dblock(mdev, req);
1270         req_mod(req, ok ? handed_over_to_network : send_failed);
1271
1272         return ok;
1273 }
1274
1275 /**
1276  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1277  * @mdev:       DRBD device.
1278  * @w:          work object.
1279  * @cancel:     The connection will be closed anyway
1280  */
1281 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1282 {
1283         struct drbd_request *req = container_of(w, struct drbd_request, w);
1284         int ok;
1285
1286         if (unlikely(cancel)) {
1287                 req_mod(req, send_canceled);
1288                 return 1;
1289         }
1290
1291         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1292                                 (unsigned long)req);
1293
1294         if (!ok) {
1295                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1296                  * so this is probably redundant */
1297                 if (mdev->state.conn >= C_CONNECTED)
1298                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1299         }
1300         req_mod(req, ok ? handed_over_to_network : send_failed);
1301
1302         return ok;
1303 }
1304
1305 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1306 {
1307         struct drbd_request *req = container_of(w, struct drbd_request, w);
1308
1309         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1310                 drbd_al_begin_io(mdev, req->sector);
1311         /* Calling drbd_al_begin_io() out of the worker could deadlock
1312            in theory. In practice it cannot deadlock, since this is
1313            only used when unfreezing IOs. All the extents of the requests
1314            that made it into the TL are already active */
1315
1316         drbd_req_make_private_bio(req, req->master_bio);
1317         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1318         generic_make_request(req->private_bio);
1319
1320         return 1;
1321 }
1322
1323 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1324 {
1325         struct drbd_conf *odev = mdev;
1326
1327         while (1) {
1328                 if (odev->sync_conf.after == -1)
1329                         return 1;
1330                 odev = minor_to_mdev(odev->sync_conf.after);
1331                 ERR_IF(!odev) return 1;
1332                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1333                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1334                     odev->state.aftr_isp || odev->state.peer_isp ||
1335                     odev->state.user_isp)
1336                         return 0;
1337         }
1338 }
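/* Walks the sync-after chain via sync_conf.after towards its root and
 * returns 0 as soon as any device ahead of us is itself resyncing (or
 * paused in resync), or has one of the "inhibit sync" flags
 * (aftr_isp/peer_isp/user_isp) set.  The loop terminates because
 * sync_after_error() below refuses to configure a cyclic dependency. */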
1339
1340 /**
1341  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1342  * @mdev:       DRBD device.
1343  *
1344  * Called from process context only (admin command and after_state_ch).
1345  */
1346 static int _drbd_pause_after(struct drbd_conf *mdev)
1347 {
1348         struct drbd_conf *odev;
1349         int i, rv = 0;
1350
1351         for (i = 0; i < minor_count; i++) {
1352                 odev = minor_to_mdev(i);
1353                 if (!odev)
1354                         continue;
1355                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1356                         continue;
1357                 if (!_drbd_may_sync_now(odev))
1358                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1359                                != SS_NOTHING_TO_DO);
1360         }
1361
1362         return rv;
1363 }
1364
1365 /**
1366  * _drbd_resume_next() - Resume resync on all devices that may resync now
1367  * @mdev:       DRBD device.
1368  *
1369  * Called from process context only (admin command and worker).
1370  */
1371 static int _drbd_resume_next(struct drbd_conf *mdev)
1372 {
1373         struct drbd_conf *odev;
1374         int i, rv = 0;
1375
1376         for (i = 0; i < minor_count; i++) {
1377                 odev = minor_to_mdev(i);
1378                 if (!odev)
1379                         continue;
1380                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1381                         continue;
1382                 if (odev->state.aftr_isp) {
1383                         if (_drbd_may_sync_now(odev))
1384                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1385                                                         CS_HARD, NULL)
1386                                        != SS_NOTHING_TO_DO) ;
1387                 }
1388         }
1389         return rv;
1390 }
1391
1392 void resume_next_sg(struct drbd_conf *mdev)
1393 {
1394         write_lock_irq(&global_state_lock);
1395         _drbd_resume_next(mdev);
1396         write_unlock_irq(&global_state_lock);
1397 }
1398
1399 void suspend_other_sg(struct drbd_conf *mdev)
1400 {
1401         write_lock_irq(&global_state_lock);
1402         _drbd_pause_after(mdev);
1403         write_unlock_irq(&global_state_lock);
1404 }
1405
1406 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1407 {
1408         struct drbd_conf *odev;
1409
1410         if (o_minor == -1)
1411                 return NO_ERROR;
1412         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1413                 return ERR_SYNC_AFTER;
1414
1415         /* check for loops */
1416         odev = minor_to_mdev(o_minor);
1417         while (1) {
1418                 if (odev == mdev)
1419                         return ERR_SYNC_AFTER_CYCLE;
1420
1421                 /* dependency chain ends here, no cycles. */
1422                 if (odev->sync_conf.after == -1)
1423                         return NO_ERROR;
1424
1425                 /* follow the dependency chain */
1426                 odev = minor_to_mdev(odev->sync_conf.after);
1427         }
1428 }
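/* The cycle check only needs to compare against mdev: every previously
 * accepted dependency went through this same function, so the existing
 * chains are acyclic, and any new cycle introduced by pointing mdev at
 * o_minor must pass through mdev itself. */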
1429
1430 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1431 {
1432         int changes;
1433         int retcode;
1434
1435         write_lock_irq(&global_state_lock);
1436         retcode = sync_after_error(mdev, na);
1437         if (retcode == NO_ERROR) {
1438                 mdev->sync_conf.after = na;
1439                 do {
1440                         changes  = _drbd_pause_after(mdev);
1441                         changes |= _drbd_resume_next(mdev);
1442                 } while (changes);
1443         }
1444         write_unlock_irq(&global_state_lock);
1445         return retcode;
1446 }
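/* The pause/resume pair above is iterated until nothing changes any more:
 * pausing or resuming one device can change what the devices that sync
 * after it are allowed to do, so a single pass over all minors may not
 * reach a stable configuration. */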
1447
1448 /**
1449  * drbd_start_resync() - Start the resync process
1450  * @mdev:       DRBD device.
1451  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1452  *
1453  * This function might bring you directly into one of the
1454  * C_PAUSED_SYNC_* states.
1455  */
1456 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1457 {
1458         union drbd_state ns;
1459         int r;
1460
1461         if (mdev->state.conn >= C_SYNC_SOURCE) {
1462                 dev_err(DEV, "Resync already running!\n");
1463                 return;
1464         }
1465
1466         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1467         drbd_rs_cancel_all(mdev);
1468
1469         if (side == C_SYNC_TARGET) {
1470                 /* Since application IO was locked out during C_WF_BITMAP_T and
1471                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1472                    which makes our data inconsistent, the handler below may veto that. */
1473                 r = drbd_khelper(mdev, "before-resync-target");
1474                 r = (r >> 8) & 0xff;
1475                 if (r > 0) {
1476                         dev_info(DEV, "before-resync-target handler returned %d, "
1477                              "dropping connection.\n", r);
1478                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1479                         return;
1480                 }
1481         }
1482
1483         drbd_state_lock(mdev);
1484
1485         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1486                 drbd_state_unlock(mdev);
1487                 return;
1488         }
1489
1490         if (side == C_SYNC_TARGET) {
1491                 mdev->bm_resync_fo = 0;
1492         } else /* side == C_SYNC_SOURCE */ {
1493                 u64 uuid;
1494
1495                 get_random_bytes(&uuid, sizeof(u64));
1496                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1497                 drbd_send_sync_uuid(mdev, uuid);
1498
1499                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1500         }
1501
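        /* Pick the new state under global_state_lock: _drbd_may_sync_now()
         * and _drbd_pause_after() below look at the other devices' states. */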
1502         write_lock_irq(&global_state_lock);
1503         ns = mdev->state;
1504
1505         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1506
1507         ns.conn = side;
1508
1509         if (side == C_SYNC_TARGET)
1510                 ns.disk = D_INCONSISTENT;
1511         else /* side == C_SYNC_SOURCE */
1512                 ns.pdsk = D_INCONSISTENT;
1513
1514         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1515         ns = mdev->state;
1516
1517         if (ns.conn < C_CONNECTED)
1518                 r = SS_UNKNOWN_ERROR;
1519
1520         if (r == SS_SUCCESS) {
1521                 unsigned long tw = drbd_bm_total_weight(mdev);
1522                 unsigned long now = jiffies;
1523                 int i;
1524
1525                 mdev->rs_failed    = 0;
1526                 mdev->rs_paused    = 0;
1527                 mdev->rs_same_csum = 0;
1528                 mdev->rs_last_events = 0;
1529                 mdev->rs_last_sect_ev = 0;
1530                 mdev->rs_total     = tw;
1531                 mdev->rs_start     = now;
1532                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1533                         mdev->rs_mark_left[i] = tw;
1534                         mdev->rs_mark_time[i] = now;
1535                 }
1536                 _drbd_pause_after(mdev);
1537         }
1538         write_unlock_irq(&global_state_lock);
1539         put_ldev(mdev);
1540
1541         if (r == SS_SUCCESS) {
1542                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1543                      drbd_conn_str(ns.conn),
1544                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1545                      (unsigned long) mdev->rs_total);
1546
1547                 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1548                         /* This still has a race (about when exactly the peers
1549                          * detect connection loss) that can lead to a full sync
1550                          * on next handshake. In 8.3.9 we fixed this with explicit
1551                          * resync-finished notifications, but the fix
1552                          * introduces a protocol change.  Sleeping for some
1553                          * time longer than the ping interval + timeout on the
1554                          * SyncSource, to give the SyncTarget the chance to
1555                          * detect connection loss, then waiting for a ping
1556                          * response (implicit in drbd_resync_finished) reduces
1557                          * the race considerably, but does not solve it. */
1558                         if (side == C_SYNC_SOURCE)
1559                                 schedule_timeout_interruptible(
1560                                         mdev->net_conf->ping_int * HZ +
1561                                         mdev->net_conf->ping_timeo * HZ / 9);
1562                         drbd_resync_finished(mdev);
1563                 }
1564
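                /* Reset the per-run resync accounting (in-flight counters and
                 * the plan fifo) before the first requests go out. */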
1565                 atomic_set(&mdev->rs_sect_in, 0);
1566                 atomic_set(&mdev->rs_sect_ev, 0);
1567                 mdev->rs_in_flight = 0;
1568                 mdev->rs_planed = 0;
1569                 spin_lock(&mdev->peer_seq_lock);
1570                 fifo_set(&mdev->rs_plan_s, 0);
1571                 spin_unlock(&mdev->peer_seq_lock);
1572                 /* ns.conn may already be != mdev->state.conn,
1573                  * we may have been paused in between, or become paused until
1574                  * the timer triggers.
1575                  * Either way, that is handled in resync_timer_fn(). */
1576                 if (ns.conn == C_SYNC_TARGET)
1577                         mod_timer(&mdev->resync_timer, jiffies);
1578
1579                 drbd_md_sync(mdev);
1580         }
1581         drbd_state_unlock(mdev);
1582 }
1583
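/**
 * drbd_worker() - Main loop of the per-device worker thread
 * @thi:        The drbd_thread this worker runs as.
 *
 * Waits on the data.work semaphore and processes queued work items one
 * by one; while waiting, the data socket is uncorked and corked again
 * once new work arrives.  If a callback fails while we are connected,
 * the connection is forced into C_NETWORK_FAILURE.  When asked to stop,
 * the remaining queue is drained with the cancel flag set, the receiver
 * is stopped and the device is cleaned up.
 */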
1584 int drbd_worker(struct drbd_thread *thi)
1585 {
1586         struct drbd_conf *mdev = thi->mdev;
1587         struct drbd_work *w = NULL;
1588         LIST_HEAD(work_list);
1589         int intr = 0, i;
1590
1591         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1592
1593         while (get_t_state(thi) == Running) {
1594                 drbd_thread_current_set_cpu(mdev);
1595
1596                 if (down_trylock(&mdev->data.work.s)) {
1597                         mutex_lock(&mdev->data.mutex);
1598                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1599                                 drbd_tcp_uncork(mdev->data.socket);
1600                         mutex_unlock(&mdev->data.mutex);
1601
1602                         intr = down_interruptible(&mdev->data.work.s);
1603
1604                         mutex_lock(&mdev->data.mutex);
1605                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1606                                 drbd_tcp_cork(mdev->data.socket);
1607                         mutex_unlock(&mdev->data.mutex);
1608                 }
1609
1610                 if (intr) {
1611                         D_ASSERT(intr == -EINTR);
1612                         flush_signals(current);
1613                         ERR_IF (get_t_state(thi) == Running)
1614                                 continue;
1615                         break;
1616                 }
1617
1618                 if (get_t_state(thi) != Running)
1619                         break;
1620                 /* With this break, we have done a down() but not consumed
1621                    the entry from the list. The cleanup code takes care of
1622                    this...   */
1623
1624                 w = NULL;
1625                 spin_lock_irq(&mdev->data.work.q_lock);
1626                 ERR_IF(list_empty(&mdev->data.work.q)) {
1627                         /* Something is terribly wrong in our logic:
1628                          * we were able to down() the semaphore,
1629                          * but the list is empty... doh.
1630                          *
1631                          * What is the best thing to do now?
1632                          * Try again from scratch, restarting the receiver,
1633                          * asender, whatnot?  That could break things even worse,
1634                          * e.g. when we are primary but have no good local data.
1635                          *
1636                          * For now, just try to get away with starting over this loop.
1637                          */
1638                         spin_unlock_irq(&mdev->data.work.q_lock);
1639                         continue;
1640                 }
1641                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1642                 list_del_init(&w->list);
1643                 spin_unlock_irq(&mdev->data.work.q_lock);
1644
1645                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1646                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1647                         if (mdev->state.conn >= C_CONNECTED)
1648                                 drbd_force_state(mdev,
1649                                                 NS(conn, C_NETWORK_FAILURE));
1650                 }
1651         }
1652         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1653         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1654
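        /* We are shutting down: drain whatever is still queued, calling
         * each callback with the cancel argument set. */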
1655         spin_lock_irq(&mdev->data.work.q_lock);
1656         i = 0;
1657         while (!list_empty(&mdev->data.work.q)) {
1658                 list_splice_init(&mdev->data.work.q, &work_list);
1659                 spin_unlock_irq(&mdev->data.work.q_lock);
1660
1661                 while (!list_empty(&work_list)) {
1662                         w = list_entry(work_list.next, struct drbd_work, list);
1663                         list_del_init(&w->list);
1664                         w->cb(mdev, w, 1);
1665                         i++; /* dead debugging code */
1666                 }
1667
1668                 spin_lock_irq(&mdev->data.work.q_lock);
1669         }
1670         sema_init(&mdev->data.work.s, 0);
1671         /* DANGEROUS race: if someone queued work while holding the spinlock
1672          * but called up() outside of it, we could get an up() on the
1673          * semaphore without a corresponding list entry.
1674          * So don't do that.
1675          */
1676         spin_unlock_irq(&mdev->data.work.q_lock);
1677
1678         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1679         /* _drbd_set_state only uses stop_nowait.
1680          * wait here for the Exiting receiver. */
1681         drbd_thread_stop(&mdev->receiver);
1682         drbd_mdev_cleanup(mdev);
1683
1684         dev_info(DEV, "worker terminated\n");
1685
1686         clear_bit(DEVICE_DYING, &mdev->flags);
1687         clear_bit(CONFIG_PENDING, &mdev->flags);
1688         wake_up(&mdev->state_wait);
1689
1690         return 0;
1691 }