90050ab7adf385d08c3a50d71d7893ac0c59435e
[pandora-kernel.git] / drivers / block / drbd / drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
/* Work item describing a finished state change; it pairs with the
 * w_after_state_ch() work callback and after_state_ch() declared in
 * this file. */
struct after_state_chg_work {
        struct drbd_work w;             /* embedded generic work item */
        union drbd_state os;            /* old state */
        union drbd_state ns;            /* new state */
        enum chg_state_flags flags;     /* flags of the state change */
        /* NOTE(review): presumably completed once the after-state-change
         * work has run, and may be NULL -- confirm against the users. */
        struct completion *done;
};
66
67 static DEFINE_MUTEX(drbd_main_mutex);
68 int drbdd_init(struct drbd_thread *);
69 int drbd_worker(struct drbd_thread *);
70 int drbd_asender(struct drbd_thread *);
71
72 int drbd_init(void);
73 static int drbd_open(struct block_device *bdev, fmode_t mode);
74 static int drbd_release(struct gendisk *gd, fmode_t mode);
75 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
76 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
77                            union drbd_state ns, enum chg_state_flags flags);
78 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
79 static void md_sync_timer_fn(unsigned long data);
80 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
81 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
82
83 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
84               "Lars Ellenberg <lars@linbit.com>");
85 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
86 MODULE_VERSION(REL_VERSION);
87 MODULE_LICENSE("GPL");
88 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
89 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
90
91 #include <linux/moduleparam.h>
92 /* allow_open_on_secondary */
93 MODULE_PARM_DESC(allow_oos, "DONT USE!");
94 /* thanks to these macros, if compiled into the kernel (not-module),
95  * this becomes the boot parameter drbd.minor_count */
96 module_param(minor_count, uint, 0444);
97 module_param(disable_sendpage, bool, 0644);
98 module_param(allow_oos, bool, 0);
99 module_param(cn_idx, uint, 0444);
100 module_param(proc_details, int, 0644);
101
102 #ifdef CONFIG_DRBD_FAULT_INJECTION
103 int enable_faults;
104 int fault_rate;
105 static int fault_count;
106 int fault_devs;
107 /* bitmap of enabled faults */
108 module_param(enable_faults, int, 0664);
109 /* fault rate % value - applies to all enabled faults */
110 module_param(fault_rate, int, 0664);
111 /* count of faults inserted */
112 module_param(fault_count, int, 0664);
113 /* bitmap of devices to insert faults on */
114 module_param(fault_devs, int, 0644);
115 #endif
116
117 /* module parameter, defined */
118 unsigned int minor_count = 32;
119 int disable_sendpage;
120 int allow_oos;
121 unsigned int cn_idx = CN_IDX_DRBD;
122 int proc_details;       /* Detail level in proc drbd*/
123
124 /* Module parameter for setting the user mode helper program
125  * to run. Default is /sbin/drbdadm */
126 char usermode_helper[80] = "/sbin/drbdadm";
127
128 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
129
130 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
131  * as member "struct gendisk *vdisk;"
132  */
133 struct drbd_conf **minor_table;
134
135 struct kmem_cache *drbd_request_cache;
136 struct kmem_cache *drbd_ee_cache;       /* epoch entries */
137 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
138 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
139 mempool_t *drbd_request_mempool;
140 mempool_t *drbd_ee_mempool;
141
142 /* I do not use a standard mempool, because:
143    1) I want to hand out the pre-allocated objects first.
144    2) I want to be able to interrupt sleeping allocation with a signal.
145    Note: This is a single linked list, the next pointer is the private
146          member of struct page.
147  */
148 struct page *drbd_pp_pool;
149 spinlock_t   drbd_pp_lock;
150 int          drbd_pp_vacant;
151 wait_queue_head_t drbd_pp_wait;
152
153 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
154
155 static const struct block_device_operations drbd_ops = {
156         .owner =   THIS_MODULE,
157         .open =    drbd_open,
158         .release = drbd_release,
159 };
160
/* Number of elements of the array A. Only valid for real arrays, not for
 * pointers. The argument is parenthesized so that cast or unary-operator
 * expressions passed as A are indexed as a whole. */
#define ARRY_SIZE(A) (sizeof(A) / sizeof((A)[0]))
162
#ifdef __CHECKER__
/* Out-of-line variant used when checking with sparse: as an inline function
   this gives tons of false positives, as a real function sparse works.

   Takes a reference on mdev->local_cnt and keeps it only if the disk state
   is at least @mins. Returns 1 if the reference is held, 0 if it was dropped
   again; dropping the last reference wakes up mdev->misc_wait.
 */
int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
{
        atomic_inc(&mdev->local_cnt);
        if (mdev->state.disk >= mins)
                return 1;

        /* disk state too low: give the reference back */
        if (atomic_dec_and_test(&mdev->local_cnt))
                wake_up(&mdev->misc_wait);
        return 0;
}

#endif
181
182 /**
183  * DOC: The transfer log
184  *
185  * The transfer log is a single linked list of &struct drbd_tl_epoch objects.
186  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
187  * of the list. There is always at least one &struct drbd_tl_epoch object.
188  *
189  * Each &struct drbd_tl_epoch has a circular double linked list of requests
190  * attached.
191  */
192 static int tl_init(struct drbd_conf *mdev)
193 {
194         struct drbd_tl_epoch *b;
195
196         /* during device minor initialization, we may well use GFP_KERNEL */
197         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
198         if (!b)
199                 return 0;
200         INIT_LIST_HEAD(&b->requests);
201         INIT_LIST_HEAD(&b->w.list);
202         b->next = NULL;
203         b->br_number = 4711;
204         b->n_writes = 0;
205         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
206
207         mdev->oldest_tle = b;
208         mdev->newest_tle = b;
209         INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
210
211         mdev->tl_hash = NULL;
212         mdev->tl_hash_s = 0;
213
214         return 1;
215 }
216
217 static void tl_cleanup(struct drbd_conf *mdev)
218 {
219         D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
220         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
221         kfree(mdev->oldest_tle);
222         mdev->oldest_tle = NULL;
223         kfree(mdev->unused_spare_tle);
224         mdev->unused_spare_tle = NULL;
225         kfree(mdev->tl_hash);
226         mdev->tl_hash = NULL;
227         mdev->tl_hash_s = 0;
228 }
229
230 /**
231  * _tl_add_barrier() - Adds a barrier to the transfer log
232  * @mdev:       DRBD device.
233  * @new:        Barrier to be added before the current head of the TL.
234  *
235  * The caller must hold the req_lock.
236  */
237 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
238 {
239         struct drbd_tl_epoch *newest_before;
240
241         INIT_LIST_HEAD(&new->requests);
242         INIT_LIST_HEAD(&new->w.list);
243         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
244         new->next = NULL;
245         new->n_writes = 0;
246
247         newest_before = mdev->newest_tle;
248         /* never send a barrier number == 0, because that is special-cased
249          * when using TCQ for our write ordering code */
250         new->br_number = (newest_before->br_number+1) ?: 1;
251         if (mdev->newest_tle != new) {
252                 mdev->newest_tle->next = new;
253                 mdev->newest_tle = new;
254         }
255 }
256
257 /**
258  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
259  * @mdev:       DRBD device.
260  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
261  * @set_size:   Expected number of requests before that barrier.
262  *
263  * In case the passed barrier_nr or set_size does not match the oldest
264  * &struct drbd_tl_epoch objects this function will cause a termination
265  * of the connection.
266  */
267 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
268                        unsigned int set_size)
269 {
270         struct drbd_tl_epoch *b, *nob; /* next old barrier */
271         struct list_head *le, *tle;
272         struct drbd_request *r;
273
274         spin_lock_irq(&mdev->req_lock);
275
276         b = mdev->oldest_tle;
277
278         /* first some paranoia code */
279         if (b == NULL) {
280                 dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
281                         barrier_nr);
282                 goto bail;
283         }
284         if (b->br_number != barrier_nr) {
285                 dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
286                         barrier_nr, b->br_number);
287                 goto bail;
288         }
289         if (b->n_writes != set_size) {
290                 dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
291                         barrier_nr, set_size, b->n_writes);
292                 goto bail;
293         }
294
295         /* Clean up list of requests processed during current epoch */
296         list_for_each_safe(le, tle, &b->requests) {
297                 r = list_entry(le, struct drbd_request, tl_requests);
298                 _req_mod(r, barrier_acked);
299         }
300         /* There could be requests on the list waiting for completion
301            of the write to the local disk. To avoid corruptions of
302            slab's data structures we have to remove the lists head.
303
304            Also there could have been a barrier ack out of sequence, overtaking
305            the write acks - which would be a bug and violating write ordering.
306            To not deadlock in case we lose connection while such requests are
307            still pending, we need some way to find them for the
308            _req_mode(connection_lost_while_pending).
309
310            These have been list_move'd to the out_of_sequence_requests list in
311            _req_mod(, barrier_acked) above.
312            */
313         list_del_init(&b->requests);
314
315         nob = b->next;
316         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
317                 _tl_add_barrier(mdev, b);
318                 if (nob)
319                         mdev->oldest_tle = nob;
320                 /* if nob == NULL b was the only barrier, and becomes the new
321                    barrier. Therefore mdev->oldest_tle points already to b */
322         } else {
323                 D_ASSERT(nob != NULL);
324                 mdev->oldest_tle = nob;
325                 kfree(b);
326         }
327
328         spin_unlock_irq(&mdev->req_lock);
329         dec_ap_pending(mdev);
330
331         return;
332
333 bail:
334         spin_unlock_irq(&mdev->req_lock);
335         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
336 }
337
338
339 /* In C_AHEAD mode only out_of_sync packets are sent for requests. Detach
340  * those requests from the newsest barrier when changing to an other cstate.
341  *
342  * That headless list vanishes when the last request finished its write or
343  * send out_of_sync packet.  */
344 static void tl_forget(struct drbd_conf *mdev)
345 {
346         struct drbd_tl_epoch *b;
347
348         if (test_bit(CREATE_BARRIER, &mdev->flags))
349                 return;
350
351         b = mdev->newest_tle;
352         list_del(&b->requests);
353         _tl_add_barrier(mdev, b);
354 }
355
356 /**
357  * _tl_restart() - Walks the transfer log, and applies an action to all requests
358  * @mdev:       DRBD device.
359  * @what:       The action/event to perform with all request objects
360  *
361  * @what might be one of connection_lost_while_pending, resend, fail_frozen_disk_io,
362  * restart_frozen_disk_io.
363  */
364 static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
365 {
366         struct drbd_tl_epoch *b, *tmp, **pn;
367         struct list_head *le, *tle, carry_reads;
368         struct drbd_request *req;
369         int rv, n_writes, n_reads;
370
371         b = mdev->oldest_tle;
372         pn = &mdev->oldest_tle;
373         while (b) {
374                 n_writes = 0;
375                 n_reads = 0;
376                 INIT_LIST_HEAD(&carry_reads);
377                 list_for_each_safe(le, tle, &b->requests) {
378                         req = list_entry(le, struct drbd_request, tl_requests);
379                         rv = _req_mod(req, what);
380
381                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
382                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
383                 }
384                 tmp = b->next;
385
386                 if (n_writes) {
387                         if (what == resend) {
388                                 b->n_writes = n_writes;
389                                 if (b->w.cb == NULL) {
390                                         b->w.cb = w_send_barrier;
391                                         inc_ap_pending(mdev);
392                                         set_bit(CREATE_BARRIER, &mdev->flags);
393                                 }
394
395                                 drbd_queue_work(&mdev->data.work, &b->w);
396                         }
397                         pn = &b->next;
398                 } else {
399                         if (n_reads)
400                                 list_add(&carry_reads, &b->requests);
401                         /* there could still be requests on that ring list,
402                          * in case local io is still pending */
403                         list_del(&b->requests);
404
405                         /* dec_ap_pending corresponding to queue_barrier.
406                          * the newest barrier may not have been queued yet,
407                          * in which case w.cb is still NULL. */
408                         if (b->w.cb != NULL)
409                                 dec_ap_pending(mdev);
410
411                         if (b == mdev->newest_tle) {
412                                 /* recycle, but reinit! */
413                                 D_ASSERT(tmp == NULL);
414                                 INIT_LIST_HEAD(&b->requests);
415                                 list_splice(&carry_reads, &b->requests);
416                                 INIT_LIST_HEAD(&b->w.list);
417                                 b->w.cb = NULL;
418                                 b->br_number = net_random();
419                                 b->n_writes = 0;
420
421                                 *pn = b;
422                                 break;
423                         }
424                         *pn = tmp;
425                         kfree(b);
426                 }
427                 b = tmp;
428                 list_splice(&carry_reads, &b->requests);
429         }
430 }
431
432
433 /**
434  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
435  * @mdev:       DRBD device.
436  *
437  * This is called after the connection to the peer was lost. The storage covered
438  * by the requests on the transfer gets marked as our of sync. Called from the
439  * receiver thread and the worker thread.
440  */
441 void tl_clear(struct drbd_conf *mdev)
442 {
443         struct list_head *le, *tle;
444         struct drbd_request *r;
445
446         spin_lock_irq(&mdev->req_lock);
447
448         _tl_restart(mdev, connection_lost_while_pending);
449
450         /* we expect this list to be empty. */
451         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
452
453         /* but just in case, clean it up anyways! */
454         list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
455                 r = list_entry(le, struct drbd_request, tl_requests);
456                 /* It would be nice to complete outside of spinlock.
457                  * But this is easier for now. */
458                 _req_mod(r, connection_lost_while_pending);
459         }
460
461         /* ensure bit indicating barrier is required is clear */
462         clear_bit(CREATE_BARRIER, &mdev->flags);
463
464         memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
465
466         spin_unlock_irq(&mdev->req_lock);
467 }
468
/* Locked wrapper around _tl_restart(): applies the event @what to the
 * requests of the transfer log while holding mdev->req_lock. */
