drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42
43
44
45 /* defined here:
46    drbd_md_io_complete
47    drbd_endio_sec
48    drbd_endio_pri
49
50  * more endio handlers:
51    atodb_endio in drbd_actlog.c
52    drbd_bm_async_io_complete in drbd_bitmap.c
53
54  * For all these callbacks, note the following:
55  * The callbacks will be called in irq context by the IDE drivers,
56  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
57  * Try to get the locking right :)
58  *
59  */
60
61
62 /* About the global_state_lock
63    Each state transition on a device holds a read lock. In case we have
64    to evaluate the sync after dependencies, we grab a write lock, because
65    we need stable states on all devices for that.  */
66 rwlock_t global_state_lock;
67
68 /* used for synchronous meta data and bitmap IO
69  * submitted by drbd_md_sync_page_io()
70  */
71 void drbd_md_io_complete(struct bio *bio, int error)
72 {
73         struct drbd_md_io *md_io;
74
75         md_io = (struct drbd_md_io *)bio->bi_private;
76         md_io->error = error;
77
78         complete(&md_io->event);
79 }
80
81 /* reads on behalf of the partner,
82  * "submitted" by the receiver
83  */
84 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
85 {
86         unsigned long flags = 0;
87         struct drbd_conf *mdev = e->mdev;
88
89         D_ASSERT(e->block_id != ID_VACANT);
90
91         spin_lock_irqsave(&mdev->req_lock, flags);
92         mdev->read_cnt += e->size >> 9;
93         list_del(&e->w.list);
94         if (list_empty(&mdev->read_ee))
95                 wake_up(&mdev->ee_wait);
96         if (test_bit(__EE_WAS_ERROR, &e->flags))
97                 __drbd_chk_io_error(mdev, FALSE);
98         spin_unlock_irqrestore(&mdev->req_lock, flags);
99
100         drbd_queue_work(&mdev->data.work, &e->w);
101         put_ldev(mdev);
102 }
103
104 /* writes on behalf of the partner, or resync writes,
105  * "submitted" by the receiver, final stage.  */
106 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
107 {
108         unsigned long flags = 0;
109         struct drbd_conf *mdev = e->mdev;
110         sector_t e_sector;
111         int do_wake;
112         int is_syncer_req;
113         int do_al_complete_io;
114
115         D_ASSERT(e->block_id != ID_VACANT);
116
117         /* after we moved e to done_ee,
118          * we may no longer access it,
119          * it may be freed/reused already!
120          * (as soon as we release the req_lock) */
121         e_sector = e->sector;
122         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
123         is_syncer_req = is_syncer_block_id(e->block_id);
124
125         spin_lock_irqsave(&mdev->req_lock, flags);
126         mdev->writ_cnt += e->size >> 9;
127         list_del(&e->w.list); /* has been on active_ee or sync_ee */
128         list_add_tail(&e->w.list, &mdev->done_ee);
129
130         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
131          * neither did we wake possibly waiting conflicting requests.
132          * done from "drbd_process_done_ee" within the appropriate w.cb
133          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
134
135         do_wake = is_syncer_req
136                 ? list_empty(&mdev->sync_ee)
137                 : list_empty(&mdev->active_ee);
138
139         if (test_bit(__EE_WAS_ERROR, &e->flags))
140                 __drbd_chk_io_error(mdev, FALSE);
141         spin_unlock_irqrestore(&mdev->req_lock, flags);
142
143         if (is_syncer_req)
144                 drbd_rs_complete_io(mdev, e_sector);
145
146         if (do_wake)
147                 wake_up(&mdev->ee_wait);
148
149         if (do_al_complete_io)
150                 drbd_al_complete_io(mdev, e_sector);
151
152         wake_asender(mdev);
153         put_ldev(mdev);
154 }
155
156 /* writes on behalf of the partner, or resync writes,
157  * "submitted" by the receiver.
158  */
159 void drbd_endio_sec(struct bio *bio, int error)
160 {
161         struct drbd_epoch_entry *e = bio->bi_private;
162         struct drbd_conf *mdev = e->mdev;
163         int uptodate = bio_flagged(bio, BIO_UPTODATE);
164         int is_write = bio_data_dir(bio) == WRITE;
165
166         if (error)
167                 dev_warn(DEV, "%s: error=%d s=%llus\n",
168                                 is_write ? "write" : "read", error,
169                                 (unsigned long long)e->sector);
170         if (!error && !uptodate) {
171                 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
172                                 is_write ? "write" : "read",
173                                 (unsigned long long)e->sector);
174                 /* strange behavior of some lower level drivers...
175                  * fail the request by clearing the uptodate flag,
176                  * but do not return any error?! */
177                 error = -EIO;
178         }
179
180         if (error)
181                 set_bit(__EE_WAS_ERROR, &e->flags);
182
183         bio_put(bio); /* no need for the bio anymore */
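        /* An epoch entry may have been submitted as more than one bio
         * (drbd_submit_ee splits requests that do not fit into a single
         * bio); only the completion of the last pending bio hands the
         * entry on to the final read/write stage below. */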
184         if (atomic_dec_and_test(&e->pending_bios)) {
185                 if (is_write)
186                         drbd_endio_write_sec_final(e);
187                 else
188                         drbd_endio_read_sec_final(e);
189         }
190 }
191
192 /* read, read-ahead (readA) or write requests on R_PRIMARY coming from drbd_make_request
193  */
194 void drbd_endio_pri(struct bio *bio, int error)
195 {
196         unsigned long flags;
197         struct drbd_request *req = bio->bi_private;
198         struct drbd_conf *mdev = req->mdev;
199         struct bio_and_error m;
200         enum drbd_req_event what;
201         int uptodate = bio_flagged(bio, BIO_UPTODATE);
202
203         if (!error && !uptodate) {
204                 dev_warn(DEV, "p %s: setting error to -EIO\n",
205                          bio_data_dir(bio) == WRITE ? "write" : "read");
206                 /* strange behavior of some lower level drivers...
207                  * fail the request by clearing the uptodate flag,
208                  * but do not return any error?! */
209                 error = -EIO;
210         }
211
212         /* to avoid recursion in __req_mod */
213         if (unlikely(error)) {
214                 what = (bio_data_dir(bio) == WRITE)
215                         ? write_completed_with_error
216                         : (bio_rw(bio) == READ)
217                           ? read_completed_with_error
218                           : read_ahead_completed_with_error;
219         } else
220                 what = completed_ok;
221
222         bio_put(req->private_bio);
223         req->private_bio = ERR_PTR(error);
224
225         /* not req_mod(), we need irqsave here! */
226         spin_lock_irqsave(&mdev->req_lock, flags);
227         __req_mod(req, what, &m);
228         spin_unlock_irqrestore(&mdev->req_lock, flags);
229
230         if (m.bio)
231                 complete_master_bio(mdev, &m);
232 }
233
234 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
235 {
236         struct drbd_request *req = container_of(w, struct drbd_request, w);
237
238         /* We should not detach for read io-error,
239          * but try to WRITE the P_DATA_REPLY to the failed location,
240          * to give the disk the chance to relocate that block */
241
242         spin_lock_irq(&mdev->req_lock);
243         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
244                 _req_mod(req, read_retry_remote_canceled);
245                 spin_unlock_irq(&mdev->req_lock);
246                 return 1;
247         }
248         spin_unlock_irq(&mdev->req_lock);
249
250         return w_send_read_req(mdev, w, 0);
251 }
252
253 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
254 {
255         ERR_IF(cancel) return 1;
256         dev_err(DEV, "resync inactive, but callback triggered??\n");
257         return 1; /* Simply ignore this! */
258 }
259
260 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
261 {
262         struct hash_desc desc;
263         struct scatterlist sg;
264         struct page *page = e->pages;
265         struct page *tmp;
266         unsigned len;
267
268         desc.tfm = tfm;
269         desc.flags = 0;
270
271         sg_init_table(&sg, 1);
272         crypto_hash_init(&desc);
273
274         while ((tmp = page_chain_next(page))) {
275                 /* all but the last page will be fully used */
276                 sg_set_page(&sg, page, PAGE_SIZE, 0);
277                 crypto_hash_update(&desc, &sg, sg.length);
278                 page = tmp;
279         }
280         /* and now the last, possibly only partially used page */
281         len = e->size & (PAGE_SIZE - 1);
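        /* len == 0 means e->size is a multiple of PAGE_SIZE and the last
         * page is fully used as well; the ?: below then falls back to PAGE_SIZE */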
282         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
283         crypto_hash_update(&desc, &sg, sg.length);
284         crypto_hash_final(&desc, digest);
285 }
286
287 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
288 {
289         struct hash_desc desc;
290         struct scatterlist sg;
291         struct bio_vec *bvec;
292         int i;
293
294         desc.tfm = tfm;
295         desc.flags = 0;
296
297         sg_init_table(&sg, 1);
298         crypto_hash_init(&desc);
299
300         __bio_for_each_segment(bvec, bio, i, 0) {
301                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
302                 crypto_hash_update(&desc, &sg, sg.length);
303         }
304         crypto_hash_final(&desc, digest);
305 }
306
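/* Worker callback for a checksum based resync read queued by
 * read_for_csum(): hash the locally read block and send only the digest
 * to the peer as a P_CSUM_RS_REQUEST, so that the full block is only
 * transferred if the peer's data actually differs (the peer compares
 * digests in w_e_end_csum_rs_req() and answers with either
 * P_RS_IS_IN_SYNC or a full P_RS_DATA_REPLY). */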
307 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
308 {
309         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
310         int digest_size;
311         void *digest;
312         int ok;
313
314         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
315
316         if (unlikely(cancel)) {
317                 drbd_free_ee(mdev, e);
318                 return 1;
319         }
320
321         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
322                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
323                 digest = kmalloc(digest_size, GFP_NOIO);
324                 if (digest) {
325                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
326
327                         inc_rs_pending(mdev);
328                         ok = drbd_send_drequest_csum(mdev,
329                                                      e->sector,
330                                                      e->size,
331                                                      digest,
332                                                      digest_size,
333                                                      P_CSUM_RS_REQUEST);
334                         kfree(digest);
335                 } else {
336                         dev_err(DEV, "kmalloc() of digest failed.\n");
337                         ok = 0;
338                 }
339         } else
340                 ok = 1;
341
342         drbd_free_ee(mdev, e);
343
344         if (unlikely(!ok))
345                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
346         return ok;
347 }
348
349 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
350
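/* Queue a local read of [sector, sector+size) for checksum based resync.
 * Returns 0 if the read was submitted, -EIO if we have no local disk,
 * and -EAGAIN if the caller should retry later (local disk busy, or no
 * memory for the epoch entry right now). */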
351 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
352 {
353         struct drbd_epoch_entry *e;
354
355         if (!get_ldev(mdev))
356                 return -EIO;
357
358         if (drbd_rs_should_slow_down(mdev))
359                 goto defer;
360
361         /* GFP_TRY, because if there is no memory available right now, this may
362          * be rescheduled for later. It is "only" background resync, after all. */
363         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
364         if (!e)
365                 goto defer;
366
367         e->w.cb = w_e_send_csum;
368         spin_lock_irq(&mdev->req_lock);
369         list_add(&e->w.list, &mdev->read_ee);
370         spin_unlock_irq(&mdev->req_lock);
371
372         atomic_add(size >> 9, &mdev->rs_sect_ev);
373         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
374                 return 0;
375
376         /* drbd_submit_ee currently fails for one reason only:
377          * not being able to allocate enough bios.
378          * Is dropping the connection going to help? */
379         spin_lock_irq(&mdev->req_lock);
380         list_del(&e->w.list);
381         spin_unlock_irq(&mdev->req_lock);
382
383         drbd_free_ee(mdev, e);
384 defer:
385         put_ldev(mdev);
386         return -EAGAIN;
387 }
388
389 void resync_timer_fn(unsigned long data)
390 {
391         struct drbd_conf *mdev = (struct drbd_conf *) data;
392         int queue;
393
394         queue = 1;
395         switch (mdev->state.conn) {
396         case C_VERIFY_S:
397                 mdev->resync_work.cb = w_make_ov_request;
398                 break;
399         case C_SYNC_TARGET:
400                 mdev->resync_work.cb = w_make_resync_request;
401                 break;
402         default:
403                 queue = 0;
404                 mdev->resync_work.cb = w_resync_inactive;
405         }
406
407         /* harmless race: list_empty outside data.work.q_lock */
408         if (list_empty(&mdev->resync_work.list) && queue)
409                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
410 }
411
412 static void fifo_set(struct fifo_buffer *fb, int value)
413 {
414         int i;
415
416         for (i = 0; i < fb->size; i++)
417                 fb->values[i] = value;
418 }
419
420 static int fifo_push(struct fifo_buffer *fb, int value)
421 {
422         int ov;
423
424         ov = fb->values[fb->head_index];
425         fb->values[fb->head_index++] = value;
426
427         if (fb->head_index >= fb->size)
428                 fb->head_index = 0;
429
430         return ov;
431 }
432
433 static void fifo_add_val(struct fifo_buffer *fb, int value)
434 {
435         int i;
436
437         for (i = 0; i < fb->size; i++)
438                 fb->values[i] += value;
439 }
440
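/* drbd_rs_controller() - feedback controller deciding how many sectors to
 * request during the next SLEEP_TIME step (only used when a plan,
 * sync_conf.c_plan_ahead, is configured; see w_make_resync_request()).
 * rs_plan_s is a ring buffer with one slot per planned step:
 * fifo_add_val() spreads a newly computed correction over all slots,
 * fifo_push(fb, 0) takes the correction planned for the current step out
 * of the ring while pushing an empty slot for the newest step. */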
441 int drbd_rs_controller(struct drbd_conf *mdev)
442 {
443         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
444         unsigned int want;     /* The number of sectors we want in the proxy */
445         int req_sect; /* Number of sectors to request in this turn */
446         int correction; /* Number of additional sectors we need in the proxy */
447         int cps; /* correction per invocation of drbd_rs_controller() */
448         int steps; /* Number of time steps to plan ahead */
449         int curr_corr;
450         int max_sect;
451
452         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
453         mdev->rs_in_flight -= sect_in;
454
455         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
456
457         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
458
459         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
460                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
461         } else { /* normal path */
462                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
463                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
464         }
465
466         correction = want - mdev->rs_in_flight - mdev->rs_planed;
467
468         /* Plan ahead */
469         cps = correction / steps;
470         fifo_add_val(&mdev->rs_plan_s, cps);
471         mdev->rs_planed += cps * steps;
472
473         /* What we do in this step */
474         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
475         spin_unlock(&mdev->peer_seq_lock);
476         mdev->rs_planed -= curr_corr;
477
478         req_sect = sect_in + curr_corr;
479         if (req_sect < 0)
480                 req_sect = 0;
481
482         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
483         if (req_sect > max_sect)
484                 req_sect = max_sect;
485
486         /*
487         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
488                  sect_in, mdev->rs_in_flight, want, correction,
489                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
490         */
491
492         return req_sect;
493 }
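/* Illustrative numbers (assuming SLEEP_TIME is HZ/10, i.e. a 100 ms step):
 * with c_fill_target unset, c_delay_target = 10 and sect_in = 2048 sectors
 * (1 MiB) having come in during the last step,
 *   want = 2048 * 10 * HZ / ((HZ/10) * 10) = 20480 sectors,
 * i.e. we aim for 10 MiB worth of resync requests in flight.  With
 * rs_in_flight = 16384 and rs_planed = 0 this gives correction = 4096;
 * spread over steps = 20 plan slots that is cps = 204 sectors added to
 * every slot.  The slot popped for this step (plus sect_in) becomes
 * req_sect, capped at c_max_rate * 2 * SLEEP_TIME / HZ sectors. */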
494
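/* w_make_resync_request() - build the resync requests for one SLEEP_TIME
 * step on the SyncTarget: decide how many BM_BLOCK_SIZE blocks to ask for
 * (fixed sync_conf.rate, or drbd_rs_controller() if a plan is configured),
 * limit that by the socket buffers and the requests still pending, then
 * scan the bitmap with drbd_bm_find_next(), merge adjacent dirty bits up
 * to max_segment_size (keeping bigger requests aligned), and either read
 * the block locally for checksum based resync or send a plain
 * P_RS_DATA_REQUEST.  The resync timer is re-armed for the next step
 * unless the end of the bitmap has been reached. */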
495 int w_make_resync_request(struct drbd_conf *mdev,
496                 struct drbd_work *w, int cancel)
497 {
498         unsigned long bit;
499         sector_t sector;
500         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
501         int max_segment_size;
502         int number, rollback_i, size, pe, mx;
503         int align, queued, sndbuf;
504         int i = 0;
505
506         if (unlikely(cancel))
507                 return 1;
508
509         if (unlikely(mdev->state.conn < C_CONNECTED)) {
510                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
511                 return 0;
512         }
513
514         if (mdev->state.conn != C_SYNC_TARGET)
515                 dev_err(DEV, "%s in w_make_resync_request\n",
516                         drbd_conn_str(mdev->state.conn));
517
518         if (mdev->rs_total == 0) {
519                 /* empty resync? */
520                 drbd_resync_finished(mdev);
521                 return 1;
522         }
523
524         if (!get_ldev(mdev)) {
525         /* Since we only need to access mdev->rsync, a
526            get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
527            continuing the resync with a broken disk makes no sense at
528            all */
529                 dev_err(DEV, "Disk broke down during resync!\n");
530                 mdev->resync_work.cb = w_resync_inactive;
531                 return 1;
532         }
533
534         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
535          * if it should be necessary */
536         max_segment_size =
537                 mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
538                 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
539
540         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
541                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
542                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
543         } else {
544                 mdev->c_sync_rate = mdev->sync_conf.rate;
545                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
546         }
547
548         /* Throttle resync on lower level disk activity, which may also be
549          * caused by application IO on the Primary/SyncTarget.
550          * Keep this after the call to drbd_rs_controller, as it assumes
551          * that it is called as precisely as possible every SLEEP_TIME,
552          * and would be confused otherwise. */
553         if (drbd_rs_should_slow_down(mdev))
554                 goto requeue;
555
556         mutex_lock(&mdev->data.mutex);
557         if (mdev->data.socket)
558                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
559         else
560                 mx = 1;
561         mutex_unlock(&mdev->data.mutex);
562
563         /* For resync rates >160MB/sec, allow more pending RS requests */
564         if (number > mx)
565                 mx = number;
566
567         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
568         pe = atomic_read(&mdev->rs_pending_cnt);
569         if ((pe + number) > mx) {
570                 number = mx - pe;
571         }
572
573         for (i = 0; i < number; i++) {
574                 /* Stop generating RS requests, when half of the send buffer is filled */
575                 mutex_lock(&mdev->data.mutex);
576                 if (mdev->data.socket) {
577                         queued = mdev->data.socket->sk->sk_wmem_queued;
578                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
579                 } else {
580                         queued = 1;
581                         sndbuf = 0;
582                 }
583                 mutex_unlock(&mdev->data.mutex);
584                 if (queued > sndbuf / 2)
585                         goto requeue;
586
587 next_sector:
588                 size = BM_BLOCK_SIZE;
589                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
590
591                 if (bit == -1UL) {
592                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
593                         mdev->resync_work.cb = w_resync_inactive;
594                         put_ldev(mdev);
595                         return 1;
596                 }
597
598                 sector = BM_BIT_TO_SECT(bit);
599
600                 if (drbd_try_rs_begin_io(mdev, sector)) {
601                         mdev->bm_resync_fo = bit;
602                         goto requeue;
603                 }
604                 mdev->bm_resync_fo = bit + 1;
605
606                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
607                         drbd_rs_complete_io(mdev, sector);
608                         goto next_sector;
609                 }
610
611 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
612                 /* try to find some adjacent bits.
613                  * we stop once we already have the maximum req size.
614                  *
615                  * Additionally always align bigger requests, in order to
616                  * be prepared for all stripe sizes of software RAIDs.
617                  */
618                 align = 1;
619                 rollback_i = i;
620                 for (;;) {
621                         if (size + BM_BLOCK_SIZE > max_segment_size)
622                                 break;
623
624                         /* always stay aligned */
625                         if (sector & ((1<<(align+3))-1))
626                                 break;
627
628                         /* do not cross extent boundaries */
629                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
630                                 break;
631                         /* now, is it actually dirty, after all?
632                          * caution, drbd_bm_test_bit is tri-state for some
633                          * obscure reason; ( b == 0 ) would get the out-of-band
634                          * only accidentally right because of the "oddly sized"
635                          * adjustment below */
636                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
637                                 break;
638                         bit++;
639                         size += BM_BLOCK_SIZE;
640                         if ((BM_BLOCK_SIZE << align) <= size)
641                                 align++;
642                         i++;
643                 }
644                 /* if we merged some,
645                  * reset the offset to start the next drbd_bm_find_next from */
646                 if (size > BM_BLOCK_SIZE)
647                         mdev->bm_resync_fo = bit + 1;
648 #endif
649
650                 /* adjust very last sectors, in case we are oddly sized */
651                 if (sector + (size>>9) > capacity)
652                         size = (capacity-sector)<<9;
653                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
654                         switch (read_for_csum(mdev, sector, size)) {
655                         case -EIO: /* Disk failure */
656                                 put_ldev(mdev);
657                                 return 0;
658                         case -EAGAIN: /* allocation failed, or ldev busy */
659                                 drbd_rs_complete_io(mdev, sector);
660                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
661                                 i = rollback_i;
662                                 goto requeue;
663                         case 0:
664                                 /* everything ok */
665                                 break;
666                         default:
667                                 BUG();
668                         }
669                 } else {
670                         inc_rs_pending(mdev);
671                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
672                                                sector, size, ID_SYNCER)) {
673                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
674                                 dec_rs_pending(mdev);
675                                 put_ldev(mdev);
676                                 return 0;
677                         }
678                 }
679         }
680
681         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
682                 /* last syncer _request_ was sent,
683                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
684                  * next sync group will resume), as soon as we receive the last
685                  * resync data block, and the last bit is cleared.
686                  * until then resync "work" is "inactive" ...
687                  */
688                 mdev->resync_work.cb = w_resync_inactive;
689                 put_ldev(mdev);
690                 return 1;
691         }
692
693  requeue:
694         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
695         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
696         put_ldev(mdev);
697         return 1;
698 }
699
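/* w_make_ov_request() - the online verify counterpart: send up to 'number'
 * P_OV_REQUESTs per SLEEP_TIME step, paced by the plain sync_conf.rate,
 * walking linearly from ov_position towards the end of the device; no
 * bitmap scan is needed since every block gets verified. */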
700 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
701 {
702         int number, i, size;
703         sector_t sector;
704         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
705
706         if (unlikely(cancel))
707                 return 1;
708
709         if (unlikely(mdev->state.conn < C_CONNECTED)) {
710                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
711                 return 0;
712         }
713
714         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
715         if (atomic_read(&mdev->rs_pending_cnt) > number)
716                 goto requeue;
717
718         number -= atomic_read(&mdev->rs_pending_cnt);
719
720         sector = mdev->ov_position;
721         for (i = 0; i < number; i++) {
722                 if (sector >= capacity) {
723                         mdev->resync_work.cb = w_resync_inactive;
724                         return 1;
725                 }
726
727                 size = BM_BLOCK_SIZE;
728
729                 if (drbd_try_rs_begin_io(mdev, sector)) {
730                         mdev->ov_position = sector;
731                         goto requeue;
732                 }
733
734                 if (sector + (size>>9) > capacity)
735                         size = (capacity-sector)<<9;
736
737                 inc_rs_pending(mdev);
738                 if (!drbd_send_ov_request(mdev, sector, size)) {
739                         dec_rs_pending(mdev);
740                         return 0;
741                 }
742                 sector += BM_SECT_PER_BIT;
743         }
744         mdev->ov_position = sector;
745
746  requeue:
747         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
748         return 1;
749 }
750
751
752 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
753 {
754         kfree(w);
755         ov_oos_print(mdev);
756         drbd_resync_finished(mdev);
757
758         return 1;
759 }
760
761 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
762 {
763         kfree(w);
764
765         drbd_resync_finished(mdev);
766
767         return 1;
768 }
769
770 static void ping_peer(struct drbd_conf *mdev)
771 {
772         clear_bit(GOT_PING_ACK, &mdev->flags);
773         request_ping(mdev);
774         wait_event(mdev->misc_wait,
775                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
776 }
777
778 int drbd_resync_finished(struct drbd_conf *mdev)
779 {
780         unsigned long db, dt, dbdt;
781         unsigned long n_oos;
782         union drbd_state os, ns;
783         struct drbd_work *w;
784         char *khelper_cmd = NULL;
785
786         /* Remove all elements from the resync LRU. Since future actions
787          * might set bits in the (main) bitmap, the entries in the
788          * resync LRU would otherwise be wrong. */
789         if (drbd_rs_del_all(mdev)) {
790                 /* In case this is not possible right now, most probably because
791                  * there are P_RS_DATA_REPLY packets lingering on the worker's
792                  * queue (or the read operations for those packets have not
793                  * finished yet), retry in 100ms. */
794
795                 __set_current_state(TASK_INTERRUPTIBLE);
796                 schedule_timeout(HZ / 10);
797                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
798                 if (w) {
799                         w->cb = w_resync_finished;
800                         drbd_queue_work(&mdev->data.work, w);
801                         return 1;
802                 }
803                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
804         }
805
806         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
807         if (dt <= 0)
808                 dt = 1;
809         db = mdev->rs_total;
810         dbdt = Bit2KB(db/dt);
811         mdev->rs_paused /= HZ;
812
813         if (!get_ldev(mdev))
814                 goto out;
815
816         ping_peer(mdev);
817
818         spin_lock_irq(&mdev->req_lock);
819         os = mdev->state;
820
821         /* This protects us against multiple calls (that can happen in the presence
822            of application IO), and against connectivity loss just before we arrive here. */
823         if (os.conn <= C_CONNECTED)
824                 goto out_unlock;
825
826         ns = os;
827         ns.conn = C_CONNECTED;
828
829         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
830              (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
831              "Online verify " : "Resync",
832              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
833
834         n_oos = drbd_bm_total_weight(mdev);
835
836         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
837                 if (n_oos) {
838                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
839                               n_oos, Bit2KB(1));
840                         khelper_cmd = "out-of-sync";
841                 }
842         } else {
843                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
844
845                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
846                         khelper_cmd = "after-resync-target";
847
848                 if (mdev->csums_tfm && mdev->rs_total) {
849                         const unsigned long s = mdev->rs_same_csum;
850                         const unsigned long t = mdev->rs_total;
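                        /* percentage of blocks with equal checksums; for large
                         * totals divide t first so that s*100 cannot overflow
                         * an unsigned long on 32 bit, for small totals multiply
                         * first to keep the precision */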
851                         const int ratio =
852                                 (t == 0)     ? 0 :
853                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
854                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
855                              "transferred %luK total %luK\n",
856                              ratio,
857                              Bit2KB(mdev->rs_same_csum),
858                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
859                              Bit2KB(mdev->rs_total));
860                 }
861         }
862
863         if (mdev->rs_failed) {
864                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
865
866                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
867                         ns.disk = D_INCONSISTENT;
868                         ns.pdsk = D_UP_TO_DATE;
869                 } else {
870                         ns.disk = D_UP_TO_DATE;
871                         ns.pdsk = D_INCONSISTENT;
872                 }
873         } else {
874                 ns.disk = D_UP_TO_DATE;
875                 ns.pdsk = D_UP_TO_DATE;
876
877                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
878                         if (mdev->p_uuid) {
879                                 int i;
880                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
881                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
882                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
883                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
884                         } else {
885                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
886                         }
887                 }
888
889                 drbd_uuid_set_bm(mdev, 0UL);
890
891                 if (mdev->p_uuid) {
892                         /* Now the two UUID sets are equal, update what we
893                          * know of the peer. */
894                         int i;
895                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
896                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
897                 }
898         }
899
900         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
901 out_unlock:
902         spin_unlock_irq(&mdev->req_lock);
903         put_ldev(mdev);
904 out:
905         mdev->rs_total  = 0;
906         mdev->rs_failed = 0;
907         mdev->rs_paused = 0;
908         mdev->ov_start_sector = 0;
909
910         drbd_md_sync(mdev);
911
912         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
913                 dev_info(DEV, "Writing the whole bitmap\n");
914                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
915         }
916
917         if (khelper_cmd)
918                 drbd_khelper(mdev, khelper_cmd);
919
920         return 1;
921 }
922
923 /* helper */
924 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
925 {
926         if (drbd_ee_has_active_page(e)) {
927                 /* This might happen if sendpage() has not finished */
928                 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
929                 atomic_add(i, &mdev->pp_in_use_by_net);
930                 atomic_sub(i, &mdev->pp_in_use);
931                 spin_lock_irq(&mdev->req_lock);
932                 list_add_tail(&e->w.list, &mdev->net_ee);
933                 spin_unlock_irq(&mdev->req_lock);
934                 wake_up(&drbd_pp_wait);
935         } else
936                 drbd_free_ee(mdev, e);
937 }
938
939 /**
940  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
941  * @mdev:       DRBD device.
942  * @w:          work object.
943  * @cancel:     The connection will be closed anyway
944  */
945 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
946 {
947         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
948         int ok;
949
950         if (unlikely(cancel)) {
951                 drbd_free_ee(mdev, e);
952                 dec_unacked(mdev);
953                 return 1;
954         }
955
956         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
957                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
958         } else {
959                 if (__ratelimit(&drbd_ratelimit_state))
960                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
961                             (unsigned long long)e->sector);
962
963                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
964         }
965
966         dec_unacked(mdev);
967
968         move_to_net_ee_or_free(mdev, e);
969
970         if (unlikely(!ok))
971                 dev_err(DEV, "drbd_send_block() failed\n");
972         return ok;
973 }
974
975 /**
976  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
977  * @mdev:       DRBD device.
978  * @w:          work object.
979  * @cancel:     The connection will be closed anyway
980  */
981 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
982 {
983         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
984         int ok;
985
986         if (unlikely(cancel)) {
987                 drbd_free_ee(mdev, e);
988                 dec_unacked(mdev);
989                 return 1;
990         }
991
992         if (get_ldev_if_state(mdev, D_FAILED)) {
993                 drbd_rs_complete_io(mdev, e->sector);
994                 put_ldev(mdev);
995         }
996
997         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
998                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
999                         inc_rs_pending(mdev);
1000                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1001                 } else {
1002                         if (__ratelimit(&drbd_ratelimit_state))
1003                                 dev_err(DEV, "Not sending RSDataReply, "
1004                                     "partner DISKLESS!\n");
1005                         ok = 1;
1006                 }
1007         } else {
1008                 if (__ratelimit(&drbd_ratelimit_state))
1009                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1010                             (unsigned long long)e->sector);
1011
1012                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1013
1014                 /* update resync data with failure */
1015                 drbd_rs_failed_io(mdev, e->sector, e->size);
1016         }
1017
1018         dec_unacked(mdev);
1019
1020         move_to_net_ee_or_free(mdev, e);
1021
1022         if (unlikely(!ok))
1023                 dev_err(DEV, "drbd_send_block() failed\n");
1024         return ok;
1025 }
1026
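/* Worker callback on the node answering a P_CSUM_RS_REQUEST: the local
 * block has been read, so recompute its digest and compare it with the
 * digest received from the peer (e->digest).  Equal digests mean the
 * block is already in sync (P_RS_IS_IN_SYNC), otherwise the full block
 * is sent as a P_RS_DATA_REPLY. */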
1027 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1028 {
1029         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1030         struct digest_info *di;
1031         int digest_size;
1032         void *digest = NULL;
1033         int ok, eq = 0;
1034
1035         if (unlikely(cancel)) {
1036                 drbd_free_ee(mdev, e);
1037                 dec_unacked(mdev);
1038                 return 1;
1039         }
1040
1041         if (get_ldev(mdev)) {
1042                 drbd_rs_complete_io(mdev, e->sector);
1043                 put_ldev(mdev);
1044         }
1045
1046         di = e->digest;
1047
1048         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1049                 /* quick hack to try to avoid a race against reconfiguration.
1050                  * a real fix would be much more involved,
1051                  * introducing more locking mechanisms */
1052                 if (mdev->csums_tfm) {
1053                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1054                         D_ASSERT(digest_size == di->digest_size);
1055                         digest = kmalloc(digest_size, GFP_NOIO);
1056                 }
1057                 if (digest) {
1058                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1059                         eq = !memcmp(digest, di->digest, digest_size);
1060                         kfree(digest);
1061                 }
1062
1063                 if (eq) {
1064                         drbd_set_in_sync(mdev, e->sector, e->size);
1065                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1066                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1067                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1068                 } else {
1069                         inc_rs_pending(mdev);
1070                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1071                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1072                         kfree(di);
1073                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1074                 }
1075         } else {
1076                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1077                 if (__ratelimit(&drbd_ratelimit_state))
1078                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1079         }
1080
1081         dec_unacked(mdev);
1082         move_to_net_ee_or_free(mdev, e);
1083
1084         if (unlikely(!ok))
1085                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1086         return ok;
1087 }
1088
1089 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1090 {
1091         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1092         int digest_size;
1093         void *digest;
1094         int ok = 1;
1095
1096         if (unlikely(cancel))
1097                 goto out;
1098
1099         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1100                 goto out;
1101
1102         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1103         /* FIXME if this allocation fails, online verify will not terminate! */
1104         digest = kmalloc(digest_size, GFP_NOIO);
1105         if (digest) {
1106                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1107                 inc_rs_pending(mdev);
1108                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1109                                              digest, digest_size, P_OV_REPLY);
1110                 if (!ok)
1111                         dec_rs_pending(mdev);
1112                 kfree(digest);
1113         }
1114
1115 out:
1116         drbd_free_ee(mdev, e);
1117
1118         dec_unacked(mdev);
1119
1120         return ok;
1121 }
1122
1123 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1124 {
1125         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1126                 mdev->ov_last_oos_size += size>>9;
1127         } else {
1128                 mdev->ov_last_oos_start = sector;
1129                 mdev->ov_last_oos_size = size>>9;
1130         }
1131         drbd_set_out_of_sync(mdev, sector, size);
1132         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1133 }
1134
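/* Worker callback for a received P_OV_REPLY: compare the digest computed
 * over the local block with the digest the peer sent.  Mismatches are
 * recorded via drbd_ov_oos_found() (and later written out to the bitmap),
 * the result is reported back with P_OV_RESULT, and once ov_left drops
 * to zero the verify run is finished. */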
1135 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1136 {
1137         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1138         struct digest_info *di;
1139         int digest_size;
1140         void *digest;
1141         int ok, eq = 0;
1142
1143         if (unlikely(cancel)) {
1144                 drbd_free_ee(mdev, e);
1145                 dec_unacked(mdev);
1146                 return 1;
1147         }
1148
1149         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1150          * the resync lru has been cleaned up already */
1151         if (get_ldev(mdev)) {
1152                 drbd_rs_complete_io(mdev, e->sector);
1153                 put_ldev(mdev);
1154         }
1155
1156         di = e->digest;
1157
1158         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1159                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1160                 digest = kmalloc(digest_size, GFP_NOIO);
1161                 if (digest) {
1162                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1163
1164                         D_ASSERT(digest_size == di->digest_size);
1165                         eq = !memcmp(digest, di->digest, digest_size);
1166                         kfree(digest);
1167                 }
1168         } else {
1169                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1170                 if (__ratelimit(&drbd_ratelimit_state))
1171                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1172         }
1173
1174         dec_unacked(mdev);
1175         if (!eq)
1176                 drbd_ov_oos_found(mdev, e->sector, e->size);
1177         else
1178                 ov_oos_print(mdev);
1179
1180         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1181                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1182
1183         drbd_free_ee(mdev, e);
1184
1185         if (--mdev->ov_left == 0) {
1186                 ov_oos_print(mdev);
1187                 drbd_resync_finished(mdev);
1188         }
1189
1190         return ok;
1191 }
1192
1193 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1194 {
1195         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1196         complete(&b->done);
1197         return 1;
1198 }
1199
1200 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1201 {
1202         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1203         struct p_barrier *p = &mdev->data.sbuf.barrier;
1204         int ok = 1;
1205
1206         /* really avoid racing with tl_clear.  w.cb may have been referenced
1207          * just before it was reassigned and re-queued, so double check that.
1208          * actually, this race was harmless, since we only try to send the
1209          * barrier packet here, and otherwise do nothing with the object.
1210          * but compare with the head of w_clear_epoch */
1211         spin_lock_irq(&mdev->req_lock);
1212         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1213                 cancel = 1;
1214         spin_unlock_irq(&mdev->req_lock);
1215         if (cancel)
1216                 return 1;
1217
1218         if (!drbd_get_data_sock(mdev))
1219                 return 0;
1220         p->barrier = b->br_number;
1221         /* inc_ap_pending was done where this was queued.
1222          * dec_ap_pending will be done in got_BarrierAck
1223          * or (on connection loss) in w_clear_epoch.  */
1224         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1225                                 (struct p_header80 *)p, sizeof(*p), 0);
1226         drbd_put_data_sock(mdev);
1227
1228         return ok;
1229 }
1230
1231 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1232 {
1233         if (cancel)
1234                 return 1;
1235         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1236 }
1237
1238 /**
1239  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1240  * @mdev:       DRBD device.
1241  * @w:          work object.
1242  * @cancel:     The connection will be closed anyway
1243  */
1244 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1245 {
1246         struct drbd_request *req = container_of(w, struct drbd_request, w);
1247         int ok;
1248
1249         if (unlikely(cancel)) {
1250                 req_mod(req, send_canceled);
1251                 return 1;
1252         }
1253
1254         ok = drbd_send_dblock(mdev, req);
1255         req_mod(req, ok ? handed_over_to_network : send_failed);
1256
1257         return ok;
1258 }
1259
1260 /**
1261  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1262  * @mdev:       DRBD device.
1263  * @w:          work object.
1264  * @cancel:     The connection will be closed anyway
1265  */
1266 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1267 {
1268         struct drbd_request *req = container_of(w, struct drbd_request, w);
1269         int ok;
1270
1271         if (unlikely(cancel)) {
1272                 req_mod(req, send_canceled);
1273                 return 1;
1274         }
1275
1276         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1277                                 (unsigned long)req);
1278
1279         if (!ok) {
1280                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1281                  * so this is probably redundant */
1282                 if (mdev->state.conn >= C_CONNECTED)
1283                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1284         }
1285         req_mod(req, ok ? handed_over_to_network : send_failed);
1286
1287         return ok;
1288 }
1289
1290 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1291 {
1292         struct drbd_request *req = container_of(w, struct drbd_request, w);
1293
1294         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1295                 drbd_al_begin_io(mdev, req->sector);
1296         /* Calling drbd_al_begin_io() out of the worker might deadlock
1297            in theory. In practice it cannot deadlock, since this is
1298            only used when unfreezing IOs. All the extents of the requests
1299            that made it into the TL are already active */
1300
1301         drbd_req_make_private_bio(req, req->master_bio);
1302         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1303         generic_make_request(req->private_bio);
1304
1305         return 1;
1306 }
1307
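/* Walk the sync-after dependency chain (sync_conf.after): this device may
 * resync only while none of the devices it depends on is itself resyncing
 * or has its resync paused/suspended. */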
1308 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1309 {
1310         struct drbd_conf *odev = mdev;
1311
1312         while (1) {
1313                 if (odev->sync_conf.after == -1)
1314                         return 1;
1315                 odev = minor_to_mdev(odev->sync_conf.after);
1316                 ERR_IF(!odev) return 1;
1317                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1318                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1319                     odev->state.aftr_isp || odev->state.peer_isp ||
1320                     odev->state.user_isp)
1321                         return 0;
1322         }
1323 }
1324
1325 /**
1326  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1327  * @mdev:       DRBD device.
1328  *
1329  * Called from process context only (admin command and after_state_ch).
1330  */
1331 static int _drbd_pause_after(struct drbd_conf *mdev)
1332 {
1333         struct drbd_conf *odev;
1334         int i, rv = 0;
1335
1336         for (i = 0; i < minor_count; i++) {
1337                 odev = minor_to_mdev(i);
1338                 if (!odev)
1339                         continue;
1340                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1341                         continue;
1342                 if (!_drbd_may_sync_now(odev))
1343                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1344                                != SS_NOTHING_TO_DO);
1345         }
1346
1347         return rv;
1348 }
1349
1350 /**
1351  * _drbd_resume_next() - Resume resync on all devices that may resync now
1352  * @mdev:       DRBD device.
1353  *
1354  * Called from process context only (admin command and worker).
1355  */
1356 static int _drbd_resume_next(struct drbd_conf *mdev)
1357 {
1358         struct drbd_conf *odev;
1359         int i, rv = 0;
1360
1361         for (i = 0; i < minor_count; i++) {
1362                 odev = minor_to_mdev(i);
1363                 if (!odev)
1364                         continue;
1365                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1366                         continue;
1367                 if (odev->state.aftr_isp) {
1368                         if (_drbd_may_sync_now(odev))
1369                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1370                                                         CS_HARD, NULL)
1371                                        != SS_NOTHING_TO_DO) ;
1372                 }
1373         }
1374         return rv;
1375 }
1376
1377 void resume_next_sg(struct drbd_conf *mdev)
1378 {
1379         write_lock_irq(&global_state_lock);
1380         _drbd_resume_next(mdev);
1381         write_unlock_irq(&global_state_lock);
1382 }
1383
1384 void suspend_other_sg(struct drbd_conf *mdev)
1385 {
1386         write_lock_irq(&global_state_lock);
1387         _drbd_pause_after(mdev);
1388         write_unlock_irq(&global_state_lock);
1389 }
1390
1391 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1392 {
1393         struct drbd_conf *odev;
1394
1395         if (o_minor == -1)
1396                 return NO_ERROR;
1397         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1398                 return ERR_SYNC_AFTER;
1399
1400         /* check for loops */
1401         odev = minor_to_mdev(o_minor);
1402         while (1) {
1403                 if (odev == mdev)
1404                         return ERR_SYNC_AFTER_CYCLE;
1405
1406                 /* dependency chain ends here, no cycles. */
1407                 if (odev->sync_conf.after == -1)
1408                         return NO_ERROR;
1409
1410                 /* follow the dependency chain */
1411                 odev = minor_to_mdev(odev->sync_conf.after);
1412         }
1413 }
1414
1415 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1416 {
1417         int changes;
1418         int retcode;
1419
1420         write_lock_irq(&global_state_lock);
1421         retcode = sync_after_error(mdev, na);
1422         if (retcode == NO_ERROR) {
1423                 mdev->sync_conf.after = na;
1424                 do {
1425                         changes  = _drbd_pause_after(mdev);
1426                         changes |= _drbd_resume_next(mdev);
1427                 } while (changes);
1428         }
1429         write_unlock_irq(&global_state_lock);
1430         return retcode;
1431 }
1432
1433 /**
1434  * drbd_start_resync() - Start the resync process
1435  * @mdev:       DRBD device.
1436  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1437  *
1438  * This function might bring you directly into one of the
1439  * C_PAUSED_SYNC_* states.
1440  */
1441 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1442 {
1443         union drbd_state ns;
1444         int r;
1445
1446         if (mdev->state.conn >= C_SYNC_SOURCE) {
1447                 dev_err(DEV, "Resync already running!\n");
1448                 return;
1449         }
1450
1451         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1452         drbd_rs_cancel_all(mdev);
1453
1454         if (side == C_SYNC_TARGET) {
1455                 /* Since application IO was locked out during C_WF_BITMAP_T and
1456                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1457                    (which may make the data inconsistent) run the before-resync-target handler. */
1458                 r = drbd_khelper(mdev, "before-resync-target");
1459                 r = (r >> 8) & 0xff;
1460                 if (r > 0) {
1461                         dev_info(DEV, "before-resync-target handler returned %d, "
1462                              "dropping connection.\n", r);
1463                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1464                         return;
1465                 }
1466         }
1467
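             /* Everything from here on runs under the per-device state lock,
              * released at drbd_state_unlock() below. */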
1468         drbd_state_lock(mdev);
1469
1470         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1471                 drbd_state_unlock(mdev);
1472                 return;
1473         }
1474
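             /* As SyncTarget, restart the bitmap scan from the beginning; as
              * SyncSource, generate a fresh random bitmap UUID and announce it
              * to the peer before the resync starts. */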
1475         if (side == C_SYNC_TARGET) {
1476                 mdev->bm_resync_fo = 0;
1477         } else /* side == C_SYNC_SOURCE */ {
1478                 u64 uuid;
1479
1480                 get_random_bytes(&uuid, sizeof(u64));
1481                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1482                 drbd_send_sync_uuid(mdev, uuid);
1483
1484                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1485         }
1486
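             /* Under the global state lock, build the target state: enter the
              * requested sync role, mark the out-of-date side Inconsistent, and
              * pause ourselves (aftr_isp) if a sync-after dependency does not
              * allow us to sync yet. */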
1487         write_lock_irq(&global_state_lock);
1488         ns = mdev->state;
1489
1490         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1491
1492         ns.conn = side;
1493
1494         if (side == C_SYNC_TARGET)
1495                 ns.disk = D_INCONSISTENT;
1496         else /* side == C_SYNC_SOURCE */
1497                 ns.pdsk = D_INCONSISTENT;
1498
1499         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1500         ns = mdev->state;
1501
1502         if (ns.conn < C_CONNECTED)
1503                 r = SS_UNKNOWN_ERROR;
1504
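             /* If the state change was accepted, reset the resync bookkeeping:
              * total work, progress marks and the per-resync counters. */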
1505         if (r == SS_SUCCESS) {
1506                 unsigned long tw = drbd_bm_total_weight(mdev);
1507                 unsigned long now = jiffies;
1508                 int i;
1509
1510                 mdev->rs_failed    = 0;
1511                 mdev->rs_paused    = 0;
1512                 mdev->rs_same_csum = 0;
1513                 mdev->rs_last_events = 0;
1514                 mdev->rs_last_sect_ev = 0;
1515                 mdev->rs_total     = tw;
1516                 mdev->rs_start     = now;
1517                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1518                         mdev->rs_mark_left[i] = tw;
1519                         mdev->rs_mark_time[i] = now;
1520                 }
1521                 _drbd_pause_after(mdev);
1522         }
1523         write_unlock_irq(&global_state_lock);
1524         put_ldev(mdev);
1525
1526         if (r == SS_SUCCESS) {
1527                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1528                      drbd_conn_str(ns.conn),
1529                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1530                      (unsigned long) mdev->rs_total);
1531
1532                 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1533                         /* This still has a race (about when exactly the peers
1534                          * detect connection loss) that can lead to a full sync
1535                          * on next handshake. In 8.3.9 we fixed this with explicit
1536                          * resync-finished notifications, but the fix
1537                          * introduces a protocol change.  Sleeping for some
1538                          * time longer than the ping interval + timeout on the
1539                          * SyncSource, to give the SyncTarget the chance to
1540                          * detect connection loss, then waiting for a ping
1541                          * response (implicit in drbd_resync_finished) reduces
1542                          * the race considerably, but does not solve it. */
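                             /* Rough worked example, assuming the usual defaults
                              * of ping_int = 10 (seconds) and ping_timeo = 5
                              * (tenths of a second): the sleep below is
                              * 10*HZ + 5*HZ/9, roughly 10.6 seconds, i.e. the
                              * ping interval plus slightly more than the ping
                              * timeout. */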
1543                         if (side == C_SYNC_SOURCE)
1544                                 schedule_timeout_interruptible(
1545                                         mdev->net_conf->ping_int * HZ +
1546                                         mdev->net_conf->ping_timeo * HZ / 9);
1547                         drbd_resync_finished(mdev);
1548                 }
1549
1550                 atomic_set(&mdev->rs_sect_in, 0);
1551                 atomic_set(&mdev->rs_sect_ev, 0);
1552                 mdev->rs_in_flight = 0;
1553                 mdev->rs_planed = 0;
1554                 spin_lock(&mdev->peer_seq_lock);
1555                 fifo_set(&mdev->rs_plan_s, 0);
1556                 spin_unlock(&mdev->peer_seq_lock);
1557                 /* ns.conn may already be != mdev->state.conn;
1558                  * we may have been paused in the meantime, or may become
1559                  * paused before the timer triggers.
1560                  * Either way, that is handled in resync_timer_fn(). */
1561                 if (ns.conn == C_SYNC_TARGET)
1562                         mod_timer(&mdev->resync_timer, jiffies);
1563
1564                 drbd_md_sync(mdev);
1565         }
1566         drbd_state_unlock(mdev);
1567 }
1568
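     /**
      * drbd_worker() - Main loop of the per-device worker thread
      * @thi:       the drbd_thread this worker runs as.
      *
      * Dequeues work items from mdev->data.work and runs their callbacks until
      * the thread is told to stop; a failing callback while connected forces
      * the connection into C_NETWORK_FAILURE.  On exit the remaining queue is
      * drained with the cancel flag set and the device is cleaned up.
      */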
1569 int drbd_worker(struct drbd_thread *thi)
1570 {
1571         struct drbd_conf *mdev = thi->mdev;
1572         struct drbd_work *w = NULL;
1573         LIST_HEAD(work_list);
1574         int intr = 0, i;
1575
1576         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1577
1578         while (get_t_state(thi) == Running) {
1579                 drbd_thread_current_set_cpu(mdev);
1580
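                     /* No work immediately available: uncork the data socket so
                      * anything still buffered goes out to the peer, sleep on
                      * the work semaphore, and cork again once new work shows
                      * up (unless corking is disabled via no_cork). */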
1581                 if (down_trylock(&mdev->data.work.s)) {
1582                         mutex_lock(&mdev->data.mutex);
1583                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1584                                 drbd_tcp_uncork(mdev->data.socket);
1585                         mutex_unlock(&mdev->data.mutex);
1586
1587                         intr = down_interruptible(&mdev->data.work.s);
1588
1589                         mutex_lock(&mdev->data.mutex);
1590                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1591                                 drbd_tcp_cork(mdev->data.socket);
1592                         mutex_unlock(&mdev->data.mutex);
1593                 }
1594
1595                 if (intr) {
1596                         D_ASSERT(intr == -EINTR);
1597                         flush_signals(current);
1598                         ERR_IF(get_t_state(thi) == Running)
1599                                 continue;
1600                         break;
1601                 }
1602
1603                 if (get_t_state(thi) != Running)
1604                         break;
1605                 /* With this break, we have done a down() but not consumed
1606                    the entry from the list. The cleanup code takes care of
1607                    this...   */
1608
1609                 w = NULL;
1610                 spin_lock_irq(&mdev->data.work.q_lock);
1611                 ERR_IF(list_empty(&mdev->data.work.q)) {
1612                         /* something terribly wrong in our logic.
1613                          * we were able to down() the semaphore,
1614                          * but the list is empty... doh.
1615                          *
1616                          * what is the best thing to do now?
1617                          * try again from scratch, restarting the receiver,
1618                          * asender, whatnot? That could break things even more badly,
1619                          * e.g. when we are primary but have no good local data.
1620                          *
1621                          * I'll try to get away with just starting over this loop.
1622                          */
1623                         spin_unlock_irq(&mdev->data.work.q_lock);
1624                         continue;
1625                 }
1626                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1627                 list_del_init(&w->list);
1628                 spin_unlock_irq(&mdev->data.work.q_lock);
1629
1630                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1631                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1632                         if (mdev->state.conn >= C_CONNECTED)
1633                                 drbd_force_state(mdev,
1634                                                 NS(conn, C_NETWORK_FAILURE));
1635                 }
1636         }
1637         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1638         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1639
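             /* Drain whatever is still queued: run every remaining work
              * callback with its cancel argument set, so it cleans up instead
              * of doing real work. */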
1640         spin_lock_irq(&mdev->data.work.q_lock);
1641         i = 0;
1642         while (!list_empty(&mdev->data.work.q)) {
1643                 list_splice_init(&mdev->data.work.q, &work_list);
1644                 spin_unlock_irq(&mdev->data.work.q_lock);
1645
1646                 while (!list_empty(&work_list)) {
1647                         w = list_entry(work_list.next, struct drbd_work, list);
1648                         list_del_init(&w->list);
1649                         w->cb(mdev, w, 1);
1650                         i++; /* dead debugging code */
1651                 }
1652
1653                 spin_lock_irq(&mdev->data.work.q_lock);
1654         }
1655         sema_init(&mdev->data.work.s, 0);
1656         /* DANGEROUS race: if someone queued their work while holding the
1657          * spinlock but called up() only after dropping it, we could get an
1658          * up() on the semaphore without a corresponding list entry.
1659          * So don't do that.
1660          */
1661         spin_unlock_irq(&mdev->data.work.q_lock);
1662
1663         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1664         /* _drbd_set_state() only requests a non-blocking stop (stop_nowait),
1665          * so wait here for the Exiting receiver thread to terminate. */
1666         drbd_thread_stop(&mdev->receiver);
1667         drbd_mdev_cleanup(mdev);
1668
1669         dev_info(DEV, "worker terminated\n");
1670
1671         clear_bit(DEVICE_DYING, &mdev->flags);
1672         clear_bit(CONFIG_PENDING, &mdev->flags);
1673         wake_up(&mdev->state_wait);
1674
1675         return 0;
1676 }