void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
{
        spin_lock_irq(&mdev->req_lock);
        _tl_restart(mdev, what);
        spin_unlock_irq(&mdev->req_lock);
}
475
476 /**
477  * cl_wide_st_chg() - true if the state change is a cluster wide one
478  * @mdev:       DRBD device.
479  * @os:         old (current) state.
480  * @ns:         new (wanted) state.
481  */
482 static int cl_wide_st_chg(struct drbd_conf *mdev,
483                           union drbd_state os, union drbd_state ns)
484 {
485         return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
486                  ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
487                   (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
488                   (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
489                   (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
490                 (os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
491                 (os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
492 }
493
494 enum drbd_state_rv
495 drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
496                   union drbd_state mask, union drbd_state val)
497 {
498         unsigned long flags;
499         union drbd_state os, ns;
500         enum drbd_state_rv rv;
501
502         spin_lock_irqsave(&mdev->req_lock, flags);
503         os = mdev->state;
504         ns.i = (os.i & ~mask.i) | val.i;
505         rv = _drbd_set_state(mdev, ns, f, NULL);
506         ns = mdev->state;
507         spin_unlock_irqrestore(&mdev->req_lock, flags);
508
509         return rv;
510 }
511
/**
 * drbd_force_state() - Impose a change which happens outside our control on our state
 * @mdev:       DRBD device.
 * @mask:       mask of state bits to change.
 * @val:        value of new state bits.
 *
 * Calls drbd_change_state() with the CS_HARD flag; the result of the
 * state change is not reported to the caller.
 */
void drbd_force_state(struct drbd_conf *mdev,
        union drbd_state mask, union drbd_state val)
{
        drbd_change_state(mdev, CS_HARD, mask, val);
}
523
/* state validation helpers, defined later in this file */
static enum drbd_state_rv is_valid_state(struct drbd_conf *mdev,
                                         union drbd_state ns);
static enum drbd_state_rv is_valid_state_transition(struct drbd_conf *mdev,
                                                    union drbd_state ns,
                                                    union drbd_state os);
static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
                                       union drbd_state ns, const char **warn_sync_abort);

int drbd_send_state_req(struct drbd_conf *mdev,
                        union drbd_state mask, union drbd_state val);
532
533 static enum drbd_state_rv
534 _req_st_cond(struct drbd_conf *mdev, union drbd_state mask,
535              union drbd_state val)
536 {
537         union drbd_state os, ns;
538         unsigned long flags;
539         enum drbd_state_rv rv;
540
541         if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
542                 return SS_CW_SUCCESS;
543
544         if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
545                 return SS_CW_FAILED_BY_PEER;
546
547         rv = 0;
548         spin_lock_irqsave(&mdev->req_lock, flags);
549         os = mdev->state;
550         ns.i = (os.i & ~mask.i) | val.i;
551         ns = sanitize_state(mdev, os, ns, NULL);
552
553         if (!cl_wide_st_chg(mdev, os, ns))
554                 rv = SS_CW_NO_NEED;
555         if (!rv) {
556                 rv = is_valid_state(mdev, ns);
557                 if (rv == SS_SUCCESS) {
558                         rv = is_valid_state_transition(mdev, ns, os);
559                         if (rv == SS_SUCCESS)
560                                 rv = SS_UNKNOWN_ERROR; /* cont waiting, otherwise fail. */
561                 }
562         }
563         spin_unlock_irqrestore(&mdev->req_lock, flags);
564
565         return rv;
566 }
567
568 /**
569  * drbd_req_state() - Perform an eventually cluster wide state change
570  * @mdev:       DRBD device.
571  * @mask:       mask of state bits to change.
572  * @val:        value of new state bits.
573  * @f:          flags
574  *
575  * Should not be called directly, use drbd_request_state() or
576  * _drbd_request_state().
577  */
578 static enum drbd_state_rv
579 drbd_req_state(struct drbd_conf *mdev, union drbd_state mask,
580                union drbd_state val, enum chg_state_flags f)
581 {
582         struct completion done;
583         unsigned long flags;
584         union drbd_state os, ns;
585         enum drbd_state_rv rv;
586
587         init_completion(&done);
588
589         if (f & CS_SERIALIZE)
590                 mutex_lock(&mdev->state_mutex);
591
592         spin_lock_irqsave(&mdev->req_lock, flags);
593         os = mdev->state;
594         ns.i = (os.i & ~mask.i) | val.i;
595         ns = sanitize_state(mdev, os, ns, NULL);
596
597         if (cl_wide_st_chg(mdev, os, ns)) {
598                 rv = is_valid_state(mdev, ns);
599                 if (rv == SS_SUCCESS)
600                         rv = is_valid_state_transition(mdev, ns, os);
601                 spin_unlock_irqrestore(&mdev->req_lock, flags);
602
603                 if (rv < SS_SUCCESS) {
604                         if (f & CS_VERBOSE)
605                                 print_st_err(mdev, os, ns, rv);
606                         goto abort;
607                 }
608
609                 drbd_state_lock(mdev);
610                 if (!drbd_send_state_req(mdev, mask, val)) {
611                         drbd_state_unlock(mdev);
612                         rv = SS_CW_FAILED_BY_PEER;
613                         if (f & CS_VERBOSE)
614                                 print_st_err(mdev, os, ns, rv);
615                         goto abort;
616                 }
617
618                 wait_event(mdev->state_wait,
619                         (rv = _req_st_cond(mdev, mask, val)));
620
621                 if (rv < SS_SUCCESS) {
622                         drbd_state_unlock(mdev);
623                         if (f & CS_VERBOSE)
624                                 print_st_err(mdev, os, ns, rv);
625                         goto abort;
626                 }
627                 spin_lock_irqsave(&mdev->req_lock, flags);
628                 os = mdev->state;
629                 ns.i = (os.i & ~mask.i) | val.i;
630                 rv = _drbd_set_state(mdev, ns, f, &done);
631                 drbd_state_unlock(mdev);
632         } else {
633                 rv = _drbd_set_state(mdev, ns, f, &done);
634         }
635
636         spin_unlock_irqrestore(&mdev->req_lock, flags);
637
638         if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
639                 D_ASSERT(current != mdev->worker.task);
640                 wait_for_completion(&done);
641         }
642
643 abort:
644         if (f & CS_SERIALIZE)
645                 mutex_unlock(&mdev->state_mutex);
646
647         return rv;
648 }
649
/**
 * _drbd_request_state() - Request a state change (with flags)
 * @mdev:       DRBD device.
 * @mask:       mask of state bits to change.
 * @val:        value of new state bits.
 * @f:          flags
 *
 * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
 * flag, or when logging of failed state change requests is not desired.
 *
 * Returns the result of the first drbd_req_state() attempt that is not
 * SS_IN_TRANSIENT_STATE.
 */
enum drbd_state_rv
_drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
                    union drbd_state val, enum chg_state_flags f)
{
        enum drbd_state_rv rv;

        /* drbd_req_state() is the wait condition itself: it is re-run on
         * every evaluation for as long as it reports a transient state */
        wait_event(mdev->state_wait,
                   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);

        return rv;
}
671
672 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
673 {
674         dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
675             name,
676             drbd_conn_str(ns.conn),
677             drbd_role_str(ns.role),
678             drbd_role_str(ns.peer),
679             drbd_disk_str(ns.disk),
680             drbd_disk_str(ns.pdsk),
681             is_susp(ns) ? 's' : 'r',
682             ns.aftr_isp ? 'a' : '-',
683             ns.peer_isp ? 'p' : '-',
684             ns.user_isp ? 'u' : '-'
685             );
686 }
687
688 void print_st_err(struct drbd_conf *mdev, union drbd_state os,
689                   union drbd_state ns, enum drbd_state_rv err)
690 {
691         if (err == SS_IN_TRANSIENT_STATE)
692                 return;
693         dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
694         print_st(mdev, " state", os);
695         print_st(mdev, "wanted", ns);
696 }
697
698
699 /**
700  * is_valid_state() - Returns an SS_ error code if ns is not valid
701  * @mdev:       DRBD device.
702  * @ns:         State to consider.
703  */
704 static enum drbd_state_rv
705 is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
706 {
707         /* See drbd_state_sw_errors in drbd_strings.c */
708
709         enum drbd_fencing_p fp;
710         enum drbd_state_rv rv = SS_SUCCESS;
711
712         fp = FP_DONT_CARE;
713         if (get_ldev(mdev)) {
714                 fp = mdev->ldev->dc.fencing;
715                 put_ldev(mdev);
716         }
717
718         if (get_net_conf(mdev)) {
719                 if (!mdev->net_conf->two_primaries &&
720                     ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
721                         rv = SS_TWO_PRIMARIES;
722                 put_net_conf(mdev);
723         }
724
725         if (rv <= 0)
726                 /* already found a reason to abort */;
727         else if (ns.role == R_SECONDARY && mdev->open_cnt)
728                 rv = SS_DEVICE_IN_USE;
729
730         else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
731                 rv = SS_NO_UP_TO_DATE_DISK;
732
733         else if (fp >= FP_RESOURCE &&
734                  ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
735                 rv = SS_PRIMARY_NOP;
736
737         else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
738                 rv = SS_NO_UP_TO_DATE_DISK;
739
740         else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
741                 rv = SS_NO_LOCAL_DISK;
742
743         else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
744                 rv = SS_NO_REMOTE_DISK;
745
746         else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
747                 rv = SS_NO_UP_TO_DATE_DISK;
748
749         else if ((ns.conn == C_CONNECTED ||
750                   ns.conn == C_WF_BITMAP_S ||
751                   ns.conn == C_SYNC_SOURCE ||
752                   ns.conn == C_PAUSED_SYNC_S) &&
753                   ns.disk == D_OUTDATED)
754                 rv = SS_CONNECTED_OUTDATES;
755
756         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
757                  (mdev->sync_conf.verify_alg[0] == 0))
758                 rv = SS_NO_VERIFY_ALG;
759
760         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
761                   mdev->agreed_pro_version < 88)
762                 rv = SS_NOT_SUPPORTED;
763
764         return rv;
765 }
766
767 /**
768  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
769  * @mdev:       DRBD device.
770  * @ns:         new state.
771  * @os:         old state.
772  */
773 static enum drbd_state_rv
774 is_valid_state_transition(struct drbd_conf *mdev, union drbd_state ns,
775                           union drbd_state os)
776 {
777         enum drbd_state_rv rv = SS_SUCCESS;
778
779         if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
780             os.conn > C_CONNECTED)
781                 rv = SS_RESYNC_RUNNING;
782
783         if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
784                 rv = SS_ALREADY_STANDALONE;
785
786         if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
787                 rv = SS_IS_DISKLESS;
788
789         if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
790                 rv = SS_NO_NET_CONFIG;
791
792         if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
793                 rv = SS_LOWER_THAN_OUTDATED;
794
795         if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
796                 rv = SS_IN_TRANSIENT_STATE;
797
798         if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
799                 rv = SS_IN_TRANSIENT_STATE;
800
801         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
802                 rv = SS_NEED_CONNECTION;
803
804         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
805             ns.conn != os.conn && os.conn > C_CONNECTED)
806                 rv = SS_RESYNC_RUNNING;
807
808         if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
809             os.conn < C_CONNECTED)
810                 rv = SS_NEED_CONNECTION;
811
812         if ((ns.conn == C_SYNC_TARGET || ns.conn == C_SYNC_SOURCE)
813             && os.conn < C_WF_REPORT_PARAMS)
814                 rv = SS_NEED_CONNECTION; /* No NetworkFailure -> SyncTarget etc... */
815
816         return rv;
817 }
818
819 /**
820  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
821  * @mdev:       DRBD device.
822  * @os:         old state.
823  * @ns:         new state.
824  * @warn_sync_abort:
825  *
826  * When we lose connection, we have to set the state of the peers disk (pdsk)
827  * to D_UNKNOWN. This rule and many more along those lines are in this function.
828  */
829 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
830                                        union drbd_state ns, const char **warn_sync_abort)
831 {
832         enum drbd_fencing_p fp;
833         enum drbd_disk_state disk_min, disk_max, pdsk_min, pdsk_max;
834
835         fp = FP_DONT_CARE;
836         if (get_ldev(mdev)) {
837                 fp = mdev->ldev->dc.fencing;
838                 put_ldev(mdev);
839         }
840
841         /* Disallow Network errors to configure a device's network part */
842         if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
843             os.conn <= C_DISCONNECTING)
844                 ns.conn = os.conn;
845
846         /* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
847          * If you try to go into some Sync* state, that shall fail (elsewhere). */
848         if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
849             ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_TEAR_DOWN)
850                 ns.conn = os.conn;
851
852         /* we cannot fail (again) if we already detached */
853         if (ns.disk == D_FAILED && os.disk == D_DISKLESS)
854                 ns.disk = D_DISKLESS;
855
856         /* if we are only D_ATTACHING yet,
857          * we can (and should) go directly to D_DISKLESS. */
858         if (ns.disk == D_FAILED && os.disk == D_ATTACHING)
859                 ns.disk = D_DISKLESS;
860
861         /* After C_DISCONNECTING only C_STANDALONE may follow */
862         if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
863                 ns.conn = os.conn;
864
865         if (ns.conn < C_CONNECTED) {
866                 ns.peer_isp = 0;
867                 ns.peer = R_UNKNOWN;
868                 if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
869                         ns.pdsk = D_UNKNOWN;
870         }
871
872         /* Clear the aftr_isp when becoming unconfigured */
873         if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
874                 ns.aftr_isp = 0;
875
876         /* Abort resync if a disk fails/detaches */
877         if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
878             (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
879                 if (warn_sync_abort)
880                         *warn_sync_abort =
881                                 os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
882                                 "Online-verify" : "Resync";
883                 ns.conn = C_CONNECTED;
884         }
885
886         /* Connection breaks down before we finished "Negotiating" */
887         if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
888             get_ldev_if_state(mdev, D_NEGOTIATING)) {
889                 if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
890                         ns.disk = mdev->new_state_tmp.disk;
891                         ns.pdsk = mdev->new_state_tmp.pdsk;
892                 } else {
893                         dev_alert(DEV, "Connection lost while negotiating, no data!\n");
894                         ns.disk = D_DISKLESS;
895                         ns.pdsk = D_UNKNOWN;
896                 }
897                 put_ldev(mdev);
898         }
899
900         /* D_CONSISTENT and D_OUTDATED vanish when we get connected */
901         if (ns.conn >= C_CONNECTED && ns.conn < C_AHEAD) {
902                 if (ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED)
903                         ns.disk = D_UP_TO_DATE;
904                 if (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)
905                         ns.pdsk = D_UP_TO_DATE;
906         }
907
908         /* Implications of the connection stat on the disk states */
909         disk_min = D_DISKLESS;
910         disk_max = D_UP_TO_DATE;
911         pdsk_min = D_INCONSISTENT;
912         pdsk_max = D_UNKNOWN;
913         switch ((enum drbd_conns)ns.conn) {
914         case C_WF_BITMAP_T:
915         case C_PAUSED_SYNC_T:
916         case C_STARTING_SYNC_T:
917         case C_WF_SYNC_UUID:
918         case C_BEHIND:
919                 disk_min = D_INCONSISTENT;
920                 disk_max = D_OUTDATED;
921                 pdsk_min = D_UP_TO_DATE;
922                 pdsk_max = D_UP_TO_DATE;
923                 break;
924         case C_VERIFY_S:
925         case C_VERIFY_T:
926                 disk_min = D_UP_TO_DATE;
927                 disk_max = D_UP_TO_DATE;
928                 pdsk_min = D_UP_TO_DATE;
929                 pdsk_max = D_UP_TO_DATE;
930                 break;
931         case C_CONNECTED:
932                 disk_min = D_DISKLESS;
933                 disk_max = D_UP_TO_DATE;
934                 pdsk_min = D_DISKLESS;
935                 pdsk_max = D_UP_TO_DATE;
936                 break;
937         case C_WF_BITMAP_S:
938         case C_PAUSED_SYNC_S:
939         case C_STARTING_SYNC_S:
940         case C_AHEAD:
941                 disk_min = D_UP_TO_DATE;
942                 disk_max = D_UP_TO_DATE;
943                 pdsk_min = D_INCONSISTENT;
944                 pdsk_max = D_CONSISTENT; /* D_OUTDATED would be nice. But explicit outdate necessary*/
945                 break;
946         case C_SYNC_TARGET:
947                 disk_min = D_INCONSISTENT;
948                 disk_max = D_INCONSISTENT;
949                 pdsk_min = D_UP_TO_DATE;
950                 pdsk_max = D_UP_TO_DATE;
951                 break;
952         case C_SYNC_SOURCE:
953                 disk_min = D_UP_TO_DATE;
954                 disk_max = D_UP_TO_DATE;
955                 pdsk_min = D_INCONSISTENT;
956                 pdsk_max = D_INCONSISTENT;
957                 break;
958         case C_STANDALONE:
959         case C_DISCONNECTING:
960         case C_UNCONNECTED:
961         case C_TIMEOUT:
962         case C_BROKEN_PIPE:
963         case C_NETWORK_FAILURE:
964         case C_PROTOCOL_ERROR:
965         case C_TEAR_DOWN:
966         case C_WF_CONNECTION:
967         case C_WF_REPORT_PARAMS:
968         case C_MASK:
969                 break;
970         }
971         if (ns.disk > disk_max)
972                 ns.disk = disk_max;
973
974         if (ns.disk < disk_min) {
975                 dev_warn(DEV, "Implicitly set disk from %s to %s\n",
976                          drbd_disk_str(ns.disk), drbd_disk_str(disk_min));
977                 ns.disk = disk_min;
978         }
979         if (ns.pdsk > pdsk_max)
980                 ns.pdsk = pdsk_max;
981
982         if (ns.pdsk < pdsk_min) {
983                 dev_warn(DEV, "Implicitly set pdsk from %s to %s\n",
984                          drbd_disk_str(ns.pdsk), drbd_disk_str(pdsk_min));
985                 ns.pdsk = pdsk_min;
986         }
987
988         if (fp == FP_STONITH &&
989             (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
990             !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
991                 ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
992
993         if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
994             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
995             !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
996                 ns.susp_nod = 1; /* Suspend IO while no data available (no accessible data available) */
997
998         if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
999                 if (ns.conn == C_SYNC_SOURCE)
1000                         ns.conn = C_PAUSED_SYNC_S;
1001                 if (ns.conn == C_SYNC_TARGET)
1002                         ns.conn = C_PAUSED_SYNC_T;
1003         } else {
1004                 if (ns.conn == C_PAUSED_SYNC_S)
1005                         ns.conn = C_SYNC_SOURCE;
1006                 if (ns.conn == C_PAUSED_SYNC_T)
1007                         ns.conn = C_SYNC_TARGET;
1008         }
1009
1010         return ns;
1011 }
1012
1013 /* helper for __drbd_set_state */
1014 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
1015 {
1016         if (mdev->agreed_pro_version < 90)
1017                 mdev->ov_start_sector = 0;
1018         mdev->rs_total = drbd_bm_bits(mdev);
1019         mdev->ov_position = 0;
1020         if (cs == C_VERIFY_T) {
1021                 /* starting online verify from an arbitrary position
1022                  * does not fit well into the existing protocol.
1023                  * on C_VERIFY_T, we initialize ov_left and friends
1024                  * implicitly in receive_DataRequest once the
1025                  * first P_OV_REQUEST is received */
1026                 mdev->ov_start_sector = ~(sector_t)0;
1027         } else {
1028                 unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
1029                 if (bit >= mdev->rs_total) {
1030                         mdev->ov_start_sector =
1031                                 BM_BIT_TO_SECT(mdev->rs_total - 1);
1032                         mdev->rs_total = 1;
1033                 } else
1034                         mdev->rs_total -= bit;
1035                 mdev->ov_position = mdev->ov_start_sector;
1036         }
1037         mdev->ov_left = mdev->rs_total;
1038 }
1039
1040 static void drbd_resume_al(struct drbd_conf *mdev)
1041 {
1042         if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
1043                 dev_info(DEV, "Resumed AL updates\n");
1044 }
1045
1046 /**
1047  * __drbd_set_state() - Set a new DRBD state
1048  * @mdev:       DRBD device.
1049  * @ns:         new state.
1050  * @flags:      Flags. CS_HARD skips the validity checks, CS_VERBOSE logs a
      *              rejected transition via print_st_err(); the flags are also
      *              handed on to after_state_ch() through the queued work item.
1051  * @done:       Optional completion, that will get completed after the after_state_ch() finished
1052  *
1053  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
      *
      * The requested state is first passed through sanitize_state(). Returns
      * SS_NOTHING_TO_DO if the sanitized state equals the current one, a value
      * below SS_SUCCESS if the validity checks reject the change (the state is
      * then left untouched), and otherwise the result of the checks
      * (SS_SUCCESS when CS_HARD skipped them).
1054  */
1055 enum drbd_state_rv
1056 __drbd_set_state(struct drbd_conf *mdev, union drbd_state ns,
1057                  enum chg_state_flags flags, struct completion *done)
1058 {
1059         union drbd_state os;
1060         enum drbd_state_rv rv = SS_SUCCESS;
1061         const char *warn_sync_abort = NULL;
1062         struct after_state_chg_work *ascw;
1063
1064         os = mdev->state;
1065
1066         ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
1067
1068         if (ns.i == os.i)
1069                 return SS_NOTHING_TO_DO;
1070
1071         if (!(flags & CS_HARD)) {
1072                 /*  pre-state-change checks ; only look at ns  */
1073                 /* See drbd_state_sw_errors in drbd_strings.c */
1074
1075                 rv = is_valid_state(mdev, ns);
1076                 if (rv < SS_SUCCESS) {
1077                         /* If the old state was illegal as well, then let
1078                            this happen...*/
1079
                             /* "Illegal as well" means: the old state fails with
                              * the very same error code. Only then does the
                              * transition check get to overrule rv. */
1080                         if (is_valid_state(mdev, os) == rv)
1081                                 rv = is_valid_state_transition(mdev, ns, os);
1082                 } else
1083                         rv = is_valid_state_transition(mdev, ns, os);
1084         }
1085
1086         if (rv < SS_SUCCESS) {
1087                 if (flags & CS_VERBOSE)
1088                         print_st_err(mdev, os, ns, rv);
1089                 return rv;
1090         }
1091
1092         if (warn_sync_abort)
1093                 dev_warn(DEV, "%s aborted.\n", warn_sync_abort);
1094
             /* Log one line listing every state field that changes, each as
              * "name( old -> new ) ".
              * NOTE(review): pb is filled with unbounded sprintf(); this relies
              * on the combined length of all fragments staying below
              * sizeof(pb) -- confirm against the longest state names. */
1095         {
1096         char *pbp, pb[300];
1097         pbp = pb;
1098         *pbp = 0;
1099         if (ns.role != os.role)
1100                 pbp += sprintf(pbp, "role( %s -> %s ) ",
1101                                drbd_role_str(os.role),
1102                                drbd_role_str(ns.role));
1103         if (ns.peer != os.peer)
1104                 pbp += sprintf(pbp, "peer( %s -> %s ) ",
1105                                drbd_role_str(os.peer),
1106                                drbd_role_str(ns.peer));
1107         if (ns.conn != os.conn)
1108                 pbp += sprintf(pbp, "conn( %s -> %s ) ",
1109                                drbd_conn_str(os.conn),
1110                                drbd_conn_str(ns.conn));
1111         if (ns.disk != os.disk)
1112                 pbp += sprintf(pbp, "disk( %s -> %s ) ",
1113                                drbd_disk_str(os.disk),
1114                                drbd_disk_str(ns.disk));
1115         if (ns.pdsk != os.pdsk)
1116                 pbp += sprintf(pbp, "pdsk( %s -> %s ) ",
1117                                drbd_disk_str(os.pdsk),
1118                                drbd_disk_str(ns.pdsk));
1119         if (is_susp(ns) != is_susp(os))
1120                 pbp += sprintf(pbp, "susp( %d -> %d ) ",
1121                                is_susp(os),
1122                                is_susp(ns));
1123         if (ns.aftr_isp != os.aftr_isp)
1124                 pbp += sprintf(pbp, "aftr_isp( %d -> %d ) ",
1125                                os.aftr_isp,
1126                                ns.aftr_isp);
1127         if (ns.peer_isp != os.peer_isp)
1128                 pbp += sprintf(pbp, "peer_isp( %d -> %d ) ",
1129                                os.peer_isp,
1130                                ns.peer_isp);
1131         if (ns.user_isp != os.user_isp)
1132                 pbp += sprintf(pbp, "user_isp( %d -> %d ) ",
1133                                os.user_isp,
1134                                ns.user_isp);
1135         dev_info(DEV, "%s\n", pb);
1136         }
1137
1138         /* solve the race between becoming unconfigured,
1139          * worker doing the cleanup, and
1140          * admin reconfiguring us:
1141          * on (re)configure, first set CONFIG_PENDING,
1142          * then wait for a potentially exiting worker,
1143          * start the worker, and schedule one no_op.
1144          * then proceed with configuration.
1145          */
1146         if (ns.disk == D_DISKLESS &&
1147             ns.conn == C_STANDALONE &&
1148             ns.role == R_SECONDARY &&
1149             !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1150                 set_bit(DEVICE_DYING, &mdev->flags);
1151
1152         /* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference
1153          * on the ldev here, to be sure the transition -> D_DISKLESS resp.
1154          * drbd_ldev_destroy() won't happen before our corresponding
1155          * after_state_ch works run, where we put_ldev again. */
1156         if ((os.disk != D_FAILED && ns.disk == D_FAILED) ||
1157             (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))
1158                 atomic_inc(&mdev->local_cnt);
1159
             /* Publish the new state and wake both wait queues. From here on
              * mdev->state equals ns. */
1160         mdev->state = ns;
1161         wake_up(&mdev->misc_wait);
1162         wake_up(&mdev->state_wait);
1163
1164         /* aborted verify run. log the last position */
1165         if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1166             ns.conn < C_CONNECTED) {
1167                 mdev->ov_start_sector =
1168                         BM_BIT_TO_SECT(drbd_bm_bits(mdev) - mdev->ov_left);
1169                 dev_info(DEV, "Online Verify reached sector %llu\n",
1170                         (unsigned long long)mdev->ov_start_sector);
1171         }
1172
             /* Paused -> running: add the time since the last mark to rs_paused;
              * as sync target, also fire the resync timer right away. */
1173         if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1174             (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1175                 dev_info(DEV, "Syncer continues.\n");
1176                 mdev->rs_paused += (long)jiffies
1177                                   -(long)mdev->rs_mark_time[mdev->rs_last_mark];
1178                 if (ns.conn == C_SYNC_TARGET)
1179                         mod_timer(&mdev->resync_timer, jiffies);
1180         }
1181
             /* Running -> paused: remember when the pause began in the last mark. */
1182         if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1183             (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1184                 dev_info(DEV, "Resync suspended\n");
1185                 mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
1186         }
1187
             /* Connected -> online verify: reset the verify position and all
              * progress marks; only the verify source starts the resync timer. */
1188         if (os.conn == C_CONNECTED &&
1189             (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1190                 unsigned long now = jiffies;
1191                 int i;
1192
1193                 set_ov_position(mdev, ns.conn);
1194                 mdev->rs_start = now;
1195                 mdev->rs_last_events = 0;
1196                 mdev->rs_last_sect_ev = 0;
1197                 mdev->ov_last_oos_size = 0;
1198                 mdev->ov_last_oos_start = 0;
1199
1200                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1201                         mdev->rs_mark_left[i] = mdev->ov_left;
1202                         mdev->rs_mark_time[i] = now;
1203                 }
1204
1205                 drbd_rs_controller_reset(mdev);
1206
1207                 if (ns.conn == C_VERIFY_S) {
1208                         dev_info(DEV, "Starting Online Verify from sector %llu\n",
1209                                         (unsigned long long)mdev->ov_position);
1210                         mod_timer(&mdev->resync_timer, jiffies);
1211                 }
1212         }
1213
             /* Recompute the state-derived meta data flags from the new state
              * (mdev->state was assigned above) and mark the meta data dirty
              * only if they changed. All other md.flags bits are preserved. */
1214         if (get_ldev(mdev)) {
1215                 u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1216                                                  MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1217                                                  MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1218
1219                 if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1220                         mdf |= MDF_CRASHED_PRIMARY;
1221                 if (mdev->state.role == R_PRIMARY ||
1222                     (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1223                         mdf |= MDF_PRIMARY_IND;
1224                 if (mdev->state.conn > C_WF_REPORT_PARAMS)
1225                         mdf |= MDF_CONNECTED_IND;
1226                 if (mdev->state.disk > D_INCONSISTENT)
1227                         mdf |= MDF_CONSISTENT;
1228                 if (mdev->state.disk > D_OUTDATED)
1229                         mdf |= MDF_WAS_UP_TO_DATE;
1230                 if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1231                         mdf |= MDF_PEER_OUT_DATED;
1232                 if (mdf != mdev->ldev->md.flags) {
1233                         mdev->ldev->md.flags = mdf;
1234                         drbd_md_mark_dirty(mdev);
1235                 }
1236                 if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1237                         drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1238                 put_ldev(mdev);
1239         }
1240
1241         /* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider to resync */
1242         if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1243             os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1244                 set_bit(CONSIDER_RESYNC, &mdev->flags);
1245
1246         /* Receiver should clean up itself */
1247         if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1248                 drbd_thread_stop_nowait(&mdev->receiver);
1249
1250         /* Now the receiver finished cleaning up itself, it should die */
1251         if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1252                 drbd_thread_stop_nowait(&mdev->receiver);
1253
1254         /* Upon network failure, we need to restart the receiver. */
1255         if (os.conn > C_TEAR_DOWN &&
1256             ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1257                 drbd_thread_restart_nowait(&mdev->receiver);
1258
1259         /* Resume AL writing if we get a connection */
1260         if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1261                 drbd_resume_al(mdev);
1262
1263         if (os.conn == C_AHEAD && ns.conn != C_AHEAD)
1264                 tl_forget(mdev);
1265
             /* Queue the follow-up work item that runs after_state_ch() via
              * w_after_state_ch(). GFP_ATOMIC is used for the allocation.
              * NOTE(review): if the allocation fails, only a warning is
              * logged: after_state_ch() never runs for this change and @done
              * is never completed, yet rv is still returned -- confirm
              * callers waiting on @done can cope. */
1266         ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1267         if (ascw) {
1268                 ascw->os = os;
1269                 ascw->ns = ns;
1270                 ascw->flags = flags;
1271                 ascw->w.cb = w_after_state_ch;
1272                 ascw->done = done;
1273                 drbd_queue_work(&mdev->data.work, &ascw->w);
1274         } else {
1275                 dev_warn(DEV, "Could not kmalloc an ascw\n");
1276         }
1277
1278         return rv;
1279 }
1280
1281 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1282 {
1283         struct after_state_chg_work *ascw =
1284                 container_of(w, struct after_state_chg_work, w);
1285         after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1286         if (ascw->flags & CS_WAIT_COMPLETE) {
1287                 D_ASSERT(ascw->done != NULL);
1288                 complete(ascw->done);
1289         }
1290         kfree(ascw);
1291
1292         return 1;
1293 }
1294
1295 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1296 {
1297         if (rv) {
1298                 dev_err(DEV, "Writing the bitmap failed not starting resync.\n");
1299                 _drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1300                 return;
1301         }
1302
1303         switch (mdev->state.conn) {
1304         case C_STARTING_SYNC_T:
1305                 _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1306                 break;
1307         case C_STARTING_SYNC_S:
1308                 drbd_start_resync(mdev, C_SYNC_SOURCE);
1309                 break;
1310         }
1311 }
1312
1313 int drbd_bitmap_io_from_worker(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
1314 {
1315         int rv;
1316
1317         D_ASSERT(current == mdev->worker.task);
1318
1319         /* open coded non-blocking drbd_suspend_io(mdev); */
1320         set_bit(SUSPEND_IO, &mdev->flags);
1321         if (!is_susp(mdev->state))
1322                 D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
1323
1324         drbd_bm_lock(mdev, why);
1325         rv = io_fn(mdev);
1326         drbd_bm_unlock(mdev);
1327
1328         drbd_resume_io(mdev);
1329
1330         return rv;
1331 }
1332
1333 /**
1334  * after_state_ch() - Perform after state change actions that may sleep
1335  * @mdev:       DRBD device.
1336  * @os:         old state.
1337  * @ns:         new state.
1338  * @flags:      Flags
1339  */
1340 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1341                            union drbd_state ns, enum chg_state_flags flags)
1342 {
1343         enum drbd_fencing_p fp;
1344         enum drbd_req_event what = nothing;
1345         union drbd_state nsm = (union drbd_state){ .i = -1 };
1346
1347         if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1348                 clear_bit(CRASHED_PRIMARY, &mdev->flags);
1349                 if (mdev->p_uuid)
1350                         mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1351         }
1352
1353         fp = FP_DONT_CARE;
1354         if (get_ldev(mdev)) {
1355                 fp = mdev->ldev->dc.fencing;
1356                 put_ldev(mdev);
1357         }
1358
1359         /* Inform userspace about the change... */
1360         drbd_bcast_state(mdev, ns);
1361
1362         if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1363             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1364                 drbd_khelper(mdev, "pri-on-incon-degr");
1365
1366         /* Here we have the actions that are performed after a
1367            state change. This function might sleep */
1368
1369         nsm.i = -1;
1370         if (ns.susp_nod) {
1371                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1372                         what = resend;
1373
1374                 if (os.disk == D_ATTACHING && ns.disk > D_ATTACHING)
1375                         what = restart_frozen_disk_io;
1376
1377                 if (what != nothing)
1378                         nsm.susp_nod = 0;
1379         }
1380
1381         if (ns.susp_fen) {
1382                 /* case1: The outdate peer handler is successful: */
1383                 if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
1384                         tl_clear(mdev);
1385                         if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
1386                                 drbd_uuid_new_current(mdev);
1387                                 clear_bit(NEW_CUR_UUID, &mdev->flags);
1388                         }
1389                         spin_lock_irq(&mdev->req_lock);
1390                         _drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
1391                         spin_unlock_irq(&mdev->req_lock);
1392                 }
1393                 /* case2: The connection was established again: */
1394                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1395                         clear_bit(NEW_CUR_UUID, &mdev->flags);
1396                         what = resend;
1397                         nsm.susp_fen = 0;
1398                 }
1399         }
1400
1401         if (what != nothing) {
1402                 spin_lock_irq(&mdev->req_lock);
1403                 _tl_restart(mdev, what);
1404                 nsm.i &= mdev->state.i;
1405                 _drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
1406                 spin_unlock_irq(&mdev->req_lock);
1407         }
1408
1409         /* Became sync source.  With protocol >= 96, we still need to send out
1410          * the sync uuid now. Need to do that before any drbd_send_state, or
1411          * the other side may go "paused sync" before receiving the sync uuids,
1412          * which is unexpected. */
1413         if ((os.conn != C_SYNC_SOURCE && os.conn != C_PAUSED_SYNC_S) &&
1414             (ns.conn == C_SYNC_SOURCE || ns.conn == C_PAUSED_SYNC_S) &&
1415             mdev->agreed_pro_version >= 96 && get_ldev(mdev)) {
1416                 drbd_gen_and_send_sync_uuid(mdev);
1417                 put_ldev(mdev);
1418         }
1419
1420         /* Do not change the order of the if above and the two below... */
1421         if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1422                 drbd_send_uuids(mdev);
1423                 drbd_send_state(mdev);
1424         }
1425         if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1426                 drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1427
1428         /* Lost contact to peer's copy of the data */
1429         if ((os.pdsk >= D_INCONSISTENT &&
1430              os.pdsk != D_UNKNOWN &&
1431              os.pdsk != D_OUTDATED)
1432         &&  (ns.pdsk < D_INCONSISTENT ||
1433              ns.pdsk == D_UNKNOWN ||
1434              ns.pdsk == D_OUTDATED)) {
1435                 if (get_ldev(mdev)) {
1436                         if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1437                             mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1438                                 if (is_susp(mdev->state)) {
1439                                         set_bit(NEW_CUR_UUID, &mdev->flags);
1440                                 } else {
1441                                         drbd_uuid_new_current(mdev);
1442                                         drbd_send_uuids(mdev);
1443                                 }
1444                         }
1445                         put_ldev(mdev);
1446                 }
1447         }
1448
1449         if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1450                 if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0) {
1451                         drbd_uuid_new_current(mdev);
1452                         drbd_send_uuids(mdev);
1453                 }
1454
1455                 /* D_DISKLESS Peer becomes secondary */
1456                 if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1457                         drbd_bitmap_io_from_worker(mdev, &drbd_bm_write, "demote diskless peer");
1458                 put_ldev(mdev);
1459         }
1460
1461         /* Write out all changed bits on demote.
1462          * Though, no need to do that just yet
1463          * if there is a resync going on still */
1464         if (os.role == R_PRIMARY && ns.role == R_SECONDARY &&
1465                 mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
1466                 drbd_bitmap_io_from_worker(mdev, &drbd_bm_write, "demote");
1467                 put_ldev(mdev);
1468         }
1469
1470         /* Last part of the attaching process ... */
1471         if (ns.conn >= C_CONNECTED &&
1472             os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1473                 drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1474                 drbd_send_uuids(mdev);
1475                 drbd_send_state(mdev);
1476         }
1477
1478         /* We want to pause/continue resync, tell peer. */
1479         if (ns.conn >= C_CONNECTED &&
1480              ((os.aftr_isp != ns.aftr_isp) ||
1481               (os.user_isp != ns.user_isp)))
1482                 drbd_send_state(mdev);
1483
1484         /* In case one of the isp bits got set, suspend other devices. */
1485         if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1486             (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1487                 suspend_other_sg(mdev);
1488
1489         /* Make sure the peer gets informed about eventual state
1490            changes (ISP bits) while we were in WFReportParams. */
1491         if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1492                 drbd_send_state(mdev);
1493
1494         if (os.conn != C_AHEAD && ns.conn == C_AHEAD)
1495                 drbd_send_state(mdev);
1496
1497         /* We are in the progress to start a full sync... */
1498         if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1499             (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1500                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1501
1502         /* We are invalidating our self... */
1503         if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1504             os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1505                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1506
1507         /* first half of local IO error, failure to attach,
1508          * or administrative detach */
1509         if (os.disk != D_FAILED && ns.disk == D_FAILED) {
1510                 enum drbd_io_error_p eh;
1511                 int was_io_error;
1512                 /* corresponding get_ldev was in __drbd_set_state, to serialize
1513                  * our cleanup here with the transition to D_DISKLESS,
1514                  * so it is safe to dereference ldev here. */
1515                 eh = mdev->ldev->dc.on_io_error;
1516                 was_io_error = test_and_clear_bit(WAS_IO_ERROR, &mdev->flags);
1517
1518                 /* current state still has to be D_FAILED,
1519                  * there is only one way out: to D_DISKLESS,
1520                  * and that may only happen after our put_ldev below. */
1521                 if (mdev->state.disk != D_FAILED)
1522                         dev_err(DEV,
1523                                 "ASSERT FAILED: disk is %s during detach\n",
1524                                 drbd_disk_str(mdev->state.disk));
1525
1526                 if (drbd_send_state(mdev))
1527                         dev_warn(DEV, "Notified peer that I am detaching my disk\n");
1528                 else
1529                         dev_err(DEV, "Sending state for detaching disk failed\n");
1530
1531                 drbd_rs_cancel_all(mdev);
1532
1533                 /* In case we want to get something to stable storage still,
1534                  * this may be the last chance.
1535                  * Following put_ldev may transition to D_DISKLESS. */
1536                 drbd_md_sync(mdev);
1537                 put_ldev(mdev);
1538
1539                 if (was_io_error && eh == EP_CALL_HELPER)
1540                         drbd_khelper(mdev, "local-io-error");
1541         }
1542
1543         /* second half of local IO error, failure to attach,
1544          * or administrative detach,
1545          * after local_cnt references have reached zero again */
1546         if (os.disk != D_DISKLESS && ns.disk == D_DISKLESS) {
1547                 /* We must still be diskless,
1548                  * re-attach has to be serialized with this! */
1549                 if (mdev->state.disk != D_DISKLESS)
1550                         dev_err(DEV,
1551                                 "ASSERT FAILED: disk is %s while going diskless\n",
1552                                 drbd_disk_str(mdev->state.disk));
1553
1554                 mdev->rs_total = 0;
1555                 mdev->rs_failed = 0;
1556                 atomic_set(&mdev->rs_pending_cnt, 0);
1557
1558                 if (drbd_send_state(mdev))
1559                         dev_warn(DEV, "Notified peer that I'm now diskless.\n");
1560                 else
1561                         dev_err(DEV, "Sending state for being diskless failed\n");
1562                 /* corresponding get_ldev in __drbd_set_state
1563                  * this may finaly trigger drbd_ldev_destroy. */
1564                 put_ldev(mdev);
1565         }
1566
1567         /* Disks got bigger while they were detached */
1568         if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1569             test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1570                 if (ns.conn == C_CONNECTED)
1571                         resync_after_online_grow(mdev);
1572         }
1573
1574         /* A resync finished or aborted, wake paused devices... */
1575         if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1576             (os.peer_isp && !ns.peer_isp) ||
1577             (os.user_isp && !ns.user_isp))
1578                 resume_next_sg(mdev);
1579
1580         /* sync target done with resync.  Explicitly notify peer, even though
1581          * it should (at least for non-empty resyncs) already know itself. */
1582         if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
1583                 drbd_send_state(mdev);
1584
1585         if (os.conn > C_CONNECTED && ns.conn <= C_CONNECTED)
1586                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
1587
1588         /* free tl_hash if we Got thawed and are C_STANDALONE */
1589         if (ns.conn == C_STANDALONE && !is_susp(ns) && mdev->tl_hash)
1590                 drbd_free_tl_hash(mdev);
1591
1592         /* Upon network connection, we need to start the receiver */
1593         if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1594                 drbd_thread_start(&mdev->receiver);
1595
1596         /* Terminate worker thread if we are unconfigured - it will be
1597            restarted as needed... */
1598         if (ns.disk == D_DISKLESS &&
1599             ns.conn == C_STANDALONE &&
1600             ns.role == R_SECONDARY) {
1601                 if (os.aftr_isp != ns.aftr_isp)
1602                         resume_next_sg(mdev);
1603                 /* set in __drbd_set_state, unless CONFIG_PENDING was set */
1604                 if (test_bit(DEVICE_DYING, &mdev->flags))
1605                         drbd_thread_stop_nowait(&mdev->worker);
1606         }
1607
1608         drbd_md_sync(mdev);
1609 }
1610
1611
1612 static int drbd_thread_setup(void *arg)
1613 {
1614         struct drbd_thread *thi = (struct drbd_thread *) arg;
1615         struct drbd_conf *mdev = thi->mdev;
1616         unsigned long flags;
1617         int retval;
1618
1619 restart:
1620         retval = thi->function(thi);
1621
1622         spin_lock_irqsave(&thi->t_lock, flags);
1623
1624         /* if the receiver has been "Exiting", the last thing it did
1625          * was set the conn state to "StandAlone",
1626          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1627          * and receiver thread will be "started".
1628          * drbd_thread_start needs to set "Restarting" in that case.
1629          * t_state check and assignment needs to be within the same spinlock,
1630          * so either thread_start sees Exiting, and can remap to Restarting,
1631          * or thread_start see None, and can proceed as normal.
1632          */
1633
1634         if (thi->t_state == Restarting) {
1635                 dev_info(DEV, "Restarting %s\n", current->comm);
1636                 thi->t_state = Running;
1637                 spin_unlock_irqrestore(&thi->t_lock, flags);
1638                 goto restart;
1639         }
1640
1641         thi->task = NULL;
1642         thi->t_state = None;
1643         smp_mb();
1644         complete(&thi->stop);
1645         spin_unlock_irqrestore(&thi->t_lock, flags);
1646
1647         dev_info(DEV, "Terminating %s\n", current->comm);
1648
1649         /* Release mod reference taken when thread was started */
1650         module_put(THIS_MODULE);
1651         return retval;
1652 }
1653
1654 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1655                       int (*func) (struct drbd_thread *))
1656 {
1657         spin_lock_init(&thi->t_lock);
1658         thi->task    = NULL;
1659         thi->t_state = None;
1660         thi->function = func;
1661         thi->mdev = mdev;
1662 }
1663
1664 int drbd_thread_start(struct drbd_thread *thi)
1665 {
1666         struct drbd_conf *mdev = thi->mdev;
1667         struct task_struct *nt;
1668         unsigned long flags;
1669
1670         const char *me =
1671                 thi == &mdev->receiver ? "receiver" :
1672                 thi == &mdev->asender  ? "asender"  :
1673                 thi == &mdev->worker   ? "worker"   : "NONSENSE";
1674
1675         /* is used from state engine doing drbd_thread_stop_nowait,
1676          * while holding the req lock irqsave */
1677         spin_lock_irqsave(&thi->t_lock, flags);
1678
1679         switch (thi->t_state) {
1680         case None:
1681                 dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1682                                 me, current->comm, current->pid);
1683
1684                 /* Get ref on module for thread - this is released when thread exits */
1685                 if (!try_module_get(THIS_MODULE)) {
1686                         dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1687                         spin_unlock_irqrestore(&thi->t_lock, flags);
1688                         return false;
1689                 }
1690
1691                 init_completion(&thi->stop);
1692                 D_ASSERT(thi->task == NULL);
1693                 thi->reset_cpu_mask = 1;
1694                 thi->t_state = Running;
1695                 spin_unlock_irqrestore(&thi->t_lock, flags);
1696                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1697
1698                 nt = kthread_create(drbd_thread_setup, (void *) thi,
1699                                     "drbd%d_%s", mdev_to_minor(mdev), me);
1700
1701                 if (IS_ERR(nt)) {
1702                         dev_err(DEV, "Couldn't start thread\n");
1703
1704                         module_put(THIS_MODULE);
1705                         return false;
1706                 }
1707                 spin_lock_irqsave(&thi->t_lock, flags);
1708                 thi->task = nt;
1709                 thi->t_state = Running;
1710                 spin_unlock_irqrestore(&thi->t_lock, flags);
1711                 wake_up_process(nt);
1712                 break;
1713         case Exiting:
1714                 thi->t_state = Restarting;
1715                 dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1716                                 me, current->comm, current->pid);
1717                 /* fall through */
1718         case Running:
1719         case Restarting:
1720         default:
1721                 spin_unlock_irqrestore(&thi->t_lock, flags);
1722                 break;
1723         }
1724
1725         return true;
1726 }
1727
1728
1729 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1730 {
1731         unsigned long flags;
1732
1733         enum drbd_thread_state ns = restart ? Restarting : Exiting;
1734
1735         /* may be called from state engine, holding the req lock irqsave */
1736         spin_lock_irqsave(&thi->t_lock, flags);
1737
1738         if (thi->t_state == None) {
1739                 spin_unlock_irqrestore(&thi->t_lock, flags);
1740                 if (restart)
1741                         drbd_thread_start(thi);
1742                 return;
1743         }
1744
1745         if (thi->t_state != ns) {
1746                 if (thi->task == NULL) {
1747                         spin_unlock_irqrestore(&thi->t_lock, flags);
1748                         return;
1749                 }
1750
1751                 thi->t_state = ns;
1752                 smp_mb();
1753                 init_completion(&thi->stop);
1754                 if (thi->task != current)
1755                         force_sig(DRBD_SIGKILL, thi->task);
1756
1757         }
1758
1759         spin_unlock_irqrestore(&thi->t_lock, flags);
1760
1761         if (wait)
1762                 wait_for_completion(&thi->stop);
1763 }
1764
1765 #ifdef CONFIG_SMP
1766 /**
1767  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1768  * @mdev:       DRBD device.
1769  *
1770  * Forces all threads of a device onto the same CPU. This is beneficial for
1771  * DRBD's performance. May be overwritten by user's configuration.
1772  */
1773 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1774 {
1775         int ord, cpu;
1776
1777         /* user override. */
1778         if (cpumask_weight(mdev->cpu_mask))
1779                 return;
1780
1781         ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1782         for_each_online_cpu(cpu) {
1783                 if (ord-- == 0) {
1784                         cpumask_set_cpu(cpu, mdev->cpu_mask);
1785                         return;
1786                 }
1787         }
1788         /* should not be reached */
1789         cpumask_setall(mdev->cpu_mask);
1790 }
1791
1792 /**
1793  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1794  * @mdev:       DRBD device.
1795  *
1796  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
1797  * prematurely.
1798  */
1799 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1800 {
1801         struct task_struct *p = current;
1802         struct drbd_thread *thi =
1803                 p == mdev->asender.task  ? &mdev->asender  :
1804                 p == mdev->receiver.task ? &mdev->receiver :
1805                 p == mdev->worker.task   ? &mdev->worker   :
1806                 NULL;
1807         ERR_IF(thi == NULL)
1808                 return;
1809         if (!thi->reset_cpu_mask)
1810                 return;
1811         thi->reset_cpu_mask = 0;
1812         set_cpus_allowed_ptr(p, mdev->cpu_mask);
1813 }
1814 #endif
1815
1816 /* the appropriate socket mutex must be held already */
1817 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1818                           enum drbd_packets cmd, struct p_header80 *h,
1819                           size_t size, unsigned msg_flags)
1820 {
1821         int sent, ok;
1822
1823         ERR_IF(!h) return false;
1824         ERR_IF(!size) return false;
1825
1826         h->magic   = BE_DRBD_MAGIC;
1827         h->command = cpu_to_be16(cmd);
1828         h->length  = cpu_to_be16(size-sizeof(struct p_header80));
1829
1830         sent = drbd_send(mdev, sock, h, size, msg_flags);
1831
1832         ok = (sent == size);
1833         if (!ok)
1834                 dev_err(DEV, "short sent %s size=%d sent=%d\n",
1835                     cmdname(cmd), (int)size, sent);
1836         return ok;
1837 }
1838
1839 /* don't pass the socket. we may only look at it
1840  * when we hold the appropriate socket mutex.
1841  */
1842 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1843                   enum drbd_packets cmd, struct p_header80 *h, size_t size)
1844 {
1845         int ok = 0;
1846         struct socket *sock;
1847
1848         if (use_data_socket) {
1849                 mutex_lock(&mdev->data.mutex);
1850                 sock = mdev->data.socket;
1851         } else {
1852                 mutex_lock(&mdev->meta.mutex);
1853                 sock = mdev->meta.socket;
1854         }
1855
1856         /* drbd_disconnect() could have called drbd_free_sock()
1857          * while we were waiting in down()... */
1858         if (likely(sock != NULL))
1859                 ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1860
1861         if (use_data_socket)
1862                 mutex_unlock(&mdev->data.mutex);
1863         else
1864                 mutex_unlock(&mdev->meta.mutex);
1865         return ok;
1866 }
1867
1868 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1869                    size_t size)
1870 {
1871         struct p_header80 h;
1872         int ok;
1873
1874         h.magic   = BE_DRBD_MAGIC;
1875         h.command = cpu_to_be16(cmd);
1876         h.length  = cpu_to_be16(size);
1877
1878         if (!drbd_get_data_sock(mdev))
1879                 return 0;
1880
1881         ok = (sizeof(h) ==
1882                 drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1883         ok = ok && (size ==
1884                 drbd_send(mdev, mdev->data.socket, data, size, 0));
1885
1886         drbd_put_data_sock(mdev);
1887
1888         return ok;
1889 }
1890
1891 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1892 {
1893         struct p_rs_param_95 *p;
1894         struct socket *sock;
1895         int size, rv;
1896         const int apv = mdev->agreed_pro_version;
1897
1898         size = apv <= 87 ? sizeof(struct p_rs_param)
1899                 : apv == 88 ? sizeof(struct p_rs_param)
1900                         + strlen(mdev->sync_conf.verify_alg) + 1
1901                 : apv <= 94 ? sizeof(struct p_rs_param_89)
1902                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
1903
1904         /* used from admin command context and receiver/worker context.
1905          * to avoid kmalloc, grab the socket right here,
1906          * then use the pre-allocated sbuf there */
1907         mutex_lock(&mdev->data.mutex);
1908         sock = mdev->data.socket;
1909
1910         if (likely(sock != NULL)) {
1911                 enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1912
1913                 p = &mdev->data.sbuf.rs_param_95;
1914
1915                 /* initialize verify_alg and csums_alg */
1916                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1917
1918                 p->rate = cpu_to_be32(sc->rate);
1919                 p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
1920                 p->c_delay_target = cpu_to_be32(sc->c_delay_target);
1921                 p->c_fill_target = cpu_to_be32(sc->c_fill_target);
1922                 p->c_max_rate = cpu_to_be32(sc->c_max_rate);
1923
1924                 if (apv >= 88)
1925                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1926                 if (apv >= 89)
1927                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1928
1929                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1930         } else
1931                 rv = 0; /* not ok */
1932
1933         mutex_unlock(&mdev->data.mutex);
1934
1935         return rv;
1936 }
1937
1938 int drbd_send_protocol(struct drbd_conf *mdev)
1939 {
1940         struct p_protocol *p;
1941         int size, cf, rv;
1942
1943         size = sizeof(struct p_protocol);
1944
1945         if (mdev->agreed_pro_version >= 87)
1946                 size += strlen(mdev->net_conf->integrity_alg) + 1;
1947
1948         /* we must not recurse into our own queue,
1949          * as that is blocked during handshake */
1950         p = kmalloc(size, GFP_NOIO);
1951         if (p == NULL)
1952                 return 0;
1953
1954         p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1955         p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1956         p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1957         p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1958         p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1959
1960         cf = 0;
1961         if (mdev->net_conf->want_lose)
1962                 cf |= CF_WANT_LOSE;
1963         if (mdev->net_conf->dry_run) {
1964                 if (mdev->agreed_pro_version >= 92)
1965                         cf |= CF_DRY_RUN;
1966                 else {
1967                         dev_err(DEV, "--dry-run is not supported by peer");
1968                         kfree(p);
1969                         return 0;
1970                 }
1971         }
1972         p->conn_flags    = cpu_to_be32(cf);
1973
1974         if (mdev->agreed_pro_version >= 87)
1975                 strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1976
1977         rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1978                            (struct p_header80 *)p, size);
1979         kfree(p);
1980         return rv;
1981 }
1982
1983 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1984 {
1985         struct p_uuids p;
1986         int i;
1987
1988         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1989                 return 1;
1990
1991         for (i = UI_CURRENT; i < UI_SIZE; i++)
1992                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1993
1994         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1995         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1996         uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1997         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1998         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1999         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
2000
2001         put_ldev(mdev);
2002
2003         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
2004                              (struct p_header80 *)&p, sizeof(p));
2005 }
2006
2007 int drbd_send_uuids(struct drbd_conf *mdev)
2008 {
2009         return _drbd_send_uuids(mdev, 0);
2010 }
2011
2012 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
2013 {
2014         return _drbd_send_uuids(mdev, 8);
2015 }
2016
2017 int drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
2018 {
2019         struct p_rs_uuid p;
2020         u64 uuid;
2021
2022         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
2023
2024         get_random_bytes(&uuid, sizeof(u64));
2025         drbd_uuid_set(mdev, UI_BITMAP, uuid);
2026         drbd_md_sync(mdev);
2027         p.uuid = cpu_to_be64(uuid);
2028
2029         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
2030                              (struct p_header80 *)&p, sizeof(p));
2031 }
2032
2033 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
2034 {
2035         struct p_sizes p;
2036         sector_t d_size, u_size;
2037         int q_order_type;
2038         int ok;
2039
2040         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2041                 D_ASSERT(mdev->ldev->backing_bdev);
2042                 d_size = drbd_get_max_capacity(mdev->ldev);
2043                 u_size = mdev->ldev->dc.disk_size;
2044                 q_order_type = drbd_queue_order_type(mdev);
2045                 put_ldev(mdev);
2046         } else {
2047                 d_size = 0;
2048                 u_size = 0;
2049                 q_order_type = QUEUE_ORDERED_NONE;
2050         }
2051
2052         p.d_size = cpu_to_be64(d_size);
2053         p.u_size = cpu_to_be64(u_size);
2054         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
2055         p.max_bio_size = cpu_to_be32(queue_max_hw_sectors(mdev->rq_queue) << 9);
2056         p.queue_order_type = cpu_to_be16(q_order_type);
2057         p.dds_flags = cpu_to_be16(flags);
2058
2059         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
2060                            (struct p_header80 *)&p, sizeof(p));
2061         return ok;
2062 }
2063
2064 /**
2065  * drbd_send_state() - Sends the drbd state to the peer
2066  * @mdev:       DRBD device.
2067  */
2068 int drbd_send_state(struct drbd_conf *mdev)
2069 {
2070         struct socket *sock;
2071         struct p_state p;
2072         int ok = 0;
2073
2074         /* Grab state lock so we wont send state if we're in the middle
2075          * of a cluster wide state change on another thread */
2076         drbd_state_lock(mdev);
2077
2078         mutex_lock(&mdev->data.mutex);
2079
2080         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
2081         sock = mdev->data.socket;
2082
2083         if (likely(sock != NULL)) {
2084                 ok = _drbd_send_cmd(mdev, sock, P_STATE,
2085                                     (struct p_header80 *)&p, sizeof(p), 0);
2086         }
2087
2088         mutex_unlock(&mdev->data.mutex);
2089
2090         drbd_state_unlock(mdev);
2091         return ok;
2092 }
2093
2094 int drbd_send_state_req(struct drbd_conf *mdev,
2095         union drbd_state mask, union drbd_state val)
2096 {
2097         struct p_req_state p;
2098
2099         p.mask    = cpu_to_be32(mask.i);
2100         p.val     = cpu_to_be32(val.i);
2101
2102         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
2103                              (struct p_header80 *)&p, sizeof(p));
2104 }
2105
2106 int drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
2107 {
2108         struct p_req_state_reply p;
2109
2110         p.retcode    = cpu_to_be32(retcode);
2111
2112         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
2113                              (struct p_header80 *)&p, sizeof(p));
2114 }
2115
2116 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
2117         struct p_compressed_bm *p,
2118         struct bm_xfer_ctx *c)
2119 {
2120         struct bitstream bs;
2121         unsigned long plain_bits;
2122         unsigned long tmp;
2123         unsigned long rl;
2124         unsigned len;
2125         unsigned toggle;
2126         int bits;
2127
2128         /* may we use this feature? */
2129         if ((mdev->sync_conf.use_rle == 0) ||
2130                 (mdev->agreed_pro_version < 90))
2131                         return 0;
2132
2133         if (c->bit_offset >= c->bm_bits)
2134                 return 0; /* nothing to do. */
2135
2136         /* use at most thus many bytes */
2137         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
2138         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
2139         /* plain bits covered in this code string */
2140         plain_bits = 0;
2141
2142         /* p->encoding & 0x80 stores whether the first run length is set.
2143          * bit offset is implicit.
2144          * start with toggle == 2 to be able to tell the first iteration */
2145         toggle = 2;
2146
2147         /* see how much plain bits we can stuff into one packet
2148          * using RLE and VLI. */
2149         do {
2150                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
2151                                     : _drbd_bm_find_next(mdev, c->bit_offset);
2152                 if (tmp == -1UL)
2153                         tmp = c->bm_bits;
2154                 rl = tmp - c->bit_offset;
2155
2156                 if (toggle == 2) { /* first iteration */
2157                         if (rl == 0) {
2158                                 /* the first checked bit was set,
2159                                  * store start value, */
2160                                 DCBP_set_start(p, 1);
2161                                 /* but skip encoding of zero run length */
2162                                 toggle = !toggle;
2163                                 continue;
2164                         }
2165                         DCBP_set_start(p, 0);
2166                 }
2167
2168                 /* paranoia: catch zero runlength.
2169                  * can only happen if bitmap is modified while we scan it. */
2170                 if (rl == 0) {
2171                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
2172                             "t:%u bo:%lu\n", toggle, c->bit_offset);
2173                         return -1;
2174                 }
2175
2176                 bits = vli_encode_bits(&bs, rl);
2177                 if (bits == -ENOBUFS) /* buffer full */
2178                         break;
2179                 if (bits <= 0) {
2180                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
2181                         return 0;
2182                 }
2183
2184                 toggle = !toggle;
2185                 plain_bits += rl;
2186                 c->bit_offset = tmp;
2187         } while (c->bit_offset < c->bm_bits);
2188
2189         len = bs.cur.b - p->code + !!bs.cur.bit;
2190
2191         if (plain_bits < (len << 3)) {
2192                 /* incompressible with this method.
2193                  * we need to rewind both word and bit position. */
2194                 c->bit_offset -= plain_bits;
2195                 bm_xfer_ctx_bit_to_word_offset(c);
2196                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2197                 return 0;
2198         }
2199
2200         /* RLE + VLI was able to compress it just fine.
2201          * update c->word_offset. */
2202         bm_xfer_ctx_bit_to_word_offset(c);
2203
2204         /* store pad_bits */
2205         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
2206
2207         return len;
2208 }
2209
2210 /**
2211  * send_bitmap_rle_or_plain
2212  *
2213  * Return 0 when done, 1 when another iteration is needed, and a negative error
2214  * code upon failure.
2215  */
2216 static int
2217 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
2218                          struct p_header80 *h, struct bm_xfer_ctx *c)
2219 {
2220         struct p_compressed_bm *p = (void*)h;
2221         unsigned long num_words;
2222         int len;
2223         int ok;
2224
2225         len = fill_bitmap_rle_bits(mdev, p, c);
2226
2227         if (len < 0)
2228                 return -EIO;
2229
2230         if (len) {
2231                 DCBP_set_code(p, RLE_VLI_Bits);
2232                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
2233                         sizeof(*p) + len, 0);
2234
2235                 c->packets[0]++;
2236                 c->bytes[0] += sizeof(*p) + len;
2237
2238                 if (c->bit_offset >= c->bm_bits)
2239                         len = 0; /* DONE */
2240         } else {
2241                 /* was not compressible.
2242                  * send a buffer full of plain text bits instead. */
2243                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2244                 len = num_words * sizeof(long);
2245                 if (len)
2246                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
2247                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
2248                                    h, sizeof(struct p_header80) + len, 0);
2249                 c->word_offset += num_words;
2250                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2251
2252                 c->packets[1]++;
2253                 c->bytes[1] += sizeof(struct p_header80) + len;
2254
2255                 if (c->bit_offset > c->bm_bits)
2256                         c->bit_offset = c->bm_bits;
2257         }
2258         if (ok) {
2259                 if (len == 0) {
2260                         INFO_bm_xfer_stats(mdev, "send", c);
2261                         return 0;
2262                 } else
2263                         return 1;
2264         }
2265         return -EIO;
2266 }
2267
2268 /* See the comment at receive_bitmap() */
2269 int _drbd_send_bitmap(struct drbd_conf *mdev)
2270 {
2271         struct bm_xfer_ctx c;
2272         struct p_header80 *p;
2273         int err;
2274
2275         ERR_IF(!mdev->bitmap) return false;
2276
2277         /* maybe we should use some per thread scratch page,
2278          * and allocate that during initial device creation? */
2279         p = (struct p_header80 *) __get_free_page(GFP_NOIO);
2280         if (!p) {
2281                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2282                 return false;
2283         }
2284
2285         if (get_ldev(mdev)) {
2286                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2287                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2288                         drbd_bm_set_all(mdev);
2289                         if (drbd_bm_write(mdev)) {
2290                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
2291                                  * but otherwise process as per normal - need to tell other
2292                                  * side that a full resync is required! */
2293                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
2294                         } else {
2295                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2296                                 drbd_md_sync(mdev);
2297                         }
2298                 }
2299                 put_ldev(mdev);
2300         }
2301
2302         c = (struct bm_xfer_ctx) {
2303                 .bm_bits = drbd_bm_bits(mdev),
2304                 .bm_words = drbd_bm_words(mdev),
2305         };
2306
2307         do {
2308                 err = send_bitmap_rle_or_plain(mdev, p, &c);
2309         } while (err > 0);
2310
2311         free_page((unsigned long) p);
2312         return err == 0;
2313 }
2314
/*
 * Send the bitmap to the peer while holding the data socket.
 *
 * Returns 0 on success, 1 if _drbd_send_bitmap() failed, and -1 if the
 * data socket could not be obtained.
 */
int drbd_send_bitmap(struct drbd_conf *mdev)
{
	int failed;

	if (!drbd_get_data_sock(mdev))
		return -1;

	failed = _drbd_send_bitmap(mdev) ? 0 : 1;
	drbd_put_data_sock(mdev);

	return failed;
}
2325
2326 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2327 {
2328         int ok;
2329         struct p_barrier_ack p;
2330
2331         p.barrier  = barrier_nr;
2332         p.set_size = cpu_to_be32(set_size);
2333
2334         if (mdev->state.conn < C_CONNECTED)
2335                 return false;
2336         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2337                         (struct p_header80 *)&p, sizeof(p));
2338         return ok;
2339 }
2340
2341 /**
2342  * _drbd_send_ack() - Sends an ack packet
2343  * @mdev:       DRBD device.
2344  * @cmd:        Packet command code.
2345  * @sector:     sector, needs to be in big endian byte order
2346  * @blksize:    size in byte, needs to be in big endian byte order
2347  * @block_id:   Id, big endian byte order
2348  */
2349 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2350                           u64 sector,
2351                           u32 blksize,
2352                           u64 block_id)
2353 {
2354         int ok;
2355         struct p_block_ack p;
2356
2357         p.sector   = sector;
2358         p.block_id = block_id;
2359         p.blksize  = blksize;
2360         p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2361
2362         if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2363                 return false;
2364         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2365                                 (struct p_header80 *)&p, sizeof(p));
2366         return ok;
2367 }
2368
2369 /* dp->sector and dp->block_id already/still in network byte order,
2370  * data_size is payload size according to dp->head,
2371  * and may need to be corrected for digest size. */
2372 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2373                      struct p_data *dp, int data_size)
2374 {
2375         data_size -= (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
2376                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
2377         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2378                               dp->block_id);
2379 }
2380
2381 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2382                      struct p_block_req *rp)
2383 {
2384         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2385 }
2386
2387 /**
2388  * drbd_send_ack() - Sends an ack packet
2389  * @mdev:       DRBD device.
2390  * @cmd:        Packet command code.
2391  * @e:          Epoch entry.
2392  */
2393 int drbd_send_ack(struct drbd_conf *mdev,
2394         enum drbd_packets cmd, struct drbd_epoch_entry *e)
2395 {
2396         return _drbd_send_ack(mdev, cmd,
2397                               cpu_to_be64(e->sector),
2398                               cpu_to_be32(e->size),
2399                               e->block_id);
2400 }
2401
2402 /* This function misuses the block_id field to signal if the blocks
2403  * are is sync or not. */
2404 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2405                      sector_t sector, int blksize, u64 block_id)
2406 {
2407         return _drbd_send_ack(mdev, cmd,
2408                               cpu_to_be64(sector),
2409                               cpu_to_be32(blksize),
2410                               cpu_to_be64(block_id));
2411 }
2412
2413 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2414                        sector_t sector, int size, u64 block_id)
2415 {
2416         int ok;
2417         struct p_block_req p;
2418
2419         p.sector   = cpu_to_be64(sector);
2420         p.block_id = block_id;
2421         p.blksize  = cpu_to_be32(size);
2422
2423         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2424                                 (struct p_header80 *)&p, sizeof(p));
2425         return ok;
2426 }
2427
2428 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2429                             sector_t sector, int size,
2430                             void *digest, int digest_size,
2431                             enum drbd_packets cmd)
2432 {
2433         int ok;
2434         struct p_block_req p;
2435
2436         p.sector   = cpu_to_be64(sector);
2437         p.block_id = BE_DRBD_MAGIC + 0xbeef;
2438         p.blksize  = cpu_to_be32(size);
2439
2440         p.head.magic   = BE_DRBD_MAGIC;
2441         p.head.command = cpu_to_be16(cmd);
2442         p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + digest_size);
2443
2444         mutex_lock(&mdev->data.mutex);
2445
2446         ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2447         ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2448
2449         mutex_unlock(&mdev->data.mutex);
2450
2451         return ok;
2452 }
2453
2454 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2455 {
2456         int ok;
2457         struct p_block_req p;
2458
2459         p.sector   = cpu_to_be64(sector);
2460         p.block_id = BE_DRBD_MAGIC + 0xbabe;
2461         p.blksize  = cpu_to_be32(size);
2462
2463         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2464                            (struct p_header80 *)&p, sizeof(p));
2465         return ok;
2466 }
2467
2468 /* called on sndtimeo
2469  * returns false if we should retry,
2470  * true if we think connection is dead
2471  */
2472 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2473 {
2474         int drop_it;
2475         /* long elapsed = (long)(jiffies - mdev->last_received); */
2476
2477         drop_it =   mdev->meta.socket == sock
2478                 || !mdev->asender.task
2479                 || get_t_state(&mdev->asender) != Running
2480                 || mdev->state.conn < C_CONNECTED;
2481
2482         if (drop_it)
2483                 return true;
2484
2485         drop_it = !--mdev->ko_count;
2486         if (!drop_it) {
2487                 dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2488                        current->comm, current->pid, mdev->ko_count);
2489                 request_ping(mdev);
2490         }
2491
2492         return drop_it; /* && (mdev->state == R_PRIMARY) */;
2493 }
2494
2495 /* The idea of sendpage seems to be to put some kind of reference
2496  * to the page into the skb, and to hand it over to the NIC. In
2497  * this process get_page() gets called.
2498  *
2499  * As soon as the page was really sent over the network put_page()
2500  * gets called by some part of the network layer. [ NIC driver? ]
2501  *
2502  * [ get_page() / put_page() increment/decrement the count. If count
2503  *   reaches 0 the page will be freed. ]
2504  *
2505  * This works nicely with pages from FSs.
2506  * But this means that in protocol A we might signal IO completion too early!
2507  *
2508  * In order not to corrupt data during a resync we must make sure
2509  * that we do not reuse our own buffer pages (EEs) to early, therefore
2510  * we have the net_ee list.
2511  *
2512  * XFS seems to have problems, still, it submits pages with page_count == 0!
2513  * As a workaround, we disable sendpage on pages
2514  * with page_count == 0 or PageSlab.
2515  */
2516 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2517                    int offset, size_t size, unsigned msg_flags)
2518 {
2519         int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2520         kunmap(page);
2521         if (sent == size)
2522                 mdev->send_cnt += size>>9;
2523         return sent == size;
2524 }
2525
2526 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2527                     int offset, size_t size, unsigned msg_flags)
2528 {
2529         mm_segment_t oldfs = get_fs();
2530         int sent, ok;
2531         int len = size;
2532
2533         /* e.g. XFS meta- & log-data is in slab pages, which have a
2534          * page_count of 0 and/or have PageSlab() set.
2535          * we cannot use send_page for those, as that does get_page();
2536          * put_page(); and would cause either a VM_BUG directly, or
2537          * __page_cache_release a page that would actually still be referenced
2538          * by someone, leading to some obscure delayed Oops somewhere else. */
2539         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2540                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2541
2542         msg_flags |= MSG_NOSIGNAL;
2543         drbd_update_congested(mdev);
2544         set_fs(KERNEL_DS);
2545         do {
2546                 sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2547                                                         offset, len,
2548                                                         msg_flags);
2549                 if (sent == -EAGAIN) {
2550                         if (we_should_drop_the_connection(mdev,
2551                                                           mdev->data.socket))
2552                                 break;
2553                         else
2554                                 continue;
2555                 }
2556                 if (sent <= 0) {
2557                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2558                              __func__, (int)size, len, sent);
2559                         break;
2560                 }
2561                 len    -= sent;
2562                 offset += sent;
2563         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2564         set_fs(oldfs);
2565         clear_bit(NET_CONGESTED, &mdev->flags);
2566
2567         ok = (len == 0);
2568         if (likely(ok))
2569                 mdev->send_cnt += size>>9;
2570         return ok;
2571 }
2572
2573 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2574 {
2575         struct bio_vec *bvec;
2576         int i;
2577         /* hint all but last page with MSG_MORE */
2578         __bio_for_each_segment(bvec, bio, i, 0) {
2579                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
2580                                      bvec->bv_offset, bvec->bv_len,
2581                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2582                         return 0;
2583         }
2584         return 1;
2585 }
2586
2587 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2588 {
2589         struct bio_vec *bvec;
2590         int i;
2591         /* hint all but last page with MSG_MORE */
2592         __bio_for_each_segment(bvec, bio, i, 0) {
2593                 if (!_drbd_send_page(mdev, bvec->bv_page,
2594                                      bvec->bv_offset, bvec->bv_len,
2595                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2596                         return 0;
2597         }
2598         return 1;
2599 }
2600
2601 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2602 {
2603         struct page *page = e->pages;
2604         unsigned len = e->size;
2605         /* hint all but last page with MSG_MORE */
2606         page_chain_for_each(page) {
2607                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
2608                 if (!_drbd_send_page(mdev, page, 0, l,
2609                                 page_chain_next(page) ? MSG_MORE : 0))
2610                         return 0;
2611                 len -= l;
2612         }
2613         return 1;
2614 }
2615
2616 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
2617 {
2618         if (mdev->agreed_pro_version >= 95)
2619                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
2620                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
2621                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
2622                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
2623         else
2624                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
2625 }
2626
2627 /* Used to send write requests
2628  * R_PRIMARY -> Peer    (P_DATA)
2629  */
2630 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2631 {
2632         int ok = 1;
2633         struct p_data p;
2634         unsigned int dp_flags = 0;
2635         void *dgb;
2636         int dgs;
2637
2638         if (!drbd_get_data_sock(mdev))
2639                 return 0;
2640
2641         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2642                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2643
2644         if (req->size <= DRBD_MAX_SIZE_H80_PACKET) {
2645                 p.head.h80.magic   = BE_DRBD_MAGIC;
2646                 p.head.h80.command = cpu_to_be16(P_DATA);
2647                 p.head.h80.length  =
2648                         cpu_to_be16(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2649         } else {
2650                 p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2651                 p.head.h95.command = cpu_to_be16(P_DATA);
2652                 p.head.h95.length  =
2653                         cpu_to_be32(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2654         }
2655
2656         p.sector   = cpu_to_be64(req->sector);
2657         p.block_id = (unsigned long)req;
2658         p.seq_num  = cpu_to_be32(req->seq_num =
2659                                  atomic_add_return(1, &mdev->packet_seq));
2660
2661         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
2662
2663         if (mdev->state.conn >= C_SYNC_SOURCE &&
2664             mdev->state.conn <= C_PAUSED_SYNC_T)
2665                 dp_flags |= DP_MAY_SET_IN_SYNC;
2666
2667         p.dp_flags = cpu_to_be32(dp_flags);
2668         set_bit(UNPLUG_REMOTE, &mdev->flags);
2669         ok = (sizeof(p) ==
2670                 drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2671         if (ok && dgs) {
2672                 dgb = mdev->int_dig_out;
2673                 drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2674                 ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2675         }
2676         if (ok) {
2677                 /* For protocol A, we have to memcpy the payload into
2678                  * socket buffers, as we may complete right away
2679                  * as soon as we handed it over to tcp, at which point the data
2680                  * pages may become invalid.
2681                  *
2682                  * For data-integrity enabled, we copy it as well, so we can be
2683                  * sure that even if the bio pages may still be modified, it
2684                  * won't change the data on the wire, thus if the digest checks
2685                  * out ok after sending on this side, but does not fit on the
2686                  * receiving side, we sure have detected corruption elsewhere.
2687                  */
2688                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A || dgs)
2689                         ok = _drbd_send_bio(mdev, req->master_bio);
2690                 else
2691                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
2692
2693                 /* double check digest, sometimes buffers have been modified in flight. */
2694                 if (dgs > 0 && dgs <= 64) {
2695                         /* 64 byte, 512 bit, is the larges digest size
2696                          * currently supported in kernel crypto. */
2697                         unsigned char digest[64];
2698                         drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, digest);
2699                         if (memcmp(mdev->int_dig_out, digest, dgs)) {
2700                                 dev_warn(DEV,
2701                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
2702                                         (unsigned long long)req->sector, req->size);
2703                         }
2704                 } /* else if (dgs > 64) {
2705                      ... Be noisy about digest too large ...
2706                 } */
2707         }
2708
2709         drbd_put_data_sock(mdev);
2710
2711         return ok;
2712 }
2713
2714 /* answer packet, used to send data back for read requests:
2715  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2716  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2717  */
2718 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2719                     struct drbd_epoch_entry *e)
2720 {
2721         int ok;
2722         struct p_data p;
2723         void *dgb;
2724         int dgs;
2725
2726         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2727                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2728
2729         if (e->size <= DRBD_MAX_SIZE_H80_PACKET) {
2730                 p.head.h80.magic   = BE_DRBD_MAGIC;
2731                 p.head.h80.command = cpu_to_be16(cmd);
2732                 p.head.h80.length  =
2733                         cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2734         } else {
2735                 p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2736                 p.head.h95.command = cpu_to_be16(cmd);
2737                 p.head.h95.length  =
2738                         cpu_to_be32(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2739         }
2740
2741         p.sector   = cpu_to_be64(e->sector);
2742         p.block_id = e->block_id;
2743         /* p.seq_num  = 0;    No sequence numbers here.. */
2744
2745         /* Only called by our kernel thread.
2746          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2747          * in response to admin command or module unload.
2748          */
2749         if (!drbd_get_data_sock(mdev))
2750                 return 0;
2751
2752         ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
2753         if (ok && dgs) {
2754                 dgb = mdev->int_dig_out;
2755                 drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2756                 ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2757         }
2758         if (ok)
2759                 ok = _drbd_send_zc_ee(mdev, e);
2760
2761         drbd_put_data_sock(mdev);
2762
2763         return ok;
2764 }
2765
2766 int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
2767 {
2768         struct p_block_desc p;
2769
2770         p.sector  = cpu_to_be64(req->sector);
2771         p.blksize = cpu_to_be32(req->size);
2772
2773         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OUT_OF_SYNC, &p.head, sizeof(p));
2774 }
2775
2776 /*
2777   drbd_send distinguishes two cases:
2778
2779   Packets sent via the data socket "sock"
2780   and packets sent via the meta data socket "msock"
2781
2782                     sock                      msock
2783   -----------------+-------------------------+------------------------------
2784   timeout           conf.timeout / 2          conf.timeout / 2
2785   timeout action    send a ping via msock     Abort communication
2786                                               and close all sockets
2787 */
2788
2789 /*
2790  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2791  */
2792 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2793               void *buf, size_t size, unsigned msg_flags)
2794 {
2795         struct kvec iov;
2796         struct msghdr msg;
2797         int rv, sent = 0;
2798
2799         if (!sock)
2800                 return -1000;
2801
2802         /* THINK  if (signal_pending) return ... ? */
2803
2804         iov.iov_base = buf;
2805         iov.iov_len  = size;
2806
2807         msg.msg_name       = NULL;
2808         msg.msg_namelen    = 0;
2809         msg.msg_control    = NULL;
2810         msg.msg_controllen = 0;
2811         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2812
2813         if (sock == mdev->data.socket) {
2814                 mdev->ko_count = mdev->net_conf->ko_count;
2815                 drbd_update_congested(mdev);
2816         }
2817         do {
2818                 /* STRANGE
2819                  * tcp_sendmsg does _not_ use its size parameter at all ?
2820                  *
2821                  * -EAGAIN on timeout, -EINTR on signal.
2822                  */
2823 /* THINK
2824  * do we need to block DRBD_SIG if sock == &meta.socket ??
2825  * otherwise wake_asender() might interrupt some send_*Ack !
2826  */
2827                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2828                 if (rv == -EAGAIN) {
2829                         if (we_should_drop_the_connection(mdev, sock))
2830                                 break;
2831                         else
2832                                 continue;
2833                 }
2834                 D_ASSERT(rv != 0);
2835                 if (rv == -EINTR) {
2836                         flush_signals(current);
2837                         rv = 0;
2838                 }
2839                 if (rv < 0)
2840                         break;
2841                 sent += rv;
2842                 iov.iov_base += rv;
2843                 iov.iov_len  -= rv;
2844         } while (sent < size);
2845
2846         if (sock == mdev->data.socket)
2847                 clear_bit(NET_CONGESTED, &mdev->flags);
2848
2849         if (rv <= 0) {
2850                 if (rv != -EAGAIN) {
2851                         dev_err(DEV, "%s_sendmsg returned %d\n",
2852                             sock == mdev->meta.socket ? "msock" : "sock",
2853                             rv);
2854                         drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2855                 } else
2856                         drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2857         }
2858
2859         return sent;
2860 }
2861
2862 static int drbd_open(struct block_device *bdev, fmode_t mode)
2863 {
2864         struct drbd_conf *mdev = bdev->bd_disk->private_data;
2865         unsigned long flags;
2866         int rv = 0;
2867
2868         mutex_lock(&drbd_main_mutex);
2869         spin_lock_irqsave(&mdev->req_lock, flags);
2870         /* to have a stable mdev->state.role
2871          * and no race with updating open_cnt */
2872
2873         if (mdev->state.role != R_PRIMARY) {
2874                 if (mode & FMODE_WRITE)
2875                         rv = -EROFS;
2876                 else if (!allow_oos)
2877                         rv = -EMEDIUMTYPE;
2878         }
2879
2880         if (!rv)
2881                 mdev->open_cnt++;
2882         spin_unlock_irqrestore(&mdev->req_lock, flags);
2883         mutex_unlock(&drbd_main_mutex);
2884
2885         return rv;
2886 }
2887
2888 static int drbd_release(struct gendisk *gd, fmode_t mode)
2889 {
2890         struct drbd_conf *mdev = gd->private_data;
2891         mutex_lock(&drbd_main_mutex);
2892         mdev->open_cnt--;
2893         mutex_unlock(&drbd_main_mutex);
2894         return 0;
2895 }
2896
2897 static void drbd_set_defaults(struct drbd_conf *mdev)
2898 {
2899         /* This way we get a compile error when sync_conf grows,
2900            and we forgot to initialize it here */
2901         mdev->sync_conf = (struct syncer_conf) {
2902                 /* .rate = */           DRBD_RATE_DEF,
2903                 /* .after = */          DRBD_AFTER_DEF,
2904                 /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
2905                 /* .verify_alg = */     {}, 0,
2906                 /* .cpu_mask = */       {}, 0,
2907                 /* .csums_alg = */      {}, 0,
2908                 /* .use_rle = */        0,
2909                 /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
2910                 /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
2911                 /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
2912                 /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
2913                 /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
2914                 /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
2915         };
2916
2917         /* Have to use that way, because the layout differs between
2918            big endian and little endian */
2919         mdev->state = (union drbd_state) {
2920                 { .role = R_SECONDARY,
2921                   .peer = R_UNKNOWN,
2922                   .conn = C_STANDALONE,
2923                   .disk = D_DISKLESS,
2924                   .pdsk = D_UNKNOWN,
2925                   .susp = 0,
2926                   .susp_nod = 0,
2927                   .susp_fen = 0
2928                 } };
2929 }
2930
2931 void drbd_init_set_defaults(struct drbd_conf *mdev)
2932 {
2933         /* the memset(,0,) did most of this.
2934          * note: only assignments, no allocation in here */
2935
2936         drbd_set_defaults(mdev);
2937
2938         atomic_set(&mdev->ap_bio_cnt, 0);
2939         atomic_set(&mdev->ap_pending_cnt, 0);
2940         atomic_set(&mdev->rs_pending_cnt, 0);
2941         atomic_set(&mdev->unacked_cnt, 0);
2942         atomic_set(&mdev->local_cnt, 0);
2943         atomic_set(&mdev->net_cnt, 0);
2944         atomic_set(&mdev->packet_seq, 0);
2945         atomic_set(&mdev->pp_in_use, 0);
2946         atomic_set(&mdev->pp_in_use_by_net, 0);
2947         atomic_set(&mdev->rs_sect_in, 0);
2948         atomic_set(&mdev->rs_sect_ev, 0);
2949         atomic_set(&mdev->ap_in_flight, 0);
2950
2951         mutex_init(&mdev->md_io_mutex);
2952         mutex_init(&mdev->data.mutex);
2953         mutex_init(&mdev->meta.mutex);
2954         sema_init(&mdev->data.work.s, 0);
2955         sema_init(&mdev->meta.work.s, 0);
2956         mutex_init(&mdev->state_mutex);
2957
2958         spin_lock_init(&mdev->data.work.q_lock);
2959         spin_lock_init(&mdev->meta.work.q_lock);
2960
2961         spin_lock_init(&mdev->al_lock);
2962         spin_lock_init(&mdev->req_lock);
2963         spin_lock_init(&mdev->peer_seq_lock);
2964         spin_lock_init(&mdev->epoch_lock);
2965
2966         INIT_LIST_HEAD(&mdev->active_ee);
2967         INIT_LIST_HEAD(&mdev->sync_ee);
2968         INIT_LIST_HEAD(&mdev->done_ee);
2969         INIT_LIST_HEAD(&mdev->read_ee);
2970         INIT_LIST_HEAD(&mdev->net_ee);
2971         INIT_LIST_HEAD(&mdev->resync_reads);
2972         INIT_LIST_HEAD(&mdev->data.work.q);
2973         INIT_LIST_HEAD(&mdev->meta.work.q);
2974         INIT_LIST_HEAD(&mdev->resync_work.list);
2975         INIT_LIST_HEAD(&mdev->unplug_work.list);
2976         INIT_LIST_HEAD(&mdev->go_diskless.list);
2977         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2978         INIT_LIST_HEAD(&mdev->start_resync_work.list);
2979         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2980
2981         mdev->resync_work.cb  = w_resync_inactive;
2982         mdev->unplug_work.cb  = w_send_write_hint;
2983         mdev->go_diskless.cb  = w_go_diskless;
2984         mdev->md_sync_work.cb = w_md_sync;
2985         mdev->bm_io_work.w.cb = w_bitmap_io;
2986         init_timer(&mdev->resync_timer);
2987         init_timer(&mdev->md_sync_timer);
2988         mdev->resync_timer.function = resync_timer_fn;
2989         mdev->resync_timer.data = (unsigned long) mdev;
2990         mdev->md_sync_timer.function = md_sync_timer_fn;
2991         mdev->md_sync_timer.data = (unsigned long) mdev;
2992
2993         init_waitqueue_head(&mdev->misc_wait);
2994         init_waitqueue_head(&mdev->state_wait);
2995         init_waitqueue_head(&mdev->net_cnt_wait);
2996         init_waitqueue_head(&mdev->ee_wait);
2997         init_waitqueue_head(&mdev->al_wait);
2998         init_waitqueue_head(&mdev->seq_wait);
2999
3000         drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
3001         drbd_thread_init(mdev, &mdev->worker, drbd_worker);
3002         drbd_thread_init(mdev, &mdev->asender, drbd_asender);
3003
3004         mdev->agreed_pro_version = PRO_VERSION_MAX;
3005         mdev->write_ordering = WO_bdev_flush;
3006         mdev->resync_wenr = LC_FREE;
3007 }
3008
3009 void drbd_mdev_cleanup(struct drbd_conf *mdev)
3010 {
3011         int i;
3012         if (mdev->receiver.t_state != None)
3013                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
3014                                 mdev->receiver.t_state);
3015
3016         /* no need to lock it, I'm the only thread alive */
3017         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
3018                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
3019         mdev->al_writ_cnt  =
3020         mdev->bm_writ_cnt  =
3021         mdev->read_cnt     =
3022         mdev->recv_cnt     =
3023         mdev->send_cnt     =
3024         mdev->writ_cnt     =
3025         mdev->p_size       =
3026         mdev->rs_start     =
3027         mdev->rs_total     =
3028         mdev->rs_failed    = 0;
3029         mdev->rs_last_events = 0;
3030         mdev->rs_last_sect_ev = 0;
3031         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
3032                 mdev->rs_mark_left[i] = 0;
3033                 mdev->rs_mark_time[i] = 0;
3034         }
3035         D_ASSERT(mdev->net_conf == NULL);
3036
3037         drbd_set_my_capacity(mdev, 0);
3038         if (mdev->bitmap) {
3039                 /* maybe never allocated. */
3040                 drbd_bm_resize(mdev, 0, 1);
3041                 drbd_bm_cleanup(mdev);
3042         }
3043
3044         drbd_free_resources(mdev);
3045         clear_bit(AL_SUSPENDED, &mdev->flags);
3046
3047         /*
3048          * currently we drbd_init_ee only on module load, so
3049          * we may do drbd_release_ee only on module unload!
3050          */
3051         D_ASSERT(list_empty(&mdev->active_ee));
3052         D_ASSERT(list_empty(&mdev->sync_ee));
3053         D_ASSERT(list_empty(&mdev->done_ee));
3054         D_ASSERT(list_empty(&mdev->read_ee));
3055         D_ASSERT(list_empty(&mdev->net_ee));
3056         D_ASSERT(list_empty(&mdev->resync_reads));
3057         D_ASSERT(list_empty(&mdev->data.work.q));
3058         D_ASSERT(list_empty(&mdev->meta.work.q));
3059         D_ASSERT(list_empty(&mdev->resync_work.list));
3060         D_ASSERT(list_empty(&mdev->unplug_work.list));
3061         D_ASSERT(list_empty(&mdev->go_diskless.list));
3062
3063         drbd_set_defaults(mdev);
3064 }
3065
3066
3067 static void drbd_destroy_mempools(void)
3068 {
3069         struct page *page;
3070
3071         while (drbd_pp_pool) {
3072                 page = drbd_pp_pool;
3073                 drbd_pp_pool = (struct page *)page_private(page);
3074                 __free_page(page);
3075                 drbd_pp_vacant--;
3076         }
3077
3078         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
3079
3080         if (drbd_ee_mempool)
3081                 mempool_destroy(drbd_ee_mempool);
3082         if (drbd_request_mempool)
3083                 mempool_destroy(drbd_request_mempool);
3084         if (drbd_ee_cache)
3085                 kmem_cache_destroy(drbd_ee_cache);
3086         if (drbd_request_cache)
3087                 kmem_cache_destroy(drbd_request_cache);
3088         if (drbd_bm_ext_cache)
3089                 kmem_cache_destroy(drbd_bm_ext_cache);
3090         if (drbd_al_ext_cache)
3091                 kmem_cache_destroy(drbd_al_ext_cache);
3092
3093         drbd_ee_mempool      = NULL;
3094         drbd_request_mempool = NULL;
3095         drbd_ee_cache        = NULL;
3096         drbd_request_cache   = NULL;
3097         drbd_bm_ext_cache    = NULL;
3098         drbd_al_ext_cache    = NULL;
3099
3100         return;
3101 }
3102
3103 static int drbd_create_mempools(void)
3104 {
3105         struct page *page;
3106         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
3107         int i;
3108
3109         /* prepare our caches and mempools */
3110         drbd_request_mempool = NULL;
3111         drbd_ee_cache        = NULL;
3112         drbd_request_cache   = NULL;
3113         drbd_bm_ext_cache    = NULL;
3114         drbd_al_ext_cache    = NULL;
3115         drbd_pp_pool         = NULL;
3116
3117         /* caches */
3118         drbd_request_cache = kmem_cache_create(
3119                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
3120         if (drbd_request_cache == NULL)
3121                 goto Enomem;
3122
3123         drbd_ee_cache = kmem_cache_create(
3124                 "drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
3125         if (drbd_ee_cache == NULL)
3126                 goto Enomem;
3127
3128         drbd_bm_ext_cache = kmem_cache_create(
3129                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
3130         if (drbd_bm_ext_cache == NULL)
3131                 goto Enomem;
3132
3133         drbd_al_ext_cache = kmem_cache_create(
3134                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
3135         if (drbd_al_ext_cache == NULL)
3136                 goto Enomem;
3137
3138         /* mempools */
3139         drbd_request_mempool = mempool_create(number,
3140                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
3141         if (drbd_request_mempool == NULL)
3142                 goto Enomem;
3143
3144         drbd_ee_mempool = mempool_create(number,
3145                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
3146         if (drbd_ee_mempool == NULL)
3147                 goto Enomem;
3148
3149         /* drbd's page pool */
3150         spin_lock_init(&drbd_pp_lock);
3151
3152         for (i = 0; i < number; i++) {
3153                 page = alloc_page(GFP_HIGHUSER);
3154                 if (!page)
3155                         goto Enomem;
3156                 set_page_private(page, (unsigned long)drbd_pp_pool);
3157                 drbd_pp_pool = page;
3158         }
3159         drbd_pp_vacant = number;
3160
3161         return 0;
3162
3163 Enomem:
3164         drbd_destroy_mempools(); /* in case we allocated some */
3165         return -ENOMEM;
3166 }
3167
3168 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
3169         void *unused)
3170 {
3171         /* just so we have it.  you never know what interesting things we
3172          * might want to do here some day...
3173          */
3174
3175         return NOTIFY_DONE;
3176 }
3177
3178 static struct notifier_block drbd_notifier = {
3179         .notifier_call = drbd_notify_sys,
3180 };
3181
3182 static void drbd_release_ee_lists(struct drbd_conf *mdev)
3183 {
3184         int rr;
3185
3186         rr = drbd_release_ee(mdev, &mdev->active_ee);
3187         if (rr)
3188                 dev_err(DEV, "%d EEs in active list found!\n", rr);
3189
3190         rr = drbd_release_ee(mdev, &mdev->sync_ee);
3191         if (rr)
3192                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
3193
3194         rr = drbd_release_ee(mdev, &mdev->read_ee);
3195         if (rr)
3196                 dev_err(DEV, "%d EEs in read list found!\n", rr);
3197
3198         rr = drbd_release_ee(mdev, &mdev->done_ee);
3199         if (rr)
3200                 dev_err(DEV, "%d EEs in done list found!\n", rr);
3201
3202         rr = drbd_release_ee(mdev, &mdev->net_ee);
3203         if (rr)
3204                 dev_err(DEV, "%d EEs in net list found!\n", rr);
3205 }
3206
3207 /* caution. no locking.
3208  * currently only used from module cleanup code. */
3209 static void drbd_delete_device(unsigned int minor)
3210 {
3211         struct drbd_conf *mdev = minor_to_mdev(minor);
3212
3213         if (!mdev)
3214                 return;
3215
3216         /* paranoia asserts */
3217         if (mdev->open_cnt != 0)
3218                 dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
3219                                 __FILE__ , __LINE__);
3220
3221         ERR_IF (!list_empty(&mdev->data.work.q)) {
3222                 struct list_head *lp;
3223                 list_for_each(lp, &mdev->data.work.q) {
3224                         dev_err(DEV, "lp = %p\n", lp);
3225                 }
3226         };
3227         /* end paranoia asserts */
3228
3229         del_gendisk(mdev->vdisk);
3230
3231         /* cleanup stuff that may have been allocated during
3232          * device (re-)configuration or state changes */
3233
3234         if (mdev->this_bdev)
3235                 bdput(mdev->this_bdev);
3236
3237         drbd_free_resources(mdev);
3238
3239         drbd_release_ee_lists(mdev);
3240
3241         /* should be free'd on disconnect? */
3242         kfree(mdev->ee_hash);
3243         /*
3244         mdev->ee_hash_s = 0;
3245         mdev->ee_hash = NULL;
3246         */
3247
3248         lc_destroy(mdev->act_log);
3249         lc_destroy(mdev->resync);
3250
3251         kfree(mdev->p_uuid);
3252         /* mdev->p_uuid = NULL; */
3253
3254         kfree(mdev->int_dig_out);
3255         kfree(mdev->int_dig_in);
3256         kfree(mdev->int_dig_vv);
3257
3258         /* cleanup the rest that has been
3259          * allocated from drbd_new_device
3260          * and actually free the mdev itself */
3261         drbd_free_mdev(mdev);
3262 }
3263
3264 static void drbd_cleanup(void)
3265 {
3266         unsigned int i;
3267
3268         unregister_reboot_notifier(&drbd_notifier);
3269
3270         /* first remove proc,
3271          * drbdsetup uses it's presence to detect
3272          * whether DRBD is loaded.
3273          * If we would get stuck in proc removal,
3274          * but have netlink already deregistered,
3275          * some drbdsetup commands may wait forever
3276          * for an answer.
3277          */
3278         if (drbd_proc)
3279                 remove_proc_entry("drbd", NULL);
3280
3281         drbd_nl_cleanup();
3282
3283         if (minor_table) {
3284                 i = minor_count;
3285                 while (i--)
3286                         drbd_delete_device(i);
3287                 drbd_destroy_mempools();
3288         }
3289
3290         kfree(minor_table);
3291
3292         unregister_blkdev(DRBD_MAJOR, "drbd");
3293
3294         printk(KERN_INFO "drbd: module cleanup done.\n");
3295 }
3296
3297 /**
3298  * drbd_congested() - Callback for pdflush
3299  * @congested_data:     User data
3300  * @bdi_bits:           Bits pdflush is currently interested in
3301  *
3302  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3303  */
3304 static int drbd_congested(void *congested_data, int bdi_bits)
3305 {
3306         struct drbd_conf *mdev = congested_data;
3307         struct request_queue *q;
3308         char reason = '-';
3309         int r = 0;
3310
3311         if (!may_inc_ap_bio(mdev)) {
3312                 /* DRBD has frozen IO */
3313                 r = bdi_bits;
3314                 reason = 'd';
3315                 goto out;
3316         }
3317
3318         if (get_ldev(mdev)) {
3319                 q = bdev_get_queue(mdev->ldev->backing_bdev);
3320                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
3321                 put_ldev(mdev);
3322                 if (r)
3323                         reason = 'b';
3324         }
3325
3326         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3327                 r |= (1 << BDI_async_congested);
3328                 reason = reason == 'b' ? 'a' : 'n';
3329         }
3330
3331 out:
3332         mdev->congestion_reason = reason;
3333         return r;
3334 }
3335
3336 struct drbd_conf *drbd_new_device(unsigned int minor)
3337 {
3338         struct drbd_conf *mdev;
3339         struct gendisk *disk;
3340         struct request_queue *q;
3341
3342         /* GFP_KERNEL, we are outside of all write-out paths */
3343         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3344         if (!mdev)
3345                 return NULL;
3346         if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3347                 goto out_no_cpumask;
3348
3349         mdev->minor = minor;
3350
3351         drbd_init_set_defaults(mdev);
3352
3353         q = blk_alloc_queue(GFP_KERNEL);
3354         if (!q)
3355                 goto out_no_q;
3356         mdev->rq_queue = q;
3357         q->queuedata   = mdev;
3358
3359         disk = alloc_disk(1);
3360         if (!disk)
3361                 goto out_no_disk;
3362         mdev->vdisk = disk;
3363
3364         set_disk_ro(disk, true);
3365
3366         disk->queue = q;
3367         disk->major = DRBD_MAJOR;
3368         disk->first_minor = minor;
3369         disk->fops = &drbd_ops;
3370         sprintf(disk->disk_name, "drbd%d", minor);
3371         disk->private_data = mdev;
3372
3373         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3374         /* we have no partitions. we contain only ourselves. */
3375         mdev->this_bdev->bd_contains = mdev->this_bdev;
3376
3377         q->backing_dev_info.congested_fn = drbd_congested;
3378         q->backing_dev_info.congested_data = mdev;
3379
3380         blk_queue_make_request(q, drbd_make_request);
3381         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE >> 9);
3382         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3383         blk_queue_merge_bvec(q, drbd_merge_bvec);
3384         q->queue_lock = &mdev->req_lock;
3385
3386         mdev->md_io_page = alloc_page(GFP_KERNEL);
3387         if (!mdev->md_io_page)
3388                 goto out_no_io_page;
3389
3390         if (drbd_bm_init(mdev))
3391                 goto out_no_bitmap;
3392         /* no need to lock access, we are still initializing this minor device. */
3393         if (!tl_init(mdev))
3394                 goto out_no_tl;
3395
3396         mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3397         if (!mdev->app_reads_hash)
3398                 goto out_no_app_reads;
3399
3400         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3401         if (!mdev->current_epoch)
3402                 goto out_no_epoch;
3403
3404         INIT_LIST_HEAD(&mdev->current_epoch->list);
3405         mdev->epochs = 1;
3406
3407         return mdev;
3408
3409 /* out_whatever_else:
3410         kfree(mdev->current_epoch); */
3411 out_no_epoch:
3412         kfree(mdev->app_reads_hash);
3413 out_no_app_reads:
3414         tl_cleanup(mdev);
3415 out_no_tl:
3416         drbd_bm_cleanup(mdev);
3417 out_no_bitmap:
3418         __free_page(mdev->md_io_page);
3419 out_no_io_page:
3420         put_disk(disk);
3421 out_no_disk:
3422         blk_cleanup_queue(q);
3423 out_no_q:
3424         free_cpumask_var(mdev->cpu_mask);
3425 out_no_cpumask:
3426         kfree(mdev);
3427         return NULL;
3428 }
3429
3430 /* counterpart of drbd_new_device.
3431  * last part of drbd_delete_device. */
3432 void drbd_free_mdev(struct drbd_conf *mdev)
3433 {
3434         kfree(mdev->current_epoch);
3435         kfree(mdev->app_reads_hash);
3436         tl_cleanup(mdev);
3437         if (mdev->bitmap) /* should no longer be there. */
3438                 drbd_bm_cleanup(mdev);
3439         __free_page(mdev->md_io_page);
3440         put_disk(mdev->vdisk);
3441         blk_cleanup_queue(mdev->rq_queue);
3442         free_cpumask_var(mdev->cpu_mask);
3443         drbd_free_tl_hash(mdev);
3444         kfree(mdev);
3445 }
3446
3447
3448 int __init drbd_init(void)
3449 {
3450         int err;
3451
3452         if (sizeof(struct p_handshake) != 80) {
3453                 printk(KERN_ERR
3454                        "drbd: never change the size or layout "
3455                        "of the HandShake packet.\n");
3456                 return -EINVAL;
3457         }
3458
3459         if (1 > minor_count || minor_count > 255) {
3460                 printk(KERN_ERR
3461                         "drbd: invalid minor_count (%d)\n", minor_count);
3462 #ifdef MODULE
3463                 return -EINVAL;
3464 #else
3465                 minor_count = 8;
3466 #endif
3467         }
3468
3469         err = drbd_nl_init();
3470         if (err)
3471                 return err;
3472
3473         err = register_blkdev(DRBD_MAJOR, "drbd");
3474         if (err) {
3475                 printk(KERN_ERR
3476                        "drbd: unable to register block device major %d\n",
3477                        DRBD_MAJOR);
3478                 return err;
3479         }
3480
3481         register_reboot_notifier(&drbd_notifier);
3482
3483         /*
3484          * allocate all necessary structs
3485          */
3486         err = -ENOMEM;
3487
3488         init_waitqueue_head(&drbd_pp_wait);
3489
3490         drbd_proc = NULL; /* play safe for drbd_cleanup */
3491         minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3492                                 GFP_KERNEL);
3493         if (!minor_table)
3494                 goto Enomem;
3495
3496         err = drbd_create_mempools();
3497         if (err)
3498                 goto Enomem;
3499
3500         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3501         if (!drbd_proc) {
3502                 printk(KERN_ERR "drbd: unable to register proc file\n");
3503                 goto Enomem;
3504         }
3505
3506         rwlock_init(&global_state_lock);
3507
3508         printk(KERN_INFO "drbd: initialized. "
3509                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3510                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3511         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3512         printk(KERN_INFO "drbd: registered as block device major %d\n",
3513                 DRBD_MAJOR);
3514         printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3515
3516         return 0; /* Success! */
3517
3518 Enomem:
3519         drbd_cleanup();
3520         if (err == -ENOMEM)
3521                 /* currently always the case */
3522                 printk(KERN_ERR "drbd: ran out of memory\n");
3523         else
3524                 printk(KERN_ERR "drbd: initialization failure\n");
3525         return err;
3526 }
3527
3528 void drbd_free_bc(struct drbd_backing_dev *ldev)
3529 {
3530         if (ldev == NULL)
3531                 return;
3532
3533         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3534         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3535
3536         kfree(ldev);
3537 }
3538
3539 void drbd_free_sock(struct drbd_conf *mdev)
3540 {
3541         if (mdev->data.socket) {
3542                 mutex_lock(&mdev->data.mutex);
3543                 kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3544                 sock_release(mdev->data.socket);
3545                 mdev->data.socket = NULL;
3546                 mutex_unlock(&mdev->data.mutex);
3547         }
3548         if (mdev->meta.socket) {
3549                 mutex_lock(&mdev->meta.mutex);
3550                 kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3551                 sock_release(mdev->meta.socket);
3552                 mdev->meta.socket = NULL;
3553                 mutex_unlock(&mdev->meta.mutex);
3554         }
3555 }
3556
3557
3558 void drbd_free_resources(struct drbd_conf *mdev)
3559 {
3560         crypto_free_hash(mdev->csums_tfm);
3561         mdev->csums_tfm = NULL;
3562         crypto_free_hash(mdev->verify_tfm);
3563         mdev->verify_tfm = NULL;
3564         crypto_free_hash(mdev->cram_hmac_tfm);
3565         mdev->cram_hmac_tfm = NULL;
3566         crypto_free_hash(mdev->integrity_w_tfm);
3567         mdev->integrity_w_tfm = NULL;
3568         crypto_free_hash(mdev->integrity_r_tfm);
3569         mdev->integrity_r_tfm = NULL;
3570
3571         drbd_free_sock(mdev);
3572
3573         __no_warn(local,
3574                   drbd_free_bc(mdev->ldev);
3575                   mdev->ldev = NULL;);
3576 }
3577
3578 /* meta data management */
3579
3580 struct meta_data_on_disk {
3581         u64 la_size;           /* last agreed size. */
3582         u64 uuid[UI_SIZE];   /* UUIDs. */
3583         u64 device_uuid;
3584         u64 reserved_u64_1;
3585         u32 flags;             /* MDF */
3586         u32 magic;
3587         u32 md_size_sect;
3588         u32 al_offset;         /* offset to this block */
3589         u32 al_nr_extents;     /* important for restoring the AL */
3590               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3591         u32 bm_offset;         /* offset to the bitmap, from here */
3592         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3593         u32 reserved_u32[4];
3594
3595 } __packed;
3596
3597 /**
3598  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3599  * @mdev:       DRBD device.
3600  */
3601 void drbd_md_sync(struct drbd_conf *mdev)
3602 {
3603         struct meta_data_on_disk *buffer;
3604         sector_t sector;
3605         int i;
3606
3607         del_timer(&mdev->md_sync_timer);
3608         /* timer may be rearmed by drbd_md_mark_dirty() now. */
3609         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3610                 return;
3611
3612         /* We use here D_FAILED and not D_ATTACHING because we try to write
3613          * metadata even if we detach due to a disk failure! */
3614         if (!get_ldev_if_state(mdev, D_FAILED))
3615                 return;
3616
3617         mutex_lock(&mdev->md_io_mutex);
3618         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3619         memset(buffer, 0, 512);
3620
3621         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3622         for (i = UI_CURRENT; i < UI_SIZE; i++)
3623                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3624         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3625         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3626
3627         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3628         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3629         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3630         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3631         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3632
3633         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3634
3635         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3636         sector = mdev->ldev->md.md_offset;
3637
3638         if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3639                 /* this was a try anyways ... */
3640                 dev_err(DEV, "meta data update failed!\n");
3641                 drbd_chk_io_error(mdev, 1, true);
3642         }
3643
3644         /* Update mdev->ldev->md.la_size_sect,
3645          * since we updated it on metadata. */
3646         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3647
3648         mutex_unlock(&mdev->md_io_mutex);
3649         put_ldev(mdev);
3650 }
3651
3652 /**
3653  * drbd_md_read() - Reads in the meta data super block
3654  * @mdev:       DRBD device.
3655  * @bdev:       Device from which the meta data should be read in.
3656  *
3657  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
3658  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3659  */
3660 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3661 {
3662         struct meta_data_on_disk *buffer;
3663         int i, rv = NO_ERROR;
3664
3665         if (!get_ldev_if_state(mdev, D_ATTACHING))
3666                 return ERR_IO_MD_DISK;
3667
3668         mutex_lock(&mdev->md_io_mutex);
3669         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3670
3671         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3672                 /* NOTE: cant do normal error processing here as this is
3673                    called BEFORE disk is attached */
3674                 dev_err(DEV, "Error while reading metadata.\n");
3675                 rv = ERR_IO_MD_DISK;
3676                 goto err;
3677         }
3678
3679         if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3680                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
3681                 rv = ERR_MD_INVALID;
3682                 goto err;
3683         }
3684         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3685                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3686                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3687                 rv = ERR_MD_INVALID;
3688                 goto err;
3689         }
3690         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3691                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3692                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3693                 rv = ERR_MD_INVALID;
3694                 goto err;
3695         }
3696         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3697                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3698                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3699                 rv = ERR_MD_INVALID;
3700                 goto err;
3701         }
3702
3703         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3704                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3705                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3706                 rv = ERR_MD_INVALID;
3707                 goto err;
3708         }
3709
3710         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3711         for (i = UI_CURRENT; i < UI_SIZE; i++)
3712                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3713         bdev->md.flags = be32_to_cpu(buffer->flags);
3714         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3715         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3716
3717         if (mdev->sync_conf.al_extents < 7)
3718                 mdev->sync_conf.al_extents = 127;
3719
3720  err:
3721         mutex_unlock(&mdev->md_io_mutex);
3722         put_ldev(mdev);
3723
3724         return rv;
3725 }
3726
3727 static void debug_drbd_uuid(struct drbd_conf *mdev, enum drbd_uuid_index index)
3728 {
3729         static char *uuid_str[UI_EXTENDED_SIZE] = {
3730                 [UI_CURRENT] = "CURRENT",
3731                 [UI_BITMAP] = "BITMAP",
3732                 [UI_HISTORY_START] = "HISTORY_START",
3733                 [UI_HISTORY_END] = "HISTORY_END",
3734                 [UI_SIZE] = "SIZE",
3735                 [UI_FLAGS] = "FLAGS",
3736         };
3737
3738         if (index >= UI_EXTENDED_SIZE) {
3739                 dev_warn(DEV, " uuid_index >= EXTENDED_SIZE\n");
3740                 return;
3741         }
3742
3743         dynamic_dev_dbg(DEV, " uuid[%s] now %016llX\n",
3744                  uuid_str[index],
3745                  (unsigned long long)mdev->ldev->md.uuid[index]);
3746 }
3747
3748
3749 /**
3750  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3751  * @mdev:       DRBD device.
3752  *
3753  * Call this function if you change anything that should be written to
3754  * the meta-data super block. This function sets MD_DIRTY, and starts a
3755  * timer that ensures that within five seconds you have to call drbd_md_sync().
3756  */
3757 #ifdef DEBUG
3758 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3759 {
3760         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3761                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3762                 mdev->last_md_mark_dirty.line = line;
3763                 mdev->last_md_mark_dirty.func = func;
3764         }
3765 }
3766 #else
3767 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3768 {
3769         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3770                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3771 }
3772 #endif
3773
3774 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3775 {
3776         int i;
3777
3778         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++) {
3779                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3780                 debug_drbd_uuid(mdev, i+1);
3781         }
3782 }
3783
3784 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3785 {
3786         if (idx == UI_CURRENT) {
3787                 if (mdev->state.role == R_PRIMARY)
3788                         val |= 1;
3789                 else
3790                         val &= ~((u64)1);
3791
3792                 drbd_set_ed_uuid(mdev, val);
3793         }
3794
3795         mdev->ldev->md.uuid[idx] = val;
3796         debug_drbd_uuid(mdev, idx);
3797         drbd_md_mark_dirty(mdev);
3798 }
3799
3800
3801 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3802 {
3803         if (mdev->ldev->md.uuid[idx]) {
3804                 drbd_uuid_move_history(mdev);
3805                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3806                 debug_drbd_uuid(mdev, UI_HISTORY_START);
3807         }
3808         _drbd_uuid_set(mdev, idx, val);
3809 }
3810
3811 /**
3812  * drbd_uuid_new_current() - Creates a new current UUID
3813  * @mdev:       DRBD device.
3814  *
3815  * Creates a new current UUID, and rotates the old current UUID into
3816  * the bitmap slot. Causes an incremental resync upon next connect.
3817  */
3818 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3819 {
3820         u64 val;
3821
3822         dev_info(DEV, "Creating new current UUID\n");
3823         D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3824         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3825         debug_drbd_uuid(mdev, UI_BITMAP);
3826
3827         get_random_bytes(&val, sizeof(u64));
3828         _drbd_uuid_set(mdev, UI_CURRENT, val);
3829         /* get it to stable storage _now_ */
3830         drbd_md_sync(mdev);
3831 }
3832
3833 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3834 {
3835         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3836                 return;
3837
3838         if (val == 0) {
3839                 drbd_uuid_move_history(mdev);
3840                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3841                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3842                 debug_drbd_uuid(mdev, UI_HISTORY_START);
3843                 debug_drbd_uuid(mdev, UI_BITMAP);
3844         } else {
3845                 if (mdev->ldev->md.uuid[UI_BITMAP])
3846                         dev_warn(DEV, "bm UUID already set");
3847
3848                 mdev->ldev->md.uuid[UI_BITMAP] = val;
3849                 mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3850
3851                 debug_drbd_uuid(mdev, UI_BITMAP);
3852         }
3853         drbd_md_mark_dirty(mdev);
3854 }
3855
3856 /**
3857  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3858  * @mdev:       DRBD device.
3859  *
3860  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3861  */
3862 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3863 {
3864         int rv = -EIO;
3865
3866         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3867                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3868                 drbd_md_sync(mdev);
3869                 drbd_bm_set_all(mdev);
3870
3871                 rv = drbd_bm_write(mdev);
3872
3873                 if (!rv) {
3874                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3875                         drbd_md_sync(mdev);
3876                 }
3877
3878                 put_ldev(mdev);
3879         }
3880
3881         return rv;
3882 }
3883
3884 /**
3885  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3886  * @mdev:       DRBD device.
3887  *
3888  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3889  */
3890 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3891 {
3892         int rv = -EIO;
3893
3894         drbd_resume_al(mdev);
3895         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3896                 drbd_bm_clear_all(mdev);
3897                 rv = drbd_bm_write(mdev);
3898                 put_ldev(mdev);
3899         }
3900
3901         return rv;
3902 }
3903
3904 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3905 {
3906         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3907         int rv = -EIO;
3908
3909         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3910
3911         if (get_ldev(mdev)) {
3912                 drbd_bm_lock(mdev, work->why);
3913                 rv = work->io_fn(mdev);
3914                 drbd_bm_unlock(mdev);
3915                 put_ldev(mdev);
3916         }
3917
3918         clear_bit(BITMAP_IO, &mdev->flags);
3919         smp_mb__after_clear_bit();
3920         wake_up(&mdev->misc_wait);
3921
3922         if (work->done)
3923                 work->done(mdev, rv);
3924
3925         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3926         work->why = NULL;
3927
3928         return 1;
3929 }
3930
3931 void drbd_ldev_destroy(struct drbd_conf *mdev)
3932 {
3933         lc_destroy(mdev->resync);
3934         mdev->resync = NULL;
3935         lc_destroy(mdev->act_log);
3936         mdev->act_log = NULL;
3937         __no_warn(local,
3938                 drbd_free_bc(mdev->ldev);
3939                 mdev->ldev = NULL;);
3940
3941         if (mdev->md_io_tmpp) {
3942                 __free_page(mdev->md_io_tmpp);
3943                 mdev->md_io_tmpp = NULL;
3944         }
3945         clear_bit(GO_DISKLESS, &mdev->flags);
3946 }
3947
3948 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3949 {
3950         D_ASSERT(mdev->state.disk == D_FAILED);
3951         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3952          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3953          * the protected members anymore, though, so once put_ldev reaches zero
3954          * again, it will be safe to free them. */
3955         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3956         return 1;
3957 }
3958
3959 void drbd_go_diskless(struct drbd_conf *mdev)
3960 {
3961         D_ASSERT(mdev->state.disk == D_FAILED);
3962         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3963                 drbd_queue_work(&mdev->data.work, &mdev->go_diskless);
3964 }
3965
3966 /**
3967  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3968  * @mdev:       DRBD device.
3969  * @io_fn:      IO callback to be called when bitmap IO is possible
3970  * @done:       callback to be called after the bitmap IO was performed
3971  * @why:        Descriptive text of the reason for doing the IO
3972  *
3973  * While IO on the bitmap happens we freeze application IO thus we ensure
3974  * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
3975  * called from worker context. It MUST NOT be used while a previous such
3976  * work is still pending!
3977  */
3978 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3979                           int (*io_fn)(struct drbd_conf *),
3980                           void (*done)(struct drbd_conf *, int),
3981                           char *why)
3982 {
3983         D_ASSERT(current == mdev->worker.task);
3984
3985         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3986         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3987         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3988         if (mdev->bm_io_work.why)
3989                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3990                         why, mdev->bm_io_work.why);
3991
3992         mdev->bm_io_work.io_fn = io_fn;
3993         mdev->bm_io_work.done = done;
3994         mdev->bm_io_work.why = why;
3995
3996         spin_lock_irq(&mdev->req_lock);
3997         set_bit(BITMAP_IO, &mdev->flags);
3998         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3999                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
4000                         drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
4001         }
4002         spin_unlock_irq(&mdev->req_lock);
4003 }
4004
4005 /**
4006  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
4007  * @mdev:       DRBD device.
4008  * @io_fn:      IO callback to be called when bitmap IO is possible
4009  * @why:        Descriptive text of the reason for doing the IO
4010  *
4011  * freezes application IO while that the actual IO operations runs. This
4012  * functions MAY NOT be called from worker context.
4013  */
4014 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
4015 {
4016         int rv;
4017
4018         D_ASSERT(current != mdev->worker.task);
4019
4020         drbd_suspend_io(mdev);
4021
4022         drbd_bm_lock(mdev, why);
4023         rv = io_fn(mdev);
4024         drbd_bm_unlock(mdev);
4025
4026         drbd_resume_io(mdev);
4027
4028         return rv;
4029 }
4030
4031 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4032 {
4033         if ((mdev->ldev->md.flags & flag) != flag) {
4034                 drbd_md_mark_dirty(mdev);
4035                 mdev->ldev->md.flags |= flag;
4036         }
4037 }
4038
4039 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4040 {
4041         if ((mdev->ldev->md.flags & flag) != 0) {
4042                 drbd_md_mark_dirty(mdev);
4043                 mdev->ldev->md.flags &= ~flag;
4044         }
4045 }
4046 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
4047 {
4048         return (bdev->md.flags & flag) != 0;
4049 }
4050
4051 static void md_sync_timer_fn(unsigned long data)
4052 {
4053         struct drbd_conf *mdev = (struct drbd_conf *) data;
4054
4055         drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
4056 }
4057
4058 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4059 {
4060         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
4061 #ifdef DEBUG
4062         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
4063                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
4064 #endif
4065         drbd_md_sync(mdev);
4066         return 1;
4067 }
4068
4069 #ifdef CONFIG_DRBD_FAULT_INJECTION
/*
 * Fault insertion support.  The random number generator below was
 * shamelessly stolen from kernel/rcutorture.c.
 */
struct fault_random_state {
	unsigned long state;	/* current generator state */
	unsigned long count;	/* calls left before fresh random bytes are mixed in */
};
4076
/* Constants of the generator: state = state * MULT + ADD, with fresh
 * random bytes mixed in once the REFRESH countdown runs out. */
#define FAULT_RANDOM_MULT	39916801	/* prime */
#define FAULT_RANDOM_ADD	479001701	/* prime */
#define FAULT_RANDOM_REFRESH	10000
4080
4081 /*
4082  * Crude but fast random-number generator.  Uses a linear congruential
4083  * generator, with occasional help from get_random_bytes().
4084  */
4085 static unsigned long
4086 _drbd_fault_random(struct fault_random_state *rsp)
4087 {
4088         long refresh;
4089
4090         if (!rsp->count--) {
4091                 get_random_bytes(&refresh, sizeof(refresh));
4092                 rsp->state += refresh;
4093                 rsp->count = FAULT_RANDOM_REFRESH;
4094         }
4095         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
4096         return swahw32(rsp->state);
4097 }
4098
4099 static char *
4100 _drbd_fault_str(unsigned int type) {
4101         static char *_faults[] = {
4102                 [DRBD_FAULT_MD_WR] = "Meta-data write",
4103                 [DRBD_FAULT_MD_RD] = "Meta-data read",
4104                 [DRBD_FAULT_RS_WR] = "Resync write",
4105                 [DRBD_FAULT_RS_RD] = "Resync read",
4106                 [DRBD_FAULT_DT_WR] = "Data write",
4107                 [DRBD_FAULT_DT_RD] = "Data read",
4108                 [DRBD_FAULT_DT_RA] = "Data read ahead",
4109                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
4110                 [DRBD_FAULT_AL_EE] = "EE allocation",
4111                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
4112         };
4113
4114         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
4115 }
4116
4117 unsigned int
4118 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
4119 {
4120         static struct fault_random_state rrs = {0, 0};
4121
4122         unsigned int ret = (
4123                 (fault_devs == 0 ||
4124                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
4125                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
4126
4127         if (ret) {
4128                 fault_count++;
4129
4130                 if (__ratelimit(&drbd_ratelimit_state))
4131                         dev_warn(DEV, "***Simulating %s failure\n",
4132                                 _drbd_fault_str(type));
4133         }
4134
4135         return ret;
4136 }
4137 #endif
4138
/*
 * drbd_buildtag() - return a string identifying this build
 *
 * The tag is built on first use and kept in a static buffer.  With
 * CONFIG_MODULES and a non-NULL THIS_MODULE it reads
 * "srcversion: <srcversion>"; otherwise it is "built-in".
 *
 * Returns a pointer to the static, NUL-terminated tag.
 */
const char *drbd_buildtag(void)
{
	/* DRBD built from external sources has here a reference to the
	   git hash of the source code. */

	/* The leading NUL marks the tag as "not initialised yet"; the
	 * built-in case only has to patch in the 'b'. */
	static char buildtag[38] = "\0uilt-in";

	if (buildtag[0] == 0) {
#ifdef CONFIG_MODULES
		if (THIS_MODULE != NULL)
			/* Bounded write: an unexpectedly long srcversion is
			 * truncated instead of overrunning the buffer. */
			snprintf(buildtag, sizeof(buildtag),
				 "srcversion: %-24s", THIS_MODULE->srcversion);
		else
#endif
			buildtag[0] = 'b';
	}

	return buildtag;
}
4157
/* Module entry and exit points. */
module_init(drbd_init);
module_exit(drbd_cleanup);

/* String helpers exported to other modules. */
EXPORT_SYMBOL(drbd_conn_str);
EXPORT_SYMBOL(drbd_role_str);
EXPORT_SYMBOL(drbd_disk_str);
EXPORT_SYMBOL(drbd_set_st_err_str);