1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/smp_lock.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 struct after_state_chg_work {
60         struct drbd_work w;
61         union drbd_state os;
62         union drbd_state ns;
63         enum chg_state_flags flags;
64         struct completion *done;
65 };
66
67 int drbdd_init(struct drbd_thread *);
68 int drbd_worker(struct drbd_thread *);
69 int drbd_asender(struct drbd_thread *);
70
71 int drbd_init(void);
72 static int drbd_open(struct block_device *bdev, fmode_t mode);
73 static int drbd_release(struct gendisk *gd, fmode_t mode);
74 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
75 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
76                            union drbd_state ns, enum chg_state_flags flags);
77 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
78 static void md_sync_timer_fn(unsigned long data);
79 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
80
81 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
82               "Lars Ellenberg <lars@linbit.com>");
83 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
84 MODULE_VERSION(REL_VERSION);
85 MODULE_LICENSE("GPL");
86 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
87 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
88
89 #include <linux/moduleparam.h>
90 /* allow_open_on_secondary */
91 MODULE_PARM_DESC(allow_oos, "DONT USE!");
92 /* thanks to these macros, if compiled into the kernel (not-module),
93  * this becomes the boot parameter drbd.minor_count */
94 module_param(minor_count, uint, 0444);
95 module_param(disable_sendpage, bool, 0644);
96 module_param(allow_oos, bool, 0);
97 module_param(cn_idx, uint, 0444);
98 module_param(proc_details, int, 0644);
99
100 #ifdef CONFIG_DRBD_FAULT_INJECTION
101 int enable_faults;
102 int fault_rate;
103 static int fault_count;
104 int fault_devs;
105 /* bitmap of enabled faults */
106 module_param(enable_faults, int, 0664);
107 /* fault rate % value - applies to all enabled faults */
108 module_param(fault_rate, int, 0664);
109 /* count of faults inserted */
110 module_param(fault_count, int, 0664);
111 /* bitmap of devices to insert faults on */
112 module_param(fault_devs, int, 0644);
113 #endif
114
115 /* module parameter, defined */
116 unsigned int minor_count = 32;
117 int disable_sendpage;
118 int allow_oos;
119 unsigned int cn_idx = CN_IDX_DRBD;
120 int proc_details;       /* Detail level in proc drbd*/
121
122 /* Module parameter for setting the user mode helper program
123  * to run. Default is /sbin/drbdadm */
124 char usermode_helper[80] = "/sbin/drbdadm";
125
126 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
127
128 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
129  * as member "struct gendisk *vdisk;"
130  */
131 struct drbd_conf **minor_table;
132
133 struct kmem_cache *drbd_request_cache;
134 struct kmem_cache *drbd_ee_cache;       /* epoch entries */
135 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
136 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
137 mempool_t *drbd_request_mempool;
138 mempool_t *drbd_ee_mempool;
139
140 /* I do not use a standard mempool, because:
141    1) I want to hand out the pre-allocated objects first.
142    2) I want to be able to interrupt sleeping allocation with a signal.
143    Note: This is a singly linked list; the next pointer is the private
144          member of struct page.
145  */
146 struct page *drbd_pp_pool;
147 spinlock_t   drbd_pp_lock;
148 int          drbd_pp_vacant;
149 wait_queue_head_t drbd_pp_wait;
150
151 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
152
153 static const struct block_device_operations drbd_ops = {
154         .owner =   THIS_MODULE,
155         .open =    drbd_open,
156         .release = drbd_release,
157 };
158
159 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
160
161 #ifdef __CHECKER__
162 /* When checking with sparse, and this is an inline function, sparse will
163    give tons of false positives. When this is a real function, sparse works.
164  */
165 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
166 {
167         int io_allowed;
168
169         atomic_inc(&mdev->local_cnt);
170         io_allowed = (mdev->state.disk >= mins);
171         if (!io_allowed) {
172                 if (atomic_dec_and_test(&mdev->local_cnt))
173                         wake_up(&mdev->misc_wait);
174         }
175         return io_allowed;
176 }
177
178 #endif
179
180 /**
181  * DOC: The transfer log
182  *
183  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
184  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
185  * of the list. There is always at least one &struct drbd_tl_epoch object.
186  *
187  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
188  * attached.
189  */
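/* A rough sketch of the structure described above:
 *
 *   oldest_tle -> [tl_epoch] -> [tl_epoch] -> ... -> [tl_epoch] <- newest_tle
 *                      |
 *                      +-- ->requests: circular list of struct drbd_request
 */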
190 static int tl_init(struct drbd_conf *mdev)
191 {
192         struct drbd_tl_epoch *b;
193
194         /* during device minor initialization, we may well use GFP_KERNEL */
195         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
196         if (!b)
197                 return 0;
198         INIT_LIST_HEAD(&b->requests);
199         INIT_LIST_HEAD(&b->w.list);
200         b->next = NULL;
201         b->br_number = 4711;
202         b->n_req = 0;
203         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
204
205         mdev->oldest_tle = b;
206         mdev->newest_tle = b;
207         INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
208
209         mdev->tl_hash = NULL;
210         mdev->tl_hash_s = 0;
211
212         return 1;
213 }
214
215 static void tl_cleanup(struct drbd_conf *mdev)
216 {
217         D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
218         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
219         kfree(mdev->oldest_tle);
220         mdev->oldest_tle = NULL;
221         kfree(mdev->unused_spare_tle);
222         mdev->unused_spare_tle = NULL;
223         kfree(mdev->tl_hash);
224         mdev->tl_hash = NULL;
225         mdev->tl_hash_s = 0;
226 }
227
228 /**
229  * _tl_add_barrier() - Adds a barrier to the transfer log
230  * @mdev:       DRBD device.
231  * @new:        Barrier to be added before the current head of the TL.
232  *
233  * The caller must hold the req_lock.
234  */
235 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
236 {
237         struct drbd_tl_epoch *newest_before;
238
239         INIT_LIST_HEAD(&new->requests);
240         INIT_LIST_HEAD(&new->w.list);
241         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
242         new->next = NULL;
243         new->n_req = 0;
244
245         newest_before = mdev->newest_tle;
246         /* never send a barrier number == 0, because that is special-cased
247          * when using TCQ for our write ordering code */
248         new->br_number = (newest_before->br_number+1) ?: 1;
249         if (mdev->newest_tle != new) {
250                 mdev->newest_tle->next = new;
251                 mdev->newest_tle = new;
252         }
253 }
254
255 /**
256  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
257  * @mdev:       DRBD device.
258  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
259  * @set_size:   Expected number of requests before that barrier.
260  *
261  * In case the passed barrier_nr or set_size does not match the oldest
262  * &struct drbd_tl_epoch object, this function will cause a termination
263  * of the connection.
264  */
265 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
266                        unsigned int set_size)
267 {
268         struct drbd_tl_epoch *b, *nob; /* next old barrier */
269         struct list_head *le, *tle;
270         struct drbd_request *r;
271
272         spin_lock_irq(&mdev->req_lock);
273
274         b = mdev->oldest_tle;
275
276         /* first some paranoia code */
277         if (b == NULL) {
278                 dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
279                         barrier_nr);
280                 goto bail;
281         }
282         if (b->br_number != barrier_nr) {
283                 dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
284                         barrier_nr, b->br_number);
285                 goto bail;
286         }
287         if (b->n_req != set_size) {
288                 dev_err(DEV, "BAD! BarrierAck #%u received with n_req=%u, expected n_req=%u!\n",
289                         barrier_nr, set_size, b->n_req);
290                 goto bail;
291         }
292
293         /* Clean up list of requests processed during current epoch */
294         list_for_each_safe(le, tle, &b->requests) {
295                 r = list_entry(le, struct drbd_request, tl_requests);
296                 _req_mod(r, barrier_acked);
297         }
298         /* There could be requests on the list waiting for completion
299            of the write to the local disk. To avoid corruption of the
300            slab's data structures, we have to remove the list's head.
301
302            Also there could have been a barrier ack out of sequence, overtaking
303            the write acks - which would be a bug and violating write ordering.
304            To not deadlock in case we lose connection while such requests are
305            still pending, we need some way to find them for the
306            _req_mod(connection_lost_while_pending).
307
308            These have been list_move'd to the out_of_sequence_requests list in
309            _req_mod(, barrier_acked) above.
310            */
311         list_del_init(&b->requests);
312
313         nob = b->next;
314         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
315                 _tl_add_barrier(mdev, b);
316                 if (nob)
317                         mdev->oldest_tle = nob;
318                 /* if nob == NULL, b was the only barrier, and becomes the new
319                    barrier. Therefore mdev->oldest_tle already points to b */
320         } else {
321                 D_ASSERT(nob != NULL);
322                 mdev->oldest_tle = nob;
323                 kfree(b);
324         }
325
326         spin_unlock_irq(&mdev->req_lock);
327         dec_ap_pending(mdev);
328
329         return;
330
331 bail:
332         spin_unlock_irq(&mdev->req_lock);
333         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
334 }
335
336
337 /**
338  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
339  * @mdev:       DRBD device.
340  *
341  * This is called after the connection to the peer was lost. The storage covered
342  * by the requests on the transfer log gets marked as out of sync. Called from the
343  * receiver thread and the worker thread.
344  */
345 void tl_clear(struct drbd_conf *mdev)
346 {
347         struct drbd_tl_epoch *b, *tmp;
348         struct list_head *le, *tle;
349         struct drbd_request *r;
350         int new_initial_bnr = net_random();
351
352         spin_lock_irq(&mdev->req_lock);
353
354         b = mdev->oldest_tle;
355         while (b) {
356                 list_for_each_safe(le, tle, &b->requests) {
357                         r = list_entry(le, struct drbd_request, tl_requests);
358                         /* It would be nice to complete outside of spinlock.
359                          * But this is easier for now. */
360                         _req_mod(r, connection_lost_while_pending);
361                 }
362                 tmp = b->next;
363
364                 /* there could still be requests on that ring list,
365                  * in case local io is still pending */
366                 list_del(&b->requests);
367
368                 /* dec_ap_pending corresponding to queue_barrier.
369                  * the newest barrier may not have been queued yet,
370                  * in which case w.cb is still NULL. */
371                 if (b->w.cb != NULL)
372                         dec_ap_pending(mdev);
373
374                 if (b == mdev->newest_tle) {
375                         /* recycle, but reinit! */
376                         D_ASSERT(tmp == NULL);
377                         INIT_LIST_HEAD(&b->requests);
378                         INIT_LIST_HEAD(&b->w.list);
379                         b->w.cb = NULL;
380                         b->br_number = new_initial_bnr;
381                         b->n_req = 0;
382
383                         mdev->oldest_tle = b;
384                         break;
385                 }
386                 kfree(b);
387                 b = tmp;
388         }
389
390         /* we expect this list to be empty. */
391         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
392
393         /* but just in case, clean it up anyways! */
394         list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
395                 r = list_entry(le, struct drbd_request, tl_requests);
396                 /* It would be nice to complete outside of spinlock.
397                  * But this is easier for now. */
398                 _req_mod(r, connection_lost_while_pending);
399         }
400
401         /* ensure bit indicating barrier is required is clear */
402         clear_bit(CREATE_BARRIER, &mdev->flags);
403
404         spin_unlock_irq(&mdev->req_lock);
405 }
406
407 /**
408  * cl_wide_st_chg() - TRUE if the state change is a cluster-wide one
409  * @mdev:       DRBD device.
410  * @os:         old (current) state.
411  * @ns:         new (wanted) state.
412  */
413 static int cl_wide_st_chg(struct drbd_conf *mdev,
414                           union drbd_state os, union drbd_state ns)
415 {
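        /* Cluster-wide changes: promoting to Primary, starting a resync
         * (either direction) or detaching the disk while connected,
         * a graceful disconnect, or starting online verify. */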
416         return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
417                  ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
418                   (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
419                   (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
420                   (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
421                 (os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
422                 (os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
423 }
424
425 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
426                       union drbd_state mask, union drbd_state val)
427 {
428         unsigned long flags;
429         union drbd_state os, ns;
430         int rv;
431
432         spin_lock_irqsave(&mdev->req_lock, flags);
433         os = mdev->state;
434         ns.i = (os.i & ~mask.i) | val.i;
435         rv = _drbd_set_state(mdev, ns, f, NULL);
436         ns = mdev->state;
437         spin_unlock_irqrestore(&mdev->req_lock, flags);
438
439         return rv;
440 }
441
442 /**
443  * drbd_force_state() - Impose a change which happens outside our control on our state
444  * @mdev:       DRBD device.
445  * @mask:       mask of state bits to change.
446  * @val:        value of new state bits.
447  */
448 void drbd_force_state(struct drbd_conf *mdev,
449         union drbd_state mask, union drbd_state val)
450 {
451         drbd_change_state(mdev, CS_HARD, mask, val);
452 }
453
454 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
455 static int is_valid_state_transition(struct drbd_conf *,
456                                      union drbd_state, union drbd_state);
457 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
458                                        union drbd_state ns, int *warn_sync_abort);
459 int drbd_send_state_req(struct drbd_conf *,
460                         union drbd_state, union drbd_state);
461
462 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
463                                     union drbd_state mask, union drbd_state val)
464 {
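        /* wait_event() condition for drbd_req_state(): returning 0 means
         * "keep waiting for the peer's reply"; any other value ends the
         * wait (success, failure, or no cluster-wide change needed). */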
465         union drbd_state os, ns;
466         unsigned long flags;
467         int rv;
468
469         if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
470                 return SS_CW_SUCCESS;
471
472         if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
473                 return SS_CW_FAILED_BY_PEER;
474
475         rv = 0;
476         spin_lock_irqsave(&mdev->req_lock, flags);
477         os = mdev->state;
478         ns.i = (os.i & ~mask.i) | val.i;
479         ns = sanitize_state(mdev, os, ns, NULL);
480
481         if (!cl_wide_st_chg(mdev, os, ns))
482                 rv = SS_CW_NO_NEED;
483         if (!rv) {
484                 rv = is_valid_state(mdev, ns);
485                 if (rv == SS_SUCCESS) {
486                         rv = is_valid_state_transition(mdev, ns, os);
487                         if (rv == SS_SUCCESS)
488                                 rv = 0; /* cont waiting, otherwise fail. */
489                 }
490         }
491         spin_unlock_irqrestore(&mdev->req_lock, flags);
492
493         return rv;
494 }
495
496 /**
497  * drbd_req_state() - Perform a possibly cluster-wide state change
498  * @mdev:       DRBD device.
499  * @mask:       mask of state bits to change.
500  * @val:        value of new state bits.
501  * @f:          flags
502  *
503  * Should not be called directly, use drbd_request_state() or
504  * _drbd_request_state().
505  */
506 static int drbd_req_state(struct drbd_conf *mdev,
507                           union drbd_state mask, union drbd_state val,
508                           enum chg_state_flags f)
509 {
510         struct completion done;
511         unsigned long flags;
512         union drbd_state os, ns;
513         int rv;
514
515         init_completion(&done);
516
517         if (f & CS_SERIALIZE)
518                 mutex_lock(&mdev->state_mutex);
519
520         spin_lock_irqsave(&mdev->req_lock, flags);
521         os = mdev->state;
522         ns.i = (os.i & ~mask.i) | val.i;
523         ns = sanitize_state(mdev, os, ns, NULL);
524
525         if (cl_wide_st_chg(mdev, os, ns)) {
526                 rv = is_valid_state(mdev, ns);
527                 if (rv == SS_SUCCESS)
528                         rv = is_valid_state_transition(mdev, ns, os);
529                 spin_unlock_irqrestore(&mdev->req_lock, flags);
530
531                 if (rv < SS_SUCCESS) {
532                         if (f & CS_VERBOSE)
533                                 print_st_err(mdev, os, ns, rv);
534                         goto abort;
535                 }
536
537                 drbd_state_lock(mdev);
538                 if (!drbd_send_state_req(mdev, mask, val)) {
539                         drbd_state_unlock(mdev);
540                         rv = SS_CW_FAILED_BY_PEER;
541                         if (f & CS_VERBOSE)
542                                 print_st_err(mdev, os, ns, rv);
543                         goto abort;
544                 }
545
546                 wait_event(mdev->state_wait,
547                         (rv = _req_st_cond(mdev, mask, val)));
548
549                 if (rv < SS_SUCCESS) {
550                         drbd_state_unlock(mdev);
551                         if (f & CS_VERBOSE)
552                                 print_st_err(mdev, os, ns, rv);
553                         goto abort;
554                 }
555                 spin_lock_irqsave(&mdev->req_lock, flags);
556                 os = mdev->state;
557                 ns.i = (os.i & ~mask.i) | val.i;
558                 rv = _drbd_set_state(mdev, ns, f, &done);
559                 drbd_state_unlock(mdev);
560         } else {
561                 rv = _drbd_set_state(mdev, ns, f, &done);
562         }
563
564         spin_unlock_irqrestore(&mdev->req_lock, flags);
565
566         if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
567                 D_ASSERT(current != mdev->worker.task);
568                 wait_for_completion(&done);
569         }
570
571 abort:
572         if (f & CS_SERIALIZE)
573                 mutex_unlock(&mdev->state_mutex);
574
575         return rv;
576 }
577
578 /**
579  * _drbd_request_state() - Request a state change (with flags)
580  * @mdev:       DRBD device.
581  * @mask:       mask of state bits to change.
582  * @val:        value of new state bits.
583  * @f:          flags
584  *
585  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
586  * flag, or when logging of failed state change requests is not desired.
587  */
588 int _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
589                         union drbd_state val,   enum chg_state_flags f)
590 {
591         int rv;
592
593         wait_event(mdev->state_wait,
594                    (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
595
596         return rv;
597 }
598
599 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
600 {
601         dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
602             name,
603             drbd_conn_str(ns.conn),
604             drbd_role_str(ns.role),
605             drbd_role_str(ns.peer),
606             drbd_disk_str(ns.disk),
607             drbd_disk_str(ns.pdsk),
608             ns.susp ? 's' : 'r',
609             ns.aftr_isp ? 'a' : '-',
610             ns.peer_isp ? 'p' : '-',
611             ns.user_isp ? 'u' : '-'
612             );
613 }
614
615 void print_st_err(struct drbd_conf *mdev,
616         union drbd_state os, union drbd_state ns, int err)
617 {
618         if (err == SS_IN_TRANSIENT_STATE)
619                 return;
620         dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
621         print_st(mdev, " state", os);
622         print_st(mdev, "wanted", ns);
623 }
624
625
626 #define drbd_peer_str drbd_role_str
627 #define drbd_pdsk_str drbd_disk_str
628
629 #define drbd_susp_str(A)     ((A) ? "1" : "0")
630 #define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
631 #define drbd_peer_isp_str(A) ((A) ? "1" : "0")
632 #define drbd_user_isp_str(A) ((A) ? "1" : "0")
633
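/* PSC(field): if the field differs between os and ns, append
 * "field( old -> new ) " to the buffer behind pbp (used in __drbd_set_state). */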
634 #define PSC(A) \
635         ({ if (ns.A != os.A) { \
636                 pbp += sprintf(pbp, #A "( %s -> %s ) ", \
637                               drbd_##A##_str(os.A), \
638                               drbd_##A##_str(ns.A)); \
639         } })
640
641 /**
642  * is_valid_state() - Returns an SS_ error code if ns is not valid
643  * @mdev:       DRBD device.
644  * @ns:         State to consider.
645  */
646 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
647 {
648         /* See drbd_state_sw_errors in drbd_strings.c */
649
650         enum drbd_fencing_p fp;
651         int rv = SS_SUCCESS;
652
653         fp = FP_DONT_CARE;
654         if (get_ldev(mdev)) {
655                 fp = mdev->ldev->dc.fencing;
656                 put_ldev(mdev);
657         }
658
659         if (get_net_conf(mdev)) {
660                 if (!mdev->net_conf->two_primaries &&
661                     ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
662                         rv = SS_TWO_PRIMARIES;
663                 put_net_conf(mdev);
664         }
665
666         if (rv <= 0)
667                 /* already found a reason to abort */;
668         else if (ns.role == R_SECONDARY && mdev->open_cnt)
669                 rv = SS_DEVICE_IN_USE;
670
671         else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
672                 rv = SS_NO_UP_TO_DATE_DISK;
673
674         else if (fp >= FP_RESOURCE &&
675                  ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
676                 rv = SS_PRIMARY_NOP;
677
678         else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
679                 rv = SS_NO_UP_TO_DATE_DISK;
680
681         else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
682                 rv = SS_NO_LOCAL_DISK;
683
684         else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
685                 rv = SS_NO_REMOTE_DISK;
686
687         else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
688                 rv = SS_NO_UP_TO_DATE_DISK;
689
690         else if ((ns.conn == C_CONNECTED ||
691                   ns.conn == C_WF_BITMAP_S ||
692                   ns.conn == C_SYNC_SOURCE ||
693                   ns.conn == C_PAUSED_SYNC_S) &&
694                   ns.disk == D_OUTDATED)
695                 rv = SS_CONNECTED_OUTDATES;
696
697         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
698                  (mdev->sync_conf.verify_alg[0] == 0))
699                 rv = SS_NO_VERIFY_ALG;
700
701         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
702                   mdev->agreed_pro_version < 88)
703                 rv = SS_NOT_SUPPORTED;
704
705         return rv;
706 }
707
708 /**
709  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
710  * @mdev:       DRBD device.
711  * @ns:         new state.
712  * @os:         old state.
713  */
714 static int is_valid_state_transition(struct drbd_conf *mdev,
715                                      union drbd_state ns, union drbd_state os)
716 {
717         int rv = SS_SUCCESS;
718
719         if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
720             os.conn > C_CONNECTED)
721                 rv = SS_RESYNC_RUNNING;
722
723         if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
724                 rv = SS_ALREADY_STANDALONE;
725
726         if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
727                 rv = SS_IS_DISKLESS;
728
729         if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
730                 rv = SS_NO_NET_CONFIG;
731
732         if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
733                 rv = SS_LOWER_THAN_OUTDATED;
734
735         if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
736                 rv = SS_IN_TRANSIENT_STATE;
737
738         if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
739                 rv = SS_IN_TRANSIENT_STATE;
740
741         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
742                 rv = SS_NEED_CONNECTION;
743
744         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
745             ns.conn != os.conn && os.conn > C_CONNECTED)
746                 rv = SS_RESYNC_RUNNING;
747
748         if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
749             os.conn < C_CONNECTED)
750                 rv = SS_NEED_CONNECTION;
751
752         return rv;
753 }
754
755 /**
756  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
757  * @mdev:       DRBD device.
758  * @os:         old state.
759  * @ns:         new state.
760  * @warn_sync_abort:
761  *
762  * When we lose connection, we have to set the state of the peer's disk (pdsk)
763  * to D_UNKNOWN. This rule and many more along those lines are in this function.
764  */
765 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
766                                        union drbd_state ns, int *warn_sync_abort)
767 {
768         enum drbd_fencing_p fp;
769
770         fp = FP_DONT_CARE;
771         if (get_ldev(mdev)) {
772                 fp = mdev->ldev->dc.fencing;
773                 put_ldev(mdev);
774         }
775
776         /* Do not allow network errors to configure a device's network part */
777         if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
778             os.conn <= C_DISCONNECTING)
779                 ns.conn = os.conn;
780
781         /* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow */
782         if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
783             ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING)
784                 ns.conn = os.conn;
785
786         /* After C_DISCONNECTING only C_STANDALONE may follow */
787         if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
788                 ns.conn = os.conn;
789
790         if (ns.conn < C_CONNECTED) {
791                 ns.peer_isp = 0;
792                 ns.peer = R_UNKNOWN;
793                 if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
794                         ns.pdsk = D_UNKNOWN;
795         }
796
797         /* Clear the aftr_isp when becoming unconfigured */
798         if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
799                 ns.aftr_isp = 0;
800
801         if (ns.conn <= C_DISCONNECTING && ns.disk == D_DISKLESS)
802                 ns.pdsk = D_UNKNOWN;
803
804         /* Abort resync if a disk fails/detaches */
805         if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
806             (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
807                 if (warn_sync_abort)
808                         *warn_sync_abort = 1;
809                 ns.conn = C_CONNECTED;
810         }
811
812         if (ns.conn >= C_CONNECTED &&
813             ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
814              (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
815                 switch (ns.conn) {
816                 case C_WF_BITMAP_T:
817                 case C_PAUSED_SYNC_T:
818                         ns.disk = D_OUTDATED;
819                         break;
820                 case C_CONNECTED:
821                 case C_WF_BITMAP_S:
822                 case C_SYNC_SOURCE:
823                 case C_PAUSED_SYNC_S:
824                         ns.disk = D_UP_TO_DATE;
825                         break;
826                 case C_SYNC_TARGET:
827                         ns.disk = D_INCONSISTENT;
828                         dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
829                         break;
830                 }
831                 if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
832                         dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
833         }
834
835         if (ns.conn >= C_CONNECTED &&
836             (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
837                 switch (ns.conn) {
838                 case C_CONNECTED:
839                 case C_WF_BITMAP_T:
840                 case C_PAUSED_SYNC_T:
841                 case C_SYNC_TARGET:
842                         ns.pdsk = D_UP_TO_DATE;
843                         break;
844                 case C_WF_BITMAP_S:
845                 case C_PAUSED_SYNC_S:
846                         /* remap any consistent state to D_OUTDATED,
847                          * but disallow "upgrading" states that are not even consistent.
848                          */
849                         ns.pdsk =
850                                 (D_DISKLESS < os.pdsk && os.pdsk < D_OUTDATED)
851                                 ? os.pdsk : D_OUTDATED;
852                         break;
853                 case C_SYNC_SOURCE:
854                         ns.pdsk = D_INCONSISTENT;
855                         dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
856                         break;
857                 }
858                 if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
859                         dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
860         }
861
862         /* Connection broke down before we finished "Negotiating" */
863         if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
864             get_ldev_if_state(mdev, D_NEGOTIATING)) {
865                 if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
866                         ns.disk = mdev->new_state_tmp.disk;
867                         ns.pdsk = mdev->new_state_tmp.pdsk;
868                 } else {
869                         dev_alert(DEV, "Connection lost while negotiating, no data!\n");
870                         ns.disk = D_DISKLESS;
871                         ns.pdsk = D_UNKNOWN;
872                 }
873                 put_ldev(mdev);
874         }
875
876         if (fp == FP_STONITH &&
877             (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
878             !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
879                 ns.susp = 1;
880
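        /* A resync is implicitly paused while any of the "sync paused" bits
         * (aftr_isp, peer_isp, user_isp) is set, and resumed once all of
         * them are clear again. */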
881         if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
882                 if (ns.conn == C_SYNC_SOURCE)
883                         ns.conn = C_PAUSED_SYNC_S;
884                 if (ns.conn == C_SYNC_TARGET)
885                         ns.conn = C_PAUSED_SYNC_T;
886         } else {
887                 if (ns.conn == C_PAUSED_SYNC_S)
888                         ns.conn = C_SYNC_SOURCE;
889                 if (ns.conn == C_PAUSED_SYNC_T)
890                         ns.conn = C_SYNC_TARGET;
891         }
892
893         return ns;
894 }
895
896 /* helper for __drbd_set_state */
897 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
898 {
899         if (cs == C_VERIFY_T) {
900                 /* starting online verify from an arbitrary position
901                  * does not fit well into the existing protocol.
902                  * on C_VERIFY_T, we initialize ov_left and friends
903                  * implicitly in receive_DataRequest once the
904                  * first P_OV_REQUEST is received */
905                 mdev->ov_start_sector = ~(sector_t)0;
906         } else {
907                 unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
908                 if (bit >= mdev->rs_total)
909                         mdev->ov_start_sector =
910                                 BM_BIT_TO_SECT(mdev->rs_total - 1);
911                 mdev->ov_position = mdev->ov_start_sector;
912         }
913 }
914
915 /**
916  * __drbd_set_state() - Set a new DRBD state
917  * @mdev:       DRBD device.
918  * @ns:         new state.
919  * @flags:      Flags
920  * @done:       Optional completion that will be completed after after_state_ch() has finished
921  *
922  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
923  */
924 int __drbd_set_state(struct drbd_conf *mdev,
925                     union drbd_state ns, enum chg_state_flags flags,
926                     struct completion *done)
927 {
928         union drbd_state os;
929         int rv = SS_SUCCESS;
930         int warn_sync_abort = 0;
931         struct after_state_chg_work *ascw;
932
933         os = mdev->state;
934
935         ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
936
937         if (ns.i == os.i)
938                 return SS_NOTHING_TO_DO;
939
940         if (!(flags & CS_HARD)) {
941                 /*  pre-state-change checks ; only look at ns  */
942                 /* See drbd_state_sw_errors in drbd_strings.c */
943
944                 rv = is_valid_state(mdev, ns);
945                 if (rv < SS_SUCCESS) {
946                         /* If the old state was illegal as well, then let
947                            this happen...*/
948
949                         if (is_valid_state(mdev, os) == rv) {
950                                 dev_err(DEV, "Considering state change from bad state. "
951                                     "Error would be: '%s'\n",
952                                     drbd_set_st_err_str(rv));
953                                 print_st(mdev, "old", os);
954                                 print_st(mdev, "new", ns);
955                                 rv = is_valid_state_transition(mdev, ns, os);
956                         }
957                 } else
958                         rv = is_valid_state_transition(mdev, ns, os);
959         }
960
961         if (rv < SS_SUCCESS) {
962                 if (flags & CS_VERBOSE)
963                         print_st_err(mdev, os, ns, rv);
964                 return rv;
965         }
966
967         if (warn_sync_abort)
968                 dev_warn(DEV, "Resync aborted.\n");
969
970         {
971                 char *pbp, pb[300];
972                 pbp = pb;
973                 *pbp = 0;
974                 PSC(role);
975                 PSC(peer);
976                 PSC(conn);
977                 PSC(disk);
978                 PSC(pdsk);
979                 PSC(susp);
980                 PSC(aftr_isp);
981                 PSC(peer_isp);
982                 PSC(user_isp);
983                 dev_info(DEV, "%s\n", pb);
984         }
985
986         /* solve the race between becoming unconfigured,
987          * worker doing the cleanup, and
988          * admin reconfiguring us:
989          * on (re)configure, first set CONFIG_PENDING,
990          * then wait for a potentially exiting worker,
991          * start the worker, and schedule one no_op.
992          * then proceed with configuration.
993          */
994         if (ns.disk == D_DISKLESS &&
995             ns.conn == C_STANDALONE &&
996             ns.role == R_SECONDARY &&
997             !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
998                 set_bit(DEVICE_DYING, &mdev->flags);
999
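        /* commit the new state and wake up anybody waiting for a state change */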
1000         mdev->state.i = ns.i;
1001         wake_up(&mdev->misc_wait);
1002         wake_up(&mdev->state_wait);
1003
1004         /*   post-state-change actions   */
1005         if (os.conn >= C_SYNC_SOURCE   && ns.conn <= C_CONNECTED) {
1006                 set_bit(STOP_SYNC_TIMER, &mdev->flags);
1007                 mod_timer(&mdev->resync_timer, jiffies);
1008         }
1009
1010         /* aborted verify run. log the last position */
1011         if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1012             ns.conn < C_CONNECTED) {
1013                 mdev->ov_start_sector =
1014                         BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1015                 dev_info(DEV, "Online Verify reached sector %llu\n",
1016                         (unsigned long long)mdev->ov_start_sector);
1017         }
1018
1019         if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1020             (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1021                 dev_info(DEV, "Syncer continues.\n");
1022                 mdev->rs_paused += (long)jiffies-(long)mdev->rs_mark_time;
1023                 if (ns.conn == C_SYNC_TARGET) {
1024                         if (!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))
1025                                 mod_timer(&mdev->resync_timer, jiffies);
1026                         /* This if (!test_bit) is only needed for the case
1027                            that a device that has ceased to use its timer,
1028                            i.e. it is already in drbd_resync_finished(), gets
1029                            paused and resumed. */
1030                 }
1031         }
1032
1033         if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1034             (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1035                 dev_info(DEV, "Resync suspended\n");
1036                 mdev->rs_mark_time = jiffies;
1037                 if (ns.conn == C_PAUSED_SYNC_T)
1038                         set_bit(STOP_SYNC_TIMER, &mdev->flags);
1039         }
1040
1041         if (os.conn == C_CONNECTED &&
1042             (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1043                 mdev->ov_position = 0;
1044                 mdev->rs_total =
1045                 mdev->rs_mark_left = drbd_bm_bits(mdev);
1046                 if (mdev->agreed_pro_version >= 90)
1047                         set_ov_position(mdev, ns.conn);
1048                 else
1049                         mdev->ov_start_sector = 0;
1050                 mdev->ov_left = mdev->rs_total
1051                               - BM_SECT_TO_BIT(mdev->ov_position);
1052                 mdev->rs_start     =
1053                 mdev->rs_mark_time = jiffies;
1054                 mdev->ov_last_oos_size = 0;
1055                 mdev->ov_last_oos_start = 0;
1056
1057                 if (ns.conn == C_VERIFY_S) {
1058                         dev_info(DEV, "Starting Online Verify from sector %llu\n",
1059                                         (unsigned long long)mdev->ov_position);
1060                         mod_timer(&mdev->resync_timer, jiffies);
1061                 }
1062         }
1063
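        /* Mirror the relevant bits of the new state into the on-disk
         * meta-data flags (MDF_*), and mark the meta-data dirty if they changed. */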
1064         if (get_ldev(mdev)) {
1065                 u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1066                                                  MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1067                                                  MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1068
1069                 if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1070                         mdf |= MDF_CRASHED_PRIMARY;
1071                 if (mdev->state.role == R_PRIMARY ||
1072                     (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1073                         mdf |= MDF_PRIMARY_IND;
1074                 if (mdev->state.conn > C_WF_REPORT_PARAMS)
1075                         mdf |= MDF_CONNECTED_IND;
1076                 if (mdev->state.disk > D_INCONSISTENT)
1077                         mdf |= MDF_CONSISTENT;
1078                 if (mdev->state.disk > D_OUTDATED)
1079                         mdf |= MDF_WAS_UP_TO_DATE;
1080                 if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1081                         mdf |= MDF_PEER_OUT_DATED;
1082                 if (mdf != mdev->ldev->md.flags) {
1083                         mdev->ldev->md.flags = mdf;
1084                         drbd_md_mark_dirty(mdev);
1085                 }
1086                 if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1087                         drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1088                 put_ldev(mdev);
1089         }
1090
1091         /* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
1092         if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1093             os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1094                 set_bit(CONSIDER_RESYNC, &mdev->flags);
1095
1096         /* Receiver should clean up itself */
1097         if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1098                 drbd_thread_stop_nowait(&mdev->receiver);
1099
1100         /* Now the receiver finished cleaning up itself, it should die */
1101         if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1102                 drbd_thread_stop_nowait(&mdev->receiver);
1103
1104         /* Upon network failure, we need to restart the receiver. */
1105         if (os.conn > C_TEAR_DOWN &&
1106             ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1107                 drbd_thread_restart_nowait(&mdev->receiver);
1108
1109         ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1110         if (ascw) {
1111                 ascw->os = os;
1112                 ascw->ns = ns;
1113                 ascw->flags = flags;
1114                 ascw->w.cb = w_after_state_ch;
1115                 ascw->done = done;
1116                 drbd_queue_work(&mdev->data.work, &ascw->w);
1117         } else {
1118                 dev_warn(DEV, "Could not kmalloc an ascw\n");
1119         }
1120
1121         return rv;
1122 }
1123
1124 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1125 {
1126         struct after_state_chg_work *ascw =
1127                 container_of(w, struct after_state_chg_work, w);
1128         after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1129         if (ascw->flags & CS_WAIT_COMPLETE) {
1130                 D_ASSERT(ascw->done != NULL);
1131                 complete(ascw->done);
1132         }
1133         kfree(ascw);
1134
1135         return 1;
1136 }
1137
1138 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1139 {
1140         if (rv) {
1141                 dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1142                 _drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1143                 return;
1144         }
1145
1146         switch (mdev->state.conn) {
1147         case C_STARTING_SYNC_T:
1148                 _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1149                 break;
1150         case C_STARTING_SYNC_S:
1151                 drbd_start_resync(mdev, C_SYNC_SOURCE);
1152                 break;
1153         }
1154 }
1155
1156 /**
1157  * after_state_ch() - Perform after state change actions that may sleep
1158  * @mdev:       DRBD device.
1159  * @os:         old state.
1160  * @ns:         new state.
1161  * @flags:      Flags
1162  */
1163 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1164                            union drbd_state ns, enum chg_state_flags flags)
1165 {
1166         enum drbd_fencing_p fp;
1167
1168         if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1169                 clear_bit(CRASHED_PRIMARY, &mdev->flags);
1170                 if (mdev->p_uuid)
1171                         mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1172         }
1173
1174         fp = FP_DONT_CARE;
1175         if (get_ldev(mdev)) {
1176                 fp = mdev->ldev->dc.fencing;
1177                 put_ldev(mdev);
1178         }
1179
1180         /* Inform userspace about the change... */
1181         drbd_bcast_state(mdev, ns);
1182
1183         if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1184             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1185                 drbd_khelper(mdev, "pri-on-incon-degr");
1186
1187         /* Here we have the actions that are performed after a
1188            state change. This function might sleep */
1189
1190         if (fp == FP_STONITH && ns.susp) {
1191                 /* case1: The outdate peer handler is successful:
1192                  * case2: The connection was established again: */
1193                 if ((os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) ||
1194                     (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)) {
1195                         tl_clear(mdev);
1196                         spin_lock_irq(&mdev->req_lock);
1197                         _drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
1198                         spin_unlock_irq(&mdev->req_lock);
1199                 }
1200         }
1201         /* Do not change the order of the if above and the two below... */
1202         if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1203                 drbd_send_uuids(mdev);
1204                 drbd_send_state(mdev);
1205         }
1206         if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1207                 drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1208
1209         /* Lost contact to peer's copy of the data */
1210         if ((os.pdsk >= D_INCONSISTENT &&
1211              os.pdsk != D_UNKNOWN &&
1212              os.pdsk != D_OUTDATED)
1213         &&  (ns.pdsk < D_INCONSISTENT ||
1214              ns.pdsk == D_UNKNOWN ||
1215              ns.pdsk == D_OUTDATED)) {
1216                 if (get_ldev(mdev)) {
1217                         if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1218                             mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE &&
1219                             !atomic_read(&mdev->new_c_uuid))
1220                                 atomic_set(&mdev->new_c_uuid, 2);
1221                         put_ldev(mdev);
1222                 }
1223         }
1224
1225         if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1226                 /* Diskless peer becomes primary, or we got connected to a diskless, primary peer. */
1227                 if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0 &&
1228                     !atomic_read(&mdev->new_c_uuid))
1229                         atomic_set(&mdev->new_c_uuid, 2);
1230
1231                 /* D_DISKLESS Peer becomes secondary */
1232                 if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1233                         drbd_al_to_on_disk_bm(mdev);
1234                 put_ldev(mdev);
1235         }
1236
1237         /* Last part of the attaching process ... */
1238         if (ns.conn >= C_CONNECTED &&
1239             os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1240                 kfree(mdev->p_uuid); /* We expect to receive up-to-date UUIDs soon. */
1241                 mdev->p_uuid = NULL; /* ...to not use the old ones in the mean time */
1242                 drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1243                 drbd_send_uuids(mdev);
1244                 drbd_send_state(mdev);
1245         }
1246
1247         /* We want to pause/continue resync, tell peer. */
1248         if (ns.conn >= C_CONNECTED &&
1249              ((os.aftr_isp != ns.aftr_isp) ||
1250               (os.user_isp != ns.user_isp)))
1251                 drbd_send_state(mdev);
1252
1253         /* In case one of the isp bits got set, suspend other devices. */
1254         if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1255             (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1256                 suspend_other_sg(mdev);
1257
1258         /* Make sure the peer gets informed about possible state
1259            changes (ISP bits) that happened while we were in WFReportParams. */
1260         if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1261                 drbd_send_state(mdev);
1262
1263         /* We are in the process of starting a full sync... */
1264         if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1265             (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1266                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1267
1268         /* We are invalidating ourselves... */
1269         if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1270             os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1271                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1272
1273         if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1274                 enum drbd_io_error_p eh;
1275
1276                 eh = EP_PASS_ON;
1277                 if (get_ldev_if_state(mdev, D_FAILED)) {
1278                         eh = mdev->ldev->dc.on_io_error;
1279                         put_ldev(mdev);
1280                 }
1281
1282                 drbd_rs_cancel_all(mdev);
1283                 /* since get_ldev() only works as long as disk>=D_INCONSISTENT,
1284                    and it is D_DISKLESS here, local_cnt can only go down, it can
1285                    not increase... It will reach zero */
1286                 wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1287                 mdev->rs_total = 0;
1288                 mdev->rs_failed = 0;
1289                 atomic_set(&mdev->rs_pending_cnt, 0);
1290
1291                 spin_lock_irq(&mdev->req_lock);
1292                 _drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
1293                 spin_unlock_irq(&mdev->req_lock);
1294
1295                 if (eh == EP_CALL_HELPER)
1296                         drbd_khelper(mdev, "local-io-error");
1297         }
1298
1299         if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1300
1301                 if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
1302                         if (drbd_send_state(mdev))
1303                                 dev_warn(DEV, "Notified peer that my disk is broken.\n");
1304                         else
1305                                 dev_err(DEV, "Sending state in drbd_io_error() failed\n");
1306                 }
1307
1308                 wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1309                 lc_destroy(mdev->resync);
1310                 mdev->resync = NULL;
1311                 lc_destroy(mdev->act_log);
1312                 mdev->act_log = NULL;
1313                 __no_warn(local,
1314                         drbd_free_bc(mdev->ldev);
1315                         mdev->ldev = NULL;);
1316
1317                 if (mdev->md_io_tmpp)
1318                         __free_page(mdev->md_io_tmpp);
1319         }
1320
1321         /* Disks got bigger while they were detached */
1322         if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1323             test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1324                 if (ns.conn == C_CONNECTED)
1325                         resync_after_online_grow(mdev);
1326         }
1327
1328         /* A resync finished or aborted, wake paused devices... */
1329         if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1330             (os.peer_isp && !ns.peer_isp) ||
1331             (os.user_isp && !ns.user_isp))
1332                 resume_next_sg(mdev);
1333
1334         /* Upon network connection, we need to start the receiver */
1335         if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1336                 drbd_thread_start(&mdev->receiver);
1337
1338         /* Terminate worker thread if we are unconfigured - it will be
1339            restarted as needed... */
1340         if (ns.disk == D_DISKLESS &&
1341             ns.conn == C_STANDALONE &&
1342             ns.role == R_SECONDARY) {
1343                 if (os.aftr_isp != ns.aftr_isp)
1344                         resume_next_sg(mdev);
1345                 /* set in __drbd_set_state, unless CONFIG_PENDING was set */
1346                 if (test_bit(DEVICE_DYING, &mdev->flags))
1347                         drbd_thread_stop_nowait(&mdev->worker);
1348         }
1349
1350         drbd_md_sync(mdev);
1351 }
1352
1353 static int w_new_current_uuid(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1354 {
1355         if (get_ldev(mdev)) {
1356                 if (mdev->ldev->md.uuid[UI_BITMAP] == 0) {
1357                         drbd_uuid_new_current(mdev);
1358                         if (get_net_conf(mdev)) {
1359                                 drbd_send_uuids(mdev);
1360                                 put_net_conf(mdev);
1361                         }
1362                         drbd_md_sync(mdev);
1363                 }
1364                 put_ldev(mdev);
1365         }
1366         atomic_dec(&mdev->new_c_uuid);
1367         wake_up(&mdev->misc_wait);
1368
1369         return 1;
1370 }
1371
1372 static int drbd_thread_setup(void *arg)
1373 {
1374         struct drbd_thread *thi = (struct drbd_thread *) arg;
1375         struct drbd_conf *mdev = thi->mdev;
1376         unsigned long flags;
1377         int retval;
1378
1379 restart:
1380         retval = thi->function(thi);
1381
1382         spin_lock_irqsave(&thi->t_lock, flags);
1383
1384         /* if the receiver has been "Exiting", the last thing it did
1385          * was set the conn state to "StandAlone",
1386          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1387          * and receiver thread will be "started".
1388          * drbd_thread_start needs to set "Restarting" in that case.
1389          * t_state check and assignment needs to be within the same spinlock,
1390          * so either thread_start sees Exiting, and can remap to Restarting,
1391          * or thread_start see None, and can proceed as normal.
1392          * or thread_start sees None, and can proceed as normal.
1393
1394         if (thi->t_state == Restarting) {
1395                 dev_info(DEV, "Restarting %s\n", current->comm);
1396                 thi->t_state = Running;
1397                 spin_unlock_irqrestore(&thi->t_lock, flags);
1398                 goto restart;
1399         }
1400
1401         thi->task = NULL;
1402         thi->t_state = None;
1403         smp_mb();
1404         complete(&thi->stop);
1405         spin_unlock_irqrestore(&thi->t_lock, flags);
1406
1407         dev_info(DEV, "Terminating %s\n", current->comm);
1408
1409         /* Release mod reference taken when thread was started */
1410         module_put(THIS_MODULE);
1411         return retval;
1412 }
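/*
 * Rough sketch of the thread life cycle implemented here and in
 * drbd_thread_start()/_drbd_thread_stop() below (schematic only):
 *
 *	None ---drbd_thread_start()---> Running
 *	Running ---_drbd_thread_stop()---> Exiting (or Restarting)
 *	Exiting ---thread function returned---> None
 *	Exiting ---drbd_thread_start()---> Restarting
 *	Restarting ---drbd_thread_setup(), "restart:"---> Running
 */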
1413
1414 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1415                       int (*func) (struct drbd_thread *))
1416 {
1417         spin_lock_init(&thi->t_lock);
1418         thi->task    = NULL;
1419         thi->t_state = None;
1420         thi->function = func;
1421         thi->mdev = mdev;
1422 }
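/*
 * Typical usage, as in drbd_init_set_defaults() further down and in the
 * C_STANDALONE -> C_UNCONNECTED handling above (illustrative only):
 *
 *	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
 *	...
 *	drbd_thread_start(&mdev->receiver);
 */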
1423
1424 int drbd_thread_start(struct drbd_thread *thi)
1425 {
1426         struct drbd_conf *mdev = thi->mdev;
1427         struct task_struct *nt;
1428         unsigned long flags;
1429
1430         const char *me =
1431                 thi == &mdev->receiver ? "receiver" :
1432                 thi == &mdev->asender  ? "asender"  :
1433                 thi == &mdev->worker   ? "worker"   : "NONSENSE";
1434
1435         /* is used from state engine doing drbd_thread_stop_nowait,
1436          * while holding the req lock irqsave */
1437         spin_lock_irqsave(&thi->t_lock, flags);
1438
1439         switch (thi->t_state) {
1440         case None:
1441                 dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1442                                 me, current->comm, current->pid);
1443
1444                 /* Get ref on module for thread - this is released when thread exits */
1445                 if (!try_module_get(THIS_MODULE)) {
1446                         dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1447                         spin_unlock_irqrestore(&thi->t_lock, flags);
1448                         return FALSE;
1449                 }
1450
1451                 init_completion(&thi->stop);
1452                 D_ASSERT(thi->task == NULL);
1453                 thi->reset_cpu_mask = 1;
1454                 thi->t_state = Running;
1455                 spin_unlock_irqrestore(&thi->t_lock, flags);
1456                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1457
1458                 nt = kthread_create(drbd_thread_setup, (void *) thi,
1459                                     "drbd%d_%s", mdev_to_minor(mdev), me);
1460
1461                 if (IS_ERR(nt)) {
1462                         dev_err(DEV, "Couldn't start thread\n");
1463
1464                         module_put(THIS_MODULE);
1465                         return FALSE;
1466                 }
1467                 spin_lock_irqsave(&thi->t_lock, flags);
1468                 thi->task = nt;
1469                 thi->t_state = Running;
1470                 spin_unlock_irqrestore(&thi->t_lock, flags);
1471                 wake_up_process(nt);
1472                 break;
1473         case Exiting:
1474                 thi->t_state = Restarting;
1475                 dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1476                                 me, current->comm, current->pid);
1477                 /* fall through */
1478         case Running:
1479         case Restarting:
1480         default:
1481                 spin_unlock_irqrestore(&thi->t_lock, flags);
1482                 break;
1483         }
1484
1485         return TRUE;
1486 }
1487
1488
1489 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1490 {
1491         unsigned long flags;
1492
1493         enum drbd_thread_state ns = restart ? Restarting : Exiting;
1494
1495         /* may be called from state engine, holding the req lock irqsave */
1496         spin_lock_irqsave(&thi->t_lock, flags);
1497
1498         if (thi->t_state == None) {
1499                 spin_unlock_irqrestore(&thi->t_lock, flags);
1500                 if (restart)
1501                         drbd_thread_start(thi);
1502                 return;
1503         }
1504
1505         if (thi->t_state != ns) {
1506                 if (thi->task == NULL) {
1507                         spin_unlock_irqrestore(&thi->t_lock, flags);
1508                         return;
1509                 }
1510
1511                 thi->t_state = ns;
1512                 smp_mb();
1513                 init_completion(&thi->stop);
1514                 if (thi->task != current)
1515                         force_sig(DRBD_SIGKILL, thi->task);
1516
1517         }
1518
1519         spin_unlock_irqrestore(&thi->t_lock, flags);
1520
1521         if (wait)
1522                 wait_for_completion(&thi->stop);
1523 }
1524
1525 #ifdef CONFIG_SMP
1526 /**
1527  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1528  * @mdev:       DRBD device.
1529  *
1530  * Forces all threads of a device onto the same CPU. This is beneficial for
1531  * DRBD's performance. May be overridden by the user's configuration.
1532  */
1533 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1534 {
1535         int ord, cpu;
1536
1537         /* user override. */
1538         if (cpumask_weight(mdev->cpu_mask))
1539                 return;
1540
1541         ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1542         for_each_online_cpu(cpu) {
1543                 if (ord-- == 0) {
1544                         cpumask_set_cpu(cpu, mdev->cpu_mask);
1545                         return;
1546                 }
1547         }
1548         /* should not be reached */
1549         cpumask_setall(mdev->cpu_mask);
1550 }
1551
1552 /**
1553  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1554  * @mdev:       DRBD device.
1555  *
1556  * Call this in the "main loop" of _all_ threads; no mutex is needed, since current won't
1557  * die prematurely.
1558  */
1559 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1560 {
1561         struct task_struct *p = current;
1562         struct drbd_thread *thi =
1563                 p == mdev->asender.task  ? &mdev->asender  :
1564                 p == mdev->receiver.task ? &mdev->receiver :
1565                 p == mdev->worker.task   ? &mdev->worker   :
1566                 NULL;
1567         ERR_IF(thi == NULL)
1568                 return;
1569         if (!thi->reset_cpu_mask)
1570                 return;
1571         thi->reset_cpu_mask = 0;
1572         set_cpus_allowed_ptr(p, mdev->cpu_mask);
1573 }
1574 #endif
1575
1576 /* the appropriate socket mutex must be held already */
1577 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1578                           enum drbd_packets cmd, struct p_header *h,
1579                           size_t size, unsigned msg_flags)
1580 {
1581         int sent, ok;
1582
1583         ERR_IF(!h) return FALSE;
1584         ERR_IF(!size) return FALSE;
1585
1586         h->magic   = BE_DRBD_MAGIC;
1587         h->command = cpu_to_be16(cmd);
1588         h->length  = cpu_to_be16(size-sizeof(struct p_header));
1589
1590         sent = drbd_send(mdev, sock, h, size, msg_flags);
1591
1592         ok = (sent == size);
1593         if (!ok)
1594                 dev_err(DEV, "short sent %s size=%d sent=%d\n",
1595                     cmdname(cmd), (int)size, sent);
1596         return ok;
1597 }
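/*
 * For reference, the generic header _drbd_send_cmd() puts on the wire
 * (all fields in big endian byte order):
 *
 *	magic   - BE_DRBD_MAGIC
 *	command - 16 bit packet command code
 *	length  - 16 bit payload size, i.e. size - sizeof(struct p_header)
 */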
1598
1599 /* don't pass the socket. we may only look at it
1600  * when we hold the appropriate socket mutex.
1601  */
1602 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1603                   enum drbd_packets cmd, struct p_header *h, size_t size)
1604 {
1605         int ok = 0;
1606         struct socket *sock;
1607
1608         if (use_data_socket) {
1609                 mutex_lock(&mdev->data.mutex);
1610                 sock = mdev->data.socket;
1611         } else {
1612                 mutex_lock(&mdev->meta.mutex);
1613                 sock = mdev->meta.socket;
1614         }
1615
1616         /* drbd_disconnect() could have called drbd_free_sock()
1617          * while we were waiting in down()... */
1618         if (likely(sock != NULL))
1619                 ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1620
1621         if (use_data_socket)
1622                 mutex_unlock(&mdev->data.mutex);
1623         else
1624                 mutex_unlock(&mdev->meta.mutex);
1625         return ok;
1626 }
1627
1628 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1629                    size_t size)
1630 {
1631         struct p_header h;
1632         int ok;
1633
1634         h.magic   = BE_DRBD_MAGIC;
1635         h.command = cpu_to_be16(cmd);
1636         h.length  = cpu_to_be16(size);
1637
1638         if (!drbd_get_data_sock(mdev))
1639                 return 0;
1640
1641         ok = (sizeof(h) ==
1642                 drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1643         ok = ok && (size ==
1644                 drbd_send(mdev, mdev->data.socket, data, size, 0));
1645
1646         drbd_put_data_sock(mdev);
1647
1648         return ok;
1649 }
1650
1651 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1652 {
1653         struct p_rs_param_89 *p;
1654         struct socket *sock;
1655         int size, rv;
1656         const int apv = mdev->agreed_pro_version;
1657
1658         size = apv <= 87 ? sizeof(struct p_rs_param)
1659                 : apv == 88 ? sizeof(struct p_rs_param)
1660                         + strlen(mdev->sync_conf.verify_alg) + 1
1661                 : /* 89 */    sizeof(struct p_rs_param_89);
1662
1663         /* used from admin command context and receiver/worker context.
1664          * to avoid kmalloc, grab the socket right here,
1665          * then use the pre-allocated sbuf there */
1666         mutex_lock(&mdev->data.mutex);
1667         sock = mdev->data.socket;
1668
1669         if (likely(sock != NULL)) {
1670                 enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1671
1672                 p = &mdev->data.sbuf.rs_param_89;
1673
1674                 /* initialize verify_alg and csums_alg */
1675                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1676
1677                 p->rate = cpu_to_be32(sc->rate);
1678
1679                 if (apv >= 88)
1680                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1681                 if (apv >= 89)
1682                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1683
1684                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1685         } else
1686                 rv = 0; /* not ok */
1687
1688         mutex_unlock(&mdev->data.mutex);
1689
1690         return rv;
1691 }
1692
1693 int drbd_send_protocol(struct drbd_conf *mdev)
1694 {
1695         struct p_protocol *p;
1696         int size, cf, rv;
1697
1698         size = sizeof(struct p_protocol);
1699
1700         if (mdev->agreed_pro_version >= 87)
1701                 size += strlen(mdev->net_conf->integrity_alg) + 1;
1702
1703         /* we must not recurse into our own queue,
1704          * as that is blocked during handshake */
1705         p = kmalloc(size, GFP_NOIO);
1706         if (p == NULL)
1707                 return 0;
1708
1709         p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1710         p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1711         p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1712         p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1713         p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1714
1715         cf = 0;
1716         if (mdev->net_conf->want_lose)
1717                 cf |= CF_WANT_LOSE;
1718         if (mdev->net_conf->dry_run) {
1719                 if (mdev->agreed_pro_version >= 92)
1720                         cf |= CF_DRY_RUN;
1721                 else {
1722                         dev_err(DEV, "--dry-run is not supported by peer\n");
1723                         kfree(p);
1724                         return 0;
1725                 }
1726         }
1727         p->conn_flags    = cpu_to_be32(cf);
1728
1729         if (mdev->agreed_pro_version >= 87)
1730                 strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1731
1732         rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1733                            (struct p_header *)p, size);
1734         kfree(p);
1735         return rv;
1736 }
1737
1738 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1739 {
1740         struct p_uuids p;
1741         int i;
1742
1743         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1744                 return 1;
1745
1746         for (i = UI_CURRENT; i < UI_SIZE; i++)
1747                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1748
1749         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1750         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1751         uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1752         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1753         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1754         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1755
1756         put_ldev(mdev);
1757
1758         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1759                              (struct p_header *)&p, sizeof(p));
1760 }
1761
1762 int drbd_send_uuids(struct drbd_conf *mdev)
1763 {
1764         return _drbd_send_uuids(mdev, 0);
1765 }
1766
1767 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1768 {
1769         return _drbd_send_uuids(mdev, 8);
1770 }
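/*
 * The uuid_flags bits, as encoded by the senders in this file:
 *	1 - net_conf->want_lose is set
 *	2 - the CRASHED_PRIMARY flag was set
 *	4 - new_state_tmp.disk is D_INCONSISTENT
 *	8 - set by drbd_send_uuids_skip_initial_sync()
 */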
1771
1772
1773 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1774 {
1775         struct p_rs_uuid p;
1776
1777         p.uuid = cpu_to_be64(val);
1778
1779         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1780                              (struct p_header *)&p, sizeof(p));
1781 }
1782
1783 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1784 {
1785         struct p_sizes p;
1786         sector_t d_size, u_size;
1787         int q_order_type;
1788         int ok;
1789
1790         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1791                 D_ASSERT(mdev->ldev->backing_bdev);
1792                 d_size = drbd_get_max_capacity(mdev->ldev);
1793                 u_size = mdev->ldev->dc.disk_size;
1794                 q_order_type = drbd_queue_order_type(mdev);
1795                 put_ldev(mdev);
1796         } else {
1797                 d_size = 0;
1798                 u_size = 0;
1799                 q_order_type = QUEUE_ORDERED_NONE;
1800         }
1801
1802         p.d_size = cpu_to_be64(d_size);
1803         p.u_size = cpu_to_be64(u_size);
1804         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1805         p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1806         p.queue_order_type = cpu_to_be16(q_order_type);
1807         p.dds_flags = cpu_to_be16(flags);
1808
1809         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1810                            (struct p_header *)&p, sizeof(p));
1811         return ok;
1812 }
1813
1814 /**
1815  * drbd_send_state() - Sends the drbd state to the peer
1816  * @mdev:       DRBD device.
1817  */
1818 int drbd_send_state(struct drbd_conf *mdev)
1819 {
1820         struct socket *sock;
1821         struct p_state p;
1822         int ok = 0;
1823
1824         /* Grab state lock so we won't send state if we're in the middle
1825          * of a cluster wide state change on another thread */
1826         drbd_state_lock(mdev);
1827
1828         mutex_lock(&mdev->data.mutex);
1829
1830         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1831         sock = mdev->data.socket;
1832
1833         if (likely(sock != NULL)) {
1834                 ok = _drbd_send_cmd(mdev, sock, P_STATE,
1835                                     (struct p_header *)&p, sizeof(p), 0);
1836         }
1837
1838         mutex_unlock(&mdev->data.mutex);
1839
1840         drbd_state_unlock(mdev);
1841         return ok;
1842 }
1843
1844 int drbd_send_state_req(struct drbd_conf *mdev,
1845         union drbd_state mask, union drbd_state val)
1846 {
1847         struct p_req_state p;
1848
1849         p.mask    = cpu_to_be32(mask.i);
1850         p.val     = cpu_to_be32(val.i);
1851
1852         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1853                              (struct p_header *)&p, sizeof(p));
1854 }
1855
1856 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1857 {
1858         struct p_req_state_reply p;
1859
1860         p.retcode    = cpu_to_be32(retcode);
1861
1862         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1863                              (struct p_header *)&p, sizeof(p));
1864 }
1865
1866 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1867         struct p_compressed_bm *p,
1868         struct bm_xfer_ctx *c)
1869 {
1870         struct bitstream bs;
1871         unsigned long plain_bits;
1872         unsigned long tmp;
1873         unsigned long rl;
1874         unsigned len;
1875         unsigned toggle;
1876         int bits;
1877
1878         /* may we use this feature? */
1879         if ((mdev->sync_conf.use_rle == 0) ||
1880                 (mdev->agreed_pro_version < 90))
1881                         return 0;
1882
1883         if (c->bit_offset >= c->bm_bits)
1884                 return 0; /* nothing to do. */
1885
1886         /* use at most this many bytes */
1887         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1888         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1889         /* plain bits covered in this code string */
1890         plain_bits = 0;
1891
1892         /* p->encoding & 0x80 stores whether the first run length is set.
1893          * bit offset is implicit.
1894          * start with toggle == 2 to be able to tell the first iteration */
1895         toggle = 2;
1896
1897         /* see how many plain bits we can stuff into one packet
1898          * using RLE and VLI. */
1899         do {
1900                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1901                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1902                 if (tmp == -1UL)
1903                         tmp = c->bm_bits;
1904                 rl = tmp - c->bit_offset;
1905
1906                 if (toggle == 2) { /* first iteration */
1907                         if (rl == 0) {
1908                                 /* the first checked bit was set,
1909                                  * store start value, */
1910                                 DCBP_set_start(p, 1);
1911                                 /* but skip encoding of zero run length */
1912                                 toggle = !toggle;
1913                                 continue;
1914                         }
1915                         DCBP_set_start(p, 0);
1916                 }
1917
1918                 /* paranoia: catch zero runlength.
1919                  * can only happen if bitmap is modified while we scan it. */
1920                 if (rl == 0) {
1921                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1922                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1923                         return -1;
1924                 }
1925
1926                 bits = vli_encode_bits(&bs, rl);
1927                 if (bits == -ENOBUFS) /* buffer full */
1928                         break;
1929                 if (bits <= 0) {
1930                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1931                         return 0;
1932                 }
1933
1934                 toggle = !toggle;
1935                 plain_bits += rl;
1936                 c->bit_offset = tmp;
1937         } while (c->bit_offset < c->bm_bits);
1938
1939         len = bs.cur.b - p->code + !!bs.cur.bit;
1940
1941         if (plain_bits < (len << 3)) {
1942                 /* incompressible with this method.
1943                  * we need to rewind both word and bit position. */
1944                 c->bit_offset -= plain_bits;
1945                 bm_xfer_ctx_bit_to_word_offset(c);
1946                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1947                 return 0;
1948         }
1949
1950         /* RLE + VLI was able to compress it just fine.
1951          * update c->word_offset. */
1952         bm_xfer_ctx_bit_to_word_offset(c);
1953
1954         /* store pad_bits */
1955         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1956
1957         return len;
1958 }
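/*
 * Worked example for the compression check above (numbers made up for
 * illustration): with len = 8 code bytes covering plain_bits = 1000
 * bitmap bits, plain_bits >= (len << 3) == 64 holds and the RLE+VLI
 * encoding is kept; had it covered only 50 bits, we would rewind the
 * offsets and fall back to sending plain bitmap words.
 */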
1959
1960 enum { OK, FAILED, DONE }
1961 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1962         struct p_header *h, struct bm_xfer_ctx *c)
1963 {
1964         struct p_compressed_bm *p = (void*)h;
1965         unsigned long num_words;
1966         int len;
1967         int ok;
1968
1969         len = fill_bitmap_rle_bits(mdev, p, c);
1970
1971         if (len < 0)
1972                 return FAILED;
1973
1974         if (len) {
1975                 DCBP_set_code(p, RLE_VLI_Bits);
1976                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
1977                         sizeof(*p) + len, 0);
1978
1979                 c->packets[0]++;
1980                 c->bytes[0] += sizeof(*p) + len;
1981
1982                 if (c->bit_offset >= c->bm_bits)
1983                         len = 0; /* DONE */
1984         } else {
1985                 /* was not compressible.
1986                  * send a buffer full of plain text bits instead. */
1987                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1988                 len = num_words * sizeof(long);
1989                 if (len)
1990                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1991                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
1992                                    h, sizeof(struct p_header) + len, 0);
1993                 c->word_offset += num_words;
1994                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1995
1996                 c->packets[1]++;
1997                 c->bytes[1] += sizeof(struct p_header) + len;
1998
1999                 if (c->bit_offset > c->bm_bits)
2000                         c->bit_offset = c->bm_bits;
2001         }
2002         ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
2003
2004         if (ok == DONE)
2005                 INFO_bm_xfer_stats(mdev, "send", c);
2006         return ok;
2007 }
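/*
 * Note on the statistics updated above: index [0] of c->packets/c->bytes
 * counts compressed (P_COMPRESSED_BITMAP) transfers, index [1] the plain
 * (P_BITMAP) fallback; INFO_bm_xfer_stats() reports both once we are DONE.
 */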
2008
2009 /* See the comment at receive_bitmap() */
2010 int _drbd_send_bitmap(struct drbd_conf *mdev)
2011 {
2012         struct bm_xfer_ctx c;
2013         struct p_header *p;
2014         int ret;
2015
2016         ERR_IF(!mdev->bitmap) return FALSE;
2017
2018         /* maybe we should use some per thread scratch page,
2019          * and allocate that during initial device creation? */
2020         p = (struct p_header *) __get_free_page(GFP_NOIO);
2021         if (!p) {
2022                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2023                 return FALSE;
2024         }
2025
2026         if (get_ldev(mdev)) {
2027                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2028                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2029                         drbd_bm_set_all(mdev);
2030                         if (drbd_bm_write(mdev)) {
2031                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
2032                                  * but otherwise process as per normal - need to tell other
2033                                  * side that a full resync is required! */
2034                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
2035                         } else {
2036                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2037                                 drbd_md_sync(mdev);
2038                         }
2039                 }
2040                 put_ldev(mdev);
2041         }
2042
2043         c = (struct bm_xfer_ctx) {
2044                 .bm_bits = drbd_bm_bits(mdev),
2045                 .bm_words = drbd_bm_words(mdev),
2046         };
2047
2048         do {
2049                 ret = send_bitmap_rle_or_plain(mdev, p, &c);
2050         } while (ret == OK);
2051
2052         free_page((unsigned long) p);
2053         return (ret == DONE);
2054 }
2055
2056 int drbd_send_bitmap(struct drbd_conf *mdev)
2057 {
2058         int err;
2059
2060         if (!drbd_get_data_sock(mdev))
2061                 return -1;
2062         err = !_drbd_send_bitmap(mdev);
2063         drbd_put_data_sock(mdev);
2064         return err;
2065 }
2066
2067 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2068 {
2069         int ok;
2070         struct p_barrier_ack p;
2071
2072         p.barrier  = barrier_nr;
2073         p.set_size = cpu_to_be32(set_size);
2074
2075         if (mdev->state.conn < C_CONNECTED)
2076                 return FALSE;
2077         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2078                         (struct p_header *)&p, sizeof(p));
2079         return ok;
2080 }
2081
2082 /**
2083  * _drbd_send_ack() - Sends an ack packet
2084  * @mdev:       DRBD device.
2085  * @cmd:        Packet command code.
2086  * @sector:     sector, needs to be in big endian byte order
2087  * @blksize:    size in byte, needs to be in big endian byte order
2088  * @block_id:   Id, big endian byte order
2089  */
2090 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2091                           u64 sector,
2092                           u32 blksize,
2093                           u64 block_id)
2094 {
2095         int ok;
2096         struct p_block_ack p;
2097
2098         p.sector   = sector;
2099         p.block_id = block_id;
2100         p.blksize  = blksize;
2101         p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2102
2103         if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2104                 return FALSE;
2105         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2106                                 (struct p_header *)&p, sizeof(p));
2107         return ok;
2108 }
2109
2110 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2111                      struct p_data *dp)
2112 {
2113         const int header_size = sizeof(struct p_data)
2114                               - sizeof(struct p_header);
2115         int data_size  = ((struct p_header *)dp)->length - header_size;
2116
2117         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2118                               dp->block_id);
2119 }
2120
2121 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2122                      struct p_block_req *rp)
2123 {
2124         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2125 }
2126
2127 /**
2128  * drbd_send_ack() - Sends an ack packet
2129  * @mdev:       DRBD device.
2130  * @cmd:        Packet command code.
2131  * @e:          Epoch entry.
2132  */
2133 int drbd_send_ack(struct drbd_conf *mdev,
2134         enum drbd_packets cmd, struct drbd_epoch_entry *e)
2135 {
2136         return _drbd_send_ack(mdev, cmd,
2137                               cpu_to_be64(e->sector),
2138                               cpu_to_be32(e->size),
2139                               e->block_id);
2140 }
2141
2142 /* This function misuses the block_id field to signal if the blocks
2143  * are in sync or not. */
2144 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2145                      sector_t sector, int blksize, u64 block_id)
2146 {
2147         return _drbd_send_ack(mdev, cmd,
2148                               cpu_to_be64(sector),
2149                               cpu_to_be32(blksize),
2150                               cpu_to_be64(block_id));
2151 }
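/*
 * Endianness contract of the ack helpers: _drbd_send_ack() expects sector,
 * blksize and block_id already in big endian byte order, so drbd_send_ack()
 * and drbd_send_ack_ex() convert with cpu_to_be*() themselves, while
 * drbd_send_ack_dp() and drbd_send_ack_rp() reuse sector and block_id
 * verbatim as received from the peer.
 */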
2152
2153 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2154                        sector_t sector, int size, u64 block_id)
2155 {
2156         int ok;
2157         struct p_block_req p;
2158
2159         p.sector   = cpu_to_be64(sector);
2160         p.block_id = block_id;
2161         p.blksize  = cpu_to_be32(size);
2162
2163         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2164                                 (struct p_header *)&p, sizeof(p));
2165         return ok;
2166 }
2167
2168 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2169                             sector_t sector, int size,
2170                             void *digest, int digest_size,
2171                             enum drbd_packets cmd)
2172 {
2173         int ok;
2174         struct p_block_req p;
2175
2176         p.sector   = cpu_to_be64(sector);
2177         p.block_id = BE_DRBD_MAGIC + 0xbeef;
2178         p.blksize  = cpu_to_be32(size);
2179
2180         p.head.magic   = BE_DRBD_MAGIC;
2181         p.head.command = cpu_to_be16(cmd);
2182         p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header) + digest_size);
2183
2184         mutex_lock(&mdev->data.mutex);
2185
2186         ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2187         ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2188
2189         mutex_unlock(&mdev->data.mutex);
2190
2191         return ok;
2192 }
2193
2194 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2195 {
2196         int ok;
2197         struct p_block_req p;
2198
2199         p.sector   = cpu_to_be64(sector);
2200         p.block_id = BE_DRBD_MAGIC + 0xbabe;
2201         p.blksize  = cpu_to_be32(size);
2202
2203         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2204                            (struct p_header *)&p, sizeof(p));
2205         return ok;
2206 }
2207
2208 static int drbd_send_delay_probe(struct drbd_conf *mdev, struct drbd_socket *ds)
2209 {
2210         struct p_delay_probe dp;
2211         int offset, ok = 0;
2212         struct timeval now;
2213
2214         mutex_lock(&ds->mutex);
2215         if (likely(ds->socket)) {
2216                 do_gettimeofday(&now);
2217                 offset = now.tv_usec - mdev->dps_time.tv_usec +
2218                          (now.tv_sec - mdev->dps_time.tv_sec) * 1000000;
2219                 dp.seq_num  = cpu_to_be32(mdev->delay_seq);
2220                 dp.offset   = cpu_to_be32(offset);
2221
2222                 ok = _drbd_send_cmd(mdev, ds->socket, P_DELAY_PROBE,
2223                                     (struct p_header *)&dp, sizeof(dp), 0);
2224         }
2225         mutex_unlock(&ds->mutex);
2226
2227         return ok;
2228 }
2229
2230 static int drbd_send_delay_probes(struct drbd_conf *mdev)
2231 {
2232         int ok;
2233
2234         mdev->delay_seq++;
2235         do_gettimeofday(&mdev->dps_time);
2236         ok = drbd_send_delay_probe(mdev, &mdev->meta);
2237         ok = ok && drbd_send_delay_probe(mdev, &mdev->data);
2238
2239         mdev->dp_volume_last = mdev->send_cnt;
2240         mod_timer(&mdev->delay_probe_timer, jiffies + mdev->sync_conf.dp_interval * HZ / 10);
2241
2242         return ok;
2243 }
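/*
 * Delay probe mechanism in short: drbd_send_delay_probes() timestamps
 * mdev->dps_time, sends a P_DELAY_PROBE carrying the same seq_num on both
 * the meta and the data socket, and re-arms delay_probe_timer; the timer
 * handler below merely queues w_delay_probes() on the worker, which sends
 * the next pair as long as we are C_SYNC_SOURCE.
 */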
2244
2245 /* called on sndtimeo
2246  * returns FALSE if we should retry,
2247  * TRUE if we think connection is dead
2248  */
2249 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2250 {
2251         int drop_it;
2252         /* long elapsed = (long)(jiffies - mdev->last_received); */
2253
2254         drop_it =   mdev->meta.socket == sock
2255                 || !mdev->asender.task
2256                 || get_t_state(&mdev->asender) != Running
2257                 || mdev->state.conn < C_CONNECTED;
2258
2259         if (drop_it)
2260                 return TRUE;
2261
2262         drop_it = !--mdev->ko_count;
2263         if (!drop_it) {
2264                 dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2265                        current->comm, current->pid, mdev->ko_count);
2266                 request_ping(mdev);
2267         }
2268
2269         return drop_it; /* && (mdev->state == R_PRIMARY) */;
2270 }
2271
2272 /* The idea of sendpage seems to be to put some kind of reference
2273  * to the page into the skb, and to hand it over to the NIC. In
2274  * this process get_page() gets called.
2275  *
2276  * As soon as the page was really sent over the network put_page()
2277  * gets called by some part of the network layer. [ NIC driver? ]
2278  *
2279  * [ get_page() / put_page() increment/decrement the count. If count
2280  *   reaches 0 the page will be freed. ]
2281  *
2282  * This works nicely with pages from FSs.
2283  * But this means that in protocol A we might signal IO completion too early!
2284  *
2285  * In order not to corrupt data during a resync we must make sure
2286  * that we do not reuse our own buffer pages (EEs) too early, therefore
2287  * we have the net_ee list.
2288  *
2289  * XFS seems to have problems, still, it submits pages with page_count == 0!
2290  * As a workaround, we disable sendpage on pages
2291  * with page_count == 0 or PageSlab.
2292  */
2293 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2294                    int offset, size_t size)
2295 {
2296         int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, 0);
2297         kunmap(page);
2298         if (sent == size)
2299                 mdev->send_cnt += size>>9;
2300         return sent == size;
2301 }
2302
2303 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2304                     int offset, size_t size)
2305 {
2306         mm_segment_t oldfs = get_fs();
2307         int sent, ok;
2308         int len = size;
2309
2310         /* e.g. XFS meta- & log-data is in slab pages, which have a
2311          * page_count of 0 and/or have PageSlab() set.
2312          * we cannot use send_page for those, as that does get_page();
2313          * put_page(); and would cause either a VM_BUG directly, or
2314          * __page_cache_release a page that would actually still be referenced
2315          * by someone, leading to some obscure delayed Oops somewhere else. */
2316         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2317                 return _drbd_no_send_page(mdev, page, offset, size);
2318
2319         drbd_update_congested(mdev);
2320         set_fs(KERNEL_DS);
2321         do {
2322                 sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2323                                                         offset, len,
2324                                                         MSG_NOSIGNAL);
2325                 if (sent == -EAGAIN) {
2326                         if (we_should_drop_the_connection(mdev,
2327                                                           mdev->data.socket))
2328                                 break;
2329                         else
2330                                 continue;
2331                 }
2332                 if (sent <= 0) {
2333                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2334                              __func__, (int)size, len, sent);
2335                         break;
2336                 }
2337                 len    -= sent;
2338                 offset += sent;
2339         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2340         set_fs(oldfs);
2341         clear_bit(NET_CONGESTED, &mdev->flags);
2342
2343         ok = (len == 0);
2344         if (likely(ok))
2345                 mdev->send_cnt += size>>9;
2346         return ok;
2347 }
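/*
 * Which send path gets used: _drbd_send_page() goes zero-copy through
 * ->sendpage() and is what _drbd_send_zc_bio()/_drbd_send_zc_ee() below
 * rely on; it falls back to the kmap() + drbd_send() copy in
 * _drbd_no_send_page() whenever disable_sendpage is set, page_count() < 1
 * or PageSlab() is true (see the comment block above _drbd_no_send_page()).
 */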
2348
2349 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2350 {
2351         struct bio_vec *bvec;
2352         int i;
2353         __bio_for_each_segment(bvec, bio, i, 0) {
2354                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
2355                                      bvec->bv_offset, bvec->bv_len))
2356                         return 0;
2357         }
2358         return 1;
2359 }
2360
2361 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2362 {
2363         struct bio_vec *bvec;
2364         int i;
2365         __bio_for_each_segment(bvec, bio, i, 0) {
2366                 if (!_drbd_send_page(mdev, bvec->bv_page,
2367                                      bvec->bv_offset, bvec->bv_len))
2368                         return 0;
2369         }
2370
2371         return 1;
2372 }
2373
2374 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2375 {
2376         struct page *page = e->pages;
2377         unsigned len = e->size;
2378         page_chain_for_each(page) {
2379                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
2380                 if (!_drbd_send_page(mdev, page, 0, l))
2381                         return 0;
2382                 len -= l;
2383         }
2384         return 1;
2385 }
2386
2387 static void consider_delay_probes(struct drbd_conf *mdev)
2388 {
2389         if (mdev->state.conn != C_SYNC_SOURCE || mdev->agreed_pro_version < 93)
2390                 return;
2391
2392         if (mdev->dp_volume_last + mdev->sync_conf.dp_volume * 2 < mdev->send_cnt)
2393                 drbd_send_delay_probes(mdev);
2394 }
2395
2396 static int w_delay_probes(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
2397 {
2398         if (!cancel && mdev->state.conn == C_SYNC_SOURCE)
2399                 drbd_send_delay_probes(mdev);
2400
2401         return 1;
2402 }
2403
2404 static void delay_probe_timer_fn(unsigned long data)
2405 {
2406         struct drbd_conf *mdev = (struct drbd_conf *) data;
2407
2408         if (list_empty(&mdev->delay_probe_work.list))
2409                 drbd_queue_work(&mdev->data.work, &mdev->delay_probe_work);
2410 }
2411
2412 /* Used to send write requests
2413  * R_PRIMARY -> Peer    (P_DATA)
2414  */
2415 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2416 {
2417         int ok = 1;
2418         struct p_data p;
2419         unsigned int dp_flags = 0;
2420         void *dgb;
2421         int dgs;
2422
2423         if (!drbd_get_data_sock(mdev))
2424                 return 0;
2425
2426         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2427                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2428
2429         p.head.magic   = BE_DRBD_MAGIC;
2430         p.head.command = cpu_to_be16(P_DATA);
2431         p.head.length  =
2432                 cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + req->size);
2433
2434         p.sector   = cpu_to_be64(req->sector);
2435         p.block_id = (unsigned long)req;
2436         p.seq_num  = cpu_to_be32(req->seq_num =
2437                                  atomic_add_return(1, &mdev->packet_seq));
2438         dp_flags = 0;
2439
2440         /* NOTE: no need to check if barriers supported here as we would
2441          *       not pass the test in make_request_common in that case
2442          */
2443         if (bio_rw_flagged(req->master_bio, BIO_RW_BARRIER)) {
2444                 dev_err(DEV, "ASSERT FAILED would have set DP_HARDBARRIER\n");
2445                 /* dp_flags |= DP_HARDBARRIER; */
2446         }
2447         if (bio_rw_flagged(req->master_bio, BIO_RW_SYNCIO))
2448                 dp_flags |= DP_RW_SYNC;
2449         /* for now handle SYNCIO and UNPLUG
2450          * as if they still were one and the same flag */
2451         if (bio_rw_flagged(req->master_bio, BIO_RW_UNPLUG))
2452                 dp_flags |= DP_RW_SYNC;
2453         if (mdev->state.conn >= C_SYNC_SOURCE &&
2454             mdev->state.conn <= C_PAUSED_SYNC_T)
2455                 dp_flags |= DP_MAY_SET_IN_SYNC;
2456
2457         p.dp_flags = cpu_to_be32(dp_flags);
2458         set_bit(UNPLUG_REMOTE, &mdev->flags);
2459         ok = (sizeof(p) ==
2460                 drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE));
2461         if (ok && dgs) {
2462                 dgb = mdev->int_dig_out;
2463                 drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2464                 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2465         }
2466         if (ok) {
2467                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2468                         ok = _drbd_send_bio(mdev, req->master_bio);
2469                 else
2470                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
2471         }
2472
2473         drbd_put_data_sock(mdev);
2474
2475         if (ok)
2476                 consider_delay_probes(mdev);
2477
2478         return ok;
2479 }
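/*
 * On-the-wire layout of a write (P_DATA) as assembled above, in order:
 * struct p_data (generic header, sector, block_id, seq_num, dp_flags),
 * then the optional integrity digest of dgs bytes, then req->size bytes
 * of payload; head.length covers everything after the generic header.
 */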
2480
2481 /* answer packet, used to send data back for read requests:
2482  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2483  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2484  */
2485 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2486                     struct drbd_epoch_entry *e)
2487 {
2488         int ok;
2489         struct p_data p;
2490         void *dgb;
2491         int dgs;
2492
2493         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2494                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2495
2496         p.head.magic   = BE_DRBD_MAGIC;
2497         p.head.command = cpu_to_be16(cmd);
2498         p.head.length  =
2499                 cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + e->size);
2500
2501         p.sector   = cpu_to_be64(e->sector);
2502         p.block_id = e->block_id;
2503         /* p.seq_num  = 0;    No sequence numbers here.. */
2504
2505         /* Only called by our kernel thread.
2506          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2507          * in response to admin command or module unload.
2508          */
2509         if (!drbd_get_data_sock(mdev))
2510                 return 0;
2511
2512         ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p,
2513                                         sizeof(p), MSG_MORE);
2514         if (ok && dgs) {
2515                 dgb = mdev->int_dig_out;
2516                 drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2517                 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2518         }
2519         if (ok)
2520                 ok = _drbd_send_zc_ee(mdev, e);
2521
2522         drbd_put_data_sock(mdev);
2523
2524         if (ok)
2525                 consider_delay_probes(mdev);
2526
2527         return ok;
2528 }
2529
2530 /*
2531   drbd_send distinguishes two cases:
2532
2533   Packets sent via the data socket "sock"
2534   and packets sent via the meta data socket "msock"
2535
2536                     sock                      msock
2537   -----------------+-------------------------+------------------------------
2538   timeout           conf.timeout / 2          conf.timeout / 2
2539   timeout action    send a ping via msock     Abort communication
2540                                               and close all sockets
2541 */
2542
2543 /*
2544  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2545  */
2546 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2547               void *buf, size_t size, unsigned msg_flags)
2548 {
2549         struct kvec iov;
2550         struct msghdr msg;
2551         int rv, sent = 0;
2552
2553         if (!sock)
2554                 return -1000;
2555
2556         /* THINK  if (signal_pending) return ... ? */
2557
2558         iov.iov_base = buf;
2559         iov.iov_len  = size;
2560
2561         msg.msg_name       = NULL;
2562         msg.msg_namelen    = 0;
2563         msg.msg_control    = NULL;
2564         msg.msg_controllen = 0;
2565         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2566
2567         if (sock == mdev->data.socket) {
2568                 mdev->ko_count = mdev->net_conf->ko_count;
2569                 drbd_update_congested(mdev);
2570         }
2571         do {
2572                 /* STRANGE
2573                  * tcp_sendmsg does _not_ use its size parameter at all ?
2574                  *
2575                  * -EAGAIN on timeout, -EINTR on signal.
2576                  */
2577 /* THINK
2578  * do we need to block DRBD_SIG if sock == &meta.socket ??
2579  * otherwise wake_asender() might interrupt some send_*Ack !
2580  */
2581                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2582                 if (rv == -EAGAIN) {
2583                         if (we_should_drop_the_connection(mdev, sock))
2584                                 break;
2585                         else
2586                                 continue;
2587                 }
2588                 D_ASSERT(rv != 0);
2589                 if (rv == -EINTR) {
2590                         flush_signals(current);
2591                         rv = 0;
2592                 }
2593                 if (rv < 0)
2594                         break;
2595                 sent += rv;
2596                 iov.iov_base += rv;
2597                 iov.iov_len  -= rv;
2598         } while (sent < size);
2599
2600         if (sock == mdev->data.socket)
2601                 clear_bit(NET_CONGESTED, &mdev->flags);
2602
2603         if (rv <= 0) {
2604                 if (rv != -EAGAIN) {
2605                         dev_err(DEV, "%s_sendmsg returned %d\n",
2606                             sock == mdev->meta.socket ? "msock" : "sock",
2607                             rv);
2608                         drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2609                 } else
2610                         drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2611         }
2612
2613         return sent;
2614 }
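/*
 * Return value convention of drbd_send(): -1000 if there is no socket,
 * otherwise the number of bytes actually handed to kernel_sendmsg()
 * (== size on success, possibly 0 or a partial count on error). If the
 * last sendmsg result was an error, the connection is forced to
 * C_BROKEN_PIPE, or to C_TIMEOUT when that error was -EAGAIN.
 */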
2615
2616 static int drbd_open(struct block_device *bdev, fmode_t mode)
2617 {
2618         struct drbd_conf *mdev = bdev->bd_disk->private_data;
2619         unsigned long flags;
2620         int rv = 0;
2621
2622         spin_lock_irqsave(&mdev->req_lock, flags);
2623         /* to have a stable mdev->state.role
2624          * and no race with updating open_cnt */
2625
2626         if (mdev->state.role != R_PRIMARY) {
2627                 if (mode & FMODE_WRITE)
2628                         rv = -EROFS;
2629                 else if (!allow_oos)
2630                         rv = -EMEDIUMTYPE;
2631         }
2632
2633         if (!rv)
2634                 mdev->open_cnt++;
2635         spin_unlock_irqrestore(&mdev->req_lock, flags);
2636
2637         return rv;
2638 }
2639
2640 static int drbd_release(struct gendisk *gd, fmode_t mode)
2641 {
2642         struct drbd_conf *mdev = gd->private_data;
2643         mdev->open_cnt--;
2644         return 0;
2645 }
2646
2647 static void drbd_unplug_fn(struct request_queue *q)
2648 {
2649         struct drbd_conf *mdev = q->queuedata;
2650
2651         /* unplug FIRST */
2652         spin_lock_irq(q->queue_lock);
2653         blk_remove_plug(q);
2654         spin_unlock_irq(q->queue_lock);
2655
2656         /* only if connected */
2657         spin_lock_irq(&mdev->req_lock);
2658         if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2659                 D_ASSERT(mdev->state.role == R_PRIMARY);
2660                 if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2661                         /* add to the data.work queue,
2662                          * unless already queued.
2663                          * XXX this might be a good addition to drbd_queue_work
2664                          * anyways, to detect "double queuing" ... */
2665                         if (list_empty(&mdev->unplug_work.list))
2666                                 drbd_queue_work(&mdev->data.work,
2667                                                 &mdev->unplug_work);
2668                 }
2669         }
2670         spin_unlock_irq(&mdev->req_lock);
2671
2672         if (mdev->state.disk >= D_INCONSISTENT)
2673                 drbd_kick_lo(mdev);
2674 }
2675
2676 static void drbd_set_defaults(struct drbd_conf *mdev)
2677 {
2678         mdev->sync_conf.after      = DRBD_AFTER_DEF;
2679         mdev->sync_conf.rate       = DRBD_RATE_DEF;
2680         mdev->sync_conf.al_extents = DRBD_AL_EXTENTS_DEF;
2681         mdev->state = (union drbd_state) {
2682                 { .role = R_SECONDARY,
2683                   .peer = R_UNKNOWN,
2684                   .conn = C_STANDALONE,
2685                   .disk = D_DISKLESS,
2686                   .pdsk = D_UNKNOWN,
2687                   .susp = 0
2688                 } };
2689 }
2690
2691 void drbd_init_set_defaults(struct drbd_conf *mdev)
2692 {
2693         /* the memset(,0,) did most of this.
2694          * note: only assignments, no allocation in here */
2695
2696         drbd_set_defaults(mdev);
2697
2698         /* for now, we do NOT yet support it,
2699          * even though we start some framework
2700          * to eventually support barriers */
2701         set_bit(NO_BARRIER_SUPP, &mdev->flags);
2702
2703         atomic_set(&mdev->ap_bio_cnt, 0);
2704         atomic_set(&mdev->ap_pending_cnt, 0);
2705         atomic_set(&mdev->rs_pending_cnt, 0);
2706         atomic_set(&mdev->unacked_cnt, 0);
2707         atomic_set(&mdev->local_cnt, 0);
2708         atomic_set(&mdev->net_cnt, 0);
2709         atomic_set(&mdev->packet_seq, 0);
2710         atomic_set(&mdev->pp_in_use, 0);
2711         atomic_set(&mdev->new_c_uuid, 0);
2712
2713         mutex_init(&mdev->md_io_mutex);
2714         mutex_init(&mdev->data.mutex);
2715         mutex_init(&mdev->meta.mutex);
2716         sema_init(&mdev->data.work.s, 0);
2717         sema_init(&mdev->meta.work.s, 0);
2718         mutex_init(&mdev->state_mutex);
2719
2720         spin_lock_init(&mdev->data.work.q_lock);
2721         spin_lock_init(&mdev->meta.work.q_lock);
2722
2723         spin_lock_init(&mdev->al_lock);
2724         spin_lock_init(&mdev->req_lock);
2725         spin_lock_init(&mdev->peer_seq_lock);
2726         spin_lock_init(&mdev->epoch_lock);
2727
2728         INIT_LIST_HEAD(&mdev->active_ee);
2729         INIT_LIST_HEAD(&mdev->sync_ee);
2730         INIT_LIST_HEAD(&mdev->done_ee);
2731         INIT_LIST_HEAD(&mdev->read_ee);
2732         INIT_LIST_HEAD(&mdev->net_ee);
2733         INIT_LIST_HEAD(&mdev->resync_reads);
2734         INIT_LIST_HEAD(&mdev->data.work.q);
2735         INIT_LIST_HEAD(&mdev->meta.work.q);
2736         INIT_LIST_HEAD(&mdev->resync_work.list);
2737         INIT_LIST_HEAD(&mdev->unplug_work.list);
2738         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2739         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2740         INIT_LIST_HEAD(&mdev->delay_probes);
2741         INIT_LIST_HEAD(&mdev->delay_probe_work.list);
2742         INIT_LIST_HEAD(&mdev->uuid_work.list);
2743
2744         mdev->resync_work.cb  = w_resync_inactive;
2745         mdev->unplug_work.cb  = w_send_write_hint;
2746         mdev->md_sync_work.cb = w_md_sync;
2747         mdev->bm_io_work.w.cb = w_bitmap_io;
2748         mdev->delay_probe_work.cb = w_delay_probes;
2749         mdev->uuid_work.cb = w_new_current_uuid;
2750         init_timer(&mdev->resync_timer);
2751         init_timer(&mdev->md_sync_timer);
2752         init_timer(&mdev->delay_probe_timer);
2753         mdev->resync_timer.function = resync_timer_fn;
2754         mdev->resync_timer.data = (unsigned long) mdev;
2755         mdev->md_sync_timer.function = md_sync_timer_fn;
2756         mdev->md_sync_timer.data = (unsigned long) mdev;
2757         mdev->delay_probe_timer.function = delay_probe_timer_fn;
2758         mdev->delay_probe_timer.data = (unsigned long) mdev;
2759
2760
2761         init_waitqueue_head(&mdev->misc_wait);
2762         init_waitqueue_head(&mdev->state_wait);
2763         init_waitqueue_head(&mdev->ee_wait);
2764         init_waitqueue_head(&mdev->al_wait);
2765         init_waitqueue_head(&mdev->seq_wait);
2766
2767         drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2768         drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2769         drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2770
2771         mdev->agreed_pro_version = PRO_VERSION_MAX;
2772         mdev->write_ordering = WO_bio_barrier;
2773         mdev->resync_wenr = LC_FREE;
2774 }
2775
2776 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2777 {
2778         if (mdev->receiver.t_state != None)
2779                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2780                                 mdev->receiver.t_state);
2781
2782         /* no need to lock it, I'm the only thread alive */
2783         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2784                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2785         mdev->al_writ_cnt  =
2786         mdev->bm_writ_cnt  =
2787         mdev->read_cnt     =
2788         mdev->recv_cnt     =
2789         mdev->send_cnt     =
2790         mdev->writ_cnt     =
2791         mdev->p_size       =
2792         mdev->rs_start     =
2793         mdev->rs_total     =
2794         mdev->rs_failed    =
2795         mdev->rs_mark_left =
2796         mdev->rs_mark_time = 0;
2797         D_ASSERT(mdev->net_conf == NULL);
2798
2799         drbd_set_my_capacity(mdev, 0);
2800         if (mdev->bitmap) {
2801                 /* maybe never allocated. */
2802                 drbd_bm_resize(mdev, 0, 1);
2803                 drbd_bm_cleanup(mdev);
2804         }
2805
2806         drbd_free_resources(mdev);
2807
2808         /*
2809          * currently we call drbd_init_ee only on module load, so
2810          * we may call drbd_release_ee only on module unload!
2811          */
2812         D_ASSERT(list_empty(&mdev->active_ee));
2813         D_ASSERT(list_empty(&mdev->sync_ee));
2814         D_ASSERT(list_empty(&mdev->done_ee));
2815         D_ASSERT(list_empty(&mdev->read_ee));
2816         D_ASSERT(list_empty(&mdev->net_ee));
2817         D_ASSERT(list_empty(&mdev->resync_reads));
2818         D_ASSERT(list_empty(&mdev->data.work.q));
2819         D_ASSERT(list_empty(&mdev->meta.work.q));
2820         D_ASSERT(list_empty(&mdev->resync_work.list));
2821         D_ASSERT(list_empty(&mdev->unplug_work.list));
2822
2823 }
2824
2825
2826 static void drbd_destroy_mempools(void)
2827 {
2828         struct page *page;
2829
2830         while (drbd_pp_pool) {
2831                 page = drbd_pp_pool;
2832                 drbd_pp_pool = (struct page *)page_private(page);
2833                 __free_page(page);
2834                 drbd_pp_vacant--;
2835         }
2836
2837         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2838
2839         if (drbd_ee_mempool)
2840                 mempool_destroy(drbd_ee_mempool);
2841         if (drbd_request_mempool)
2842                 mempool_destroy(drbd_request_mempool);
2843         if (drbd_ee_cache)
2844                 kmem_cache_destroy(drbd_ee_cache);
2845         if (drbd_request_cache)
2846                 kmem_cache_destroy(drbd_request_cache);
2847         if (drbd_bm_ext_cache)
2848                 kmem_cache_destroy(drbd_bm_ext_cache);
2849         if (drbd_al_ext_cache)
2850                 kmem_cache_destroy(drbd_al_ext_cache);
2851
2852         drbd_ee_mempool      = NULL;
2853         drbd_request_mempool = NULL;
2854         drbd_ee_cache        = NULL;
2855         drbd_request_cache   = NULL;
2856         drbd_bm_ext_cache    = NULL;
2857         drbd_al_ext_cache    = NULL;
2858
2859         return;
2860 }
2861
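/* Allocate the slab caches, mempools and drbd's private page pool.  The page
 * pool is a LIFO single linked list threaded through page_private(); "number"
 * pages are preallocated, enough for one maximally sized segment
 * (DRBD_MAX_SEGMENT_SIZE) per configured minor. */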
2862 static int drbd_create_mempools(void)
2863 {
2864         struct page *page;
2865         const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2866         int i;
2867
2868         /* prepare our caches and mempools */
2869         drbd_request_mempool = NULL;
2870         drbd_ee_cache        = NULL;
2871         drbd_request_cache   = NULL;
2872         drbd_bm_ext_cache    = NULL;
2873         drbd_al_ext_cache    = NULL;
2874         drbd_pp_pool         = NULL;
2875
2876         /* caches */
2877         drbd_request_cache = kmem_cache_create(
2878                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2879         if (drbd_request_cache == NULL)
2880                 goto Enomem;
2881
2882         drbd_ee_cache = kmem_cache_create(
2883                 "drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2884         if (drbd_ee_cache == NULL)
2885                 goto Enomem;
2886
2887         drbd_bm_ext_cache = kmem_cache_create(
2888                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2889         if (drbd_bm_ext_cache == NULL)
2890                 goto Enomem;
2891
2892         drbd_al_ext_cache = kmem_cache_create(
2893                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2894         if (drbd_al_ext_cache == NULL)
2895                 goto Enomem;
2896
2897         /* mempools */
2898         drbd_request_mempool = mempool_create(number,
2899                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2900         if (drbd_request_mempool == NULL)
2901                 goto Enomem;
2902
2903         drbd_ee_mempool = mempool_create(number,
2904                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2905         if (drbd_ee_mempool == NULL)
2906                 goto Enomem;
2907
2908         /* drbd's page pool */
2909         spin_lock_init(&drbd_pp_lock);
2910
2911         for (i = 0; i < number; i++) {
2912                 page = alloc_page(GFP_HIGHUSER);
2913                 if (!page)
2914                         goto Enomem;
2915                 set_page_private(page, (unsigned long)drbd_pp_pool);
2916                 drbd_pp_pool = page;
2917         }
2918         drbd_pp_vacant = number;
2919
2920         return 0;
2921
2922 Enomem:
2923         drbd_destroy_mempools(); /* in case we allocated some */
2924         return -ENOMEM;
2925 }
2926
2927 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2928         void *unused)
2929 {
2930         /* just so we have it.  you never know what interesting things we
2931          * might want to do here some day...
2932          */
2933
2934         return NOTIFY_DONE;
2935 }
2936
2937 static struct notifier_block drbd_notifier = {
2938         .notifier_call = drbd_notify_sys,
2939 };
2940
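/* Release any epoch entries still left on the per-device lists, warning
 * about each non-empty list; only used from drbd_delete_device() during
 * module cleanup. */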
2941 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2942 {
2943         int rr;
2944
2945         rr = drbd_release_ee(mdev, &mdev->active_ee);
2946         if (rr)
2947                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2948
2949         rr = drbd_release_ee(mdev, &mdev->sync_ee);
2950         if (rr)
2951                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2952
2953         rr = drbd_release_ee(mdev, &mdev->read_ee);
2954         if (rr)
2955                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2956
2957         rr = drbd_release_ee(mdev, &mdev->done_ee);
2958         if (rr)
2959                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2960
2961         rr = drbd_release_ee(mdev, &mdev->net_ee);
2962         if (rr)
2963                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2964 }
2965
2966 /* caution. no locking.
2967  * currently only used from module cleanup code. */
2968 static void drbd_delete_device(unsigned int minor)
2969 {
2970         struct drbd_conf *mdev = minor_to_mdev(minor);
2971
2972         if (!mdev)
2973                 return;
2974
2975         /* paranoia asserts */
2976         if (mdev->open_cnt != 0)
2977                 dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
2978                                 __FILE__ , __LINE__);
2979
2980         ERR_IF (!list_empty(&mdev->data.work.q)) {
2981                 struct list_head *lp;
2982                 list_for_each(lp, &mdev->data.work.q) {
2983                         dev_err(DEV, "lp = %p\n", lp);
2984                 }
2985         };
2986         /* end paranoia asserts */
2987
2988         del_gendisk(mdev->vdisk);
2989
2990         /* cleanup stuff that may have been allocated during
2991          * device (re-)configuration or state changes */
2992
2993         if (mdev->this_bdev)
2994                 bdput(mdev->this_bdev);
2995
2996         drbd_free_resources(mdev);
2997
2998         drbd_release_ee_lists(mdev);
2999
3000         /* should be free'd on disconnect? */
3001         kfree(mdev->ee_hash);
3002         /*
3003         mdev->ee_hash_s = 0;
3004         mdev->ee_hash = NULL;
3005         */
3006
3007         lc_destroy(mdev->act_log);
3008         lc_destroy(mdev->resync);
3009
3010         kfree(mdev->p_uuid);
3011         /* mdev->p_uuid = NULL; */
3012
3013         kfree(mdev->int_dig_out);
3014         kfree(mdev->int_dig_in);
3015         kfree(mdev->int_dig_vv);
3016
3017         /* cleanup the rest that has been
3018          * allocated from drbd_new_device
3019          * and actually free the mdev itself */
3020         drbd_free_mdev(mdev);
3021 }
3022
3023 static void drbd_cleanup(void)
3024 {
3025         unsigned int i;
3026
3027         unregister_reboot_notifier(&drbd_notifier);
3028
3029         drbd_nl_cleanup();
3030
3031         if (minor_table) {
3032                 if (drbd_proc)
3033                         remove_proc_entry("drbd", NULL);
3034                 i = minor_count;
3035                 while (i--)
3036                         drbd_delete_device(i);
3037                 drbd_destroy_mempools();
3038         }
3039
3040         kfree(minor_table);
3041
3042         unregister_blkdev(DRBD_MAJOR, "drbd");
3043
3044         printk(KERN_INFO "drbd: module cleanup done.\n");
3045 }
3046
3047 /**
3048  * drbd_congested() - Callback for pdflush
3049  * @congested_data:     User data
3050  * @bdi_bits:           Bits pdflush is currently interested in
3051  *
3052  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
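 * As a side effect mdev->congestion_reason is updated: 'd' when DRBD itself
 * has frozen IO, 'b' when the backing device is congested, 'n' when the
 * network send path is congested, 'a' when both backing device and network
 * are congested, '-' otherwise.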
3053  */
3054 static int drbd_congested(void *congested_data, int bdi_bits)
3055 {
3056         struct drbd_conf *mdev = congested_data;
3057         struct request_queue *q;
3058         char reason = '-';
3059         int r = 0;
3060
3061         if (!__inc_ap_bio_cond(mdev)) {
3062                 /* DRBD has frozen IO */
3063                 r = bdi_bits;
3064                 reason = 'd';
3065                 goto out;
3066         }
3067
3068         if (get_ldev(mdev)) {
3069                 q = bdev_get_queue(mdev->ldev->backing_bdev);
3070                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
3071                 put_ldev(mdev);
3072                 if (r)
3073                         reason = 'b';
3074         }
3075
3076         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3077                 r |= (1 << BDI_async_congested);
3078                 reason = reason == 'b' ? 'a' : 'n';
3079         }
3080
3081 out:
3082         mdev->congestion_reason = reason;
3083         return r;
3084 }
3085
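/* Allocate and set up a new minor: the drbd_conf itself, its request queue,
 * gendisk, meta data IO page, bitmap, transfer log and epoch bookkeeping.
 * On any failure, everything allocated so far is unwound; the teardown of a
 * fully constructed device is done by drbd_free_mdev(). */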
3086 struct drbd_conf *drbd_new_device(unsigned int minor)
3087 {
3088         struct drbd_conf *mdev;
3089         struct gendisk *disk;
3090         struct request_queue *q;
3091
3092         /* GFP_KERNEL, we are outside of all write-out paths */
3093         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3094         if (!mdev)
3095                 return NULL;
3096         if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3097                 goto out_no_cpumask;
3098
3099         mdev->minor = minor;
3100
3101         drbd_init_set_defaults(mdev);
3102
3103         q = blk_alloc_queue(GFP_KERNEL);
3104         if (!q)
3105                 goto out_no_q;
3106         mdev->rq_queue = q;
3107         q->queuedata   = mdev;
3108
3109         disk = alloc_disk(1);
3110         if (!disk)
3111                 goto out_no_disk;
3112         mdev->vdisk = disk;
3113
3114         set_disk_ro(disk, TRUE);
3115
3116         disk->queue = q;
3117         disk->major = DRBD_MAJOR;
3118         disk->first_minor = minor;
3119         disk->fops = &drbd_ops;
3120         sprintf(disk->disk_name, "drbd%d", minor);
3121         disk->private_data = mdev;
3122
3123         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3124         /* we have no partitions. we contain only ourselves. */
3125         mdev->this_bdev->bd_contains = mdev->this_bdev;
3126
3127         q->backing_dev_info.congested_fn = drbd_congested;
3128         q->backing_dev_info.congested_data = mdev;
3129
3130         blk_queue_make_request(q, drbd_make_request_26);
3131         blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3132         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3133         blk_queue_merge_bvec(q, drbd_merge_bvec);
3134         q->queue_lock = &mdev->req_lock; /* needed since we use */
3135                 /* plugging on a queue that actually has no requests! */
3136         q->unplug_fn = drbd_unplug_fn;
3137
3138         mdev->md_io_page = alloc_page(GFP_KERNEL);
3139         if (!mdev->md_io_page)
3140                 goto out_no_io_page;
3141
3142         if (drbd_bm_init(mdev))
3143                 goto out_no_bitmap;
3144         /* no need to lock access, we are still initializing this minor device. */
3145         if (!tl_init(mdev))
3146                 goto out_no_tl;
3147
3148         mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3149         if (!mdev->app_reads_hash)
3150                 goto out_no_app_reads;
3151
3152         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3153         if (!mdev->current_epoch)
3154                 goto out_no_epoch;
3155
3156         INIT_LIST_HEAD(&mdev->current_epoch->list);
3157         mdev->epochs = 1;
3158
3159         return mdev;
3160
3161 /* out_whatever_else:
3162         kfree(mdev->current_epoch); */
3163 out_no_epoch:
3164         kfree(mdev->app_reads_hash);
3165 out_no_app_reads:
3166         tl_cleanup(mdev);
3167 out_no_tl:
3168         drbd_bm_cleanup(mdev);
3169 out_no_bitmap:
3170         __free_page(mdev->md_io_page);
3171 out_no_io_page:
3172         put_disk(disk);
3173 out_no_disk:
3174         blk_cleanup_queue(q);
3175 out_no_q:
3176         free_cpumask_var(mdev->cpu_mask);
3177 out_no_cpumask:
3178         kfree(mdev);
3179         return NULL;
3180 }
3181
3182 /* counterpart of drbd_new_device.
3183  * last part of drbd_delete_device. */
3184 void drbd_free_mdev(struct drbd_conf *mdev)
3185 {
3186         kfree(mdev->current_epoch);
3187         kfree(mdev->app_reads_hash);
3188         tl_cleanup(mdev);
3189         if (mdev->bitmap) /* should no longer be there. */
3190                 drbd_bm_cleanup(mdev);
3191         __free_page(mdev->md_io_page);
3192         put_disk(mdev->vdisk);
3193         blk_cleanup_queue(mdev->rq_queue);
3194         free_cpumask_var(mdev->cpu_mask);
3195         kfree(mdev);
3196 }
3197
3198
3199 int __init drbd_init(void)
3200 {
3201         int err;
3202
3203         if (sizeof(struct p_handshake) != 80) {
3204                 printk(KERN_ERR
3205                        "drbd: never change the size or layout "
3206                        "of the HandShake packet.\n");
3207                 return -EINVAL;
3208         }
3209
3210         if (1 > minor_count || minor_count > 255) {
3211                 printk(KERN_ERR
3212                         "drbd: invalid minor_count (%d)\n", minor_count);
3213 #ifdef MODULE
3214                 return -EINVAL;
3215 #else
3216                 minor_count = 8;
3217 #endif
3218         }
3219
3220         err = drbd_nl_init();
3221         if (err)
3222                 return err;
3223
3224         err = register_blkdev(DRBD_MAJOR, "drbd");
3225         if (err) {
3226                 printk(KERN_ERR
3227                        "drbd: unable to register block device major %d\n",
3228                        DRBD_MAJOR);
3229                 return err;
3230         }
3231
3232         register_reboot_notifier(&drbd_notifier);
3233
3234         /*
3235          * allocate all necessary structs
3236          */
3237         err = -ENOMEM;
3238
3239         init_waitqueue_head(&drbd_pp_wait);
3240
3241         drbd_proc = NULL; /* play safe for drbd_cleanup */
3242         minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3243                                 GFP_KERNEL);
3244         if (!minor_table)
3245                 goto Enomem;
3246
3247         err = drbd_create_mempools();
3248         if (err)
3249                 goto Enomem;
3250
3251         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3252         if (!drbd_proc) {
3253                 printk(KERN_ERR "drbd: unable to register proc file\n");
3254                 goto Enomem;
3255         }
3256
3257         rwlock_init(&global_state_lock);
3258
3259         printk(KERN_INFO "drbd: initialized. "
3260                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3261                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3262         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3263         printk(KERN_INFO "drbd: registered as block device major %d\n",
3264                 DRBD_MAJOR);
3265         printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3266
3267         return 0; /* Success! */
3268
3269 Enomem:
3270         drbd_cleanup();
3271         if (err == -ENOMEM)
3272                 /* currently always the case */
3273                 printk(KERN_ERR "drbd: ran out of memory\n");
3274         else
3275                 printk(KERN_ERR "drbd: initialization failure\n");
3276         return err;
3277 }
3278
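/* Release the claims on the backing and meta data block devices and drop
 * the corresponding file references of a local disk configuration. */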
3279 void drbd_free_bc(struct drbd_backing_dev *ldev)
3280 {
3281         if (ldev == NULL)
3282                 return;
3283
3284         bd_release(ldev->backing_bdev);
3285         bd_release(ldev->md_bdev);
3286
3287         fput(ldev->lo_file);
3288         fput(ldev->md_file);
3289
3290         kfree(ldev);
3291 }
3292
3293 void drbd_free_sock(struct drbd_conf *mdev)
3294 {
3295         if (mdev->data.socket) {
3296                 mutex_lock(&mdev->data.mutex);
3297                 kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3298                 sock_release(mdev->data.socket);
3299                 mdev->data.socket = NULL;
3300                 mutex_unlock(&mdev->data.mutex);
3301         }
3302         if (mdev->meta.socket) {
3303                 mutex_lock(&mdev->meta.mutex);
3304                 kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3305                 sock_release(mdev->meta.socket);
3306                 mdev->meta.socket = NULL;
3307                 mutex_unlock(&mdev->meta.mutex);
3308         }
3309 }
3310
3311
3312 void drbd_free_resources(struct drbd_conf *mdev)
3313 {
3314         crypto_free_hash(mdev->csums_tfm);
3315         mdev->csums_tfm = NULL;
3316         crypto_free_hash(mdev->verify_tfm);
3317         mdev->verify_tfm = NULL;
3318         crypto_free_hash(mdev->cram_hmac_tfm);
3319         mdev->cram_hmac_tfm = NULL;
3320         crypto_free_hash(mdev->integrity_w_tfm);
3321         mdev->integrity_w_tfm = NULL;
3322         crypto_free_hash(mdev->integrity_r_tfm);
3323         mdev->integrity_r_tfm = NULL;
3324
3325         drbd_free_sock(mdev);
3326
3327         __no_warn(local,
3328                   drbd_free_bc(mdev->ldev);
3329                   mdev->ldev = NULL;);
3330 }
3331
3332 /* meta data management */
3333
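/* On-disk layout of the meta data super block.  drbd_md_sync() writes one
 * 512 byte sector in this layout (all multi-byte fields in big endian),
 * and drbd_md_read() verifies the magic, offsets and sizes on attach. */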
3334 struct meta_data_on_disk {
3335         u64 la_size;           /* last agreed size. */
3336         u64 uuid[UI_SIZE];   /* UUIDs. */
3337         u64 device_uuid;
3338         u64 reserved_u64_1;
3339         u32 flags;             /* MDF */
3340         u32 magic;
3341         u32 md_size_sect;
3342         u32 al_offset;         /* offset to this block */
3343         u32 al_nr_extents;     /* important for restoring the AL */
3344               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3345         u32 bm_offset;         /* offset to the bitmap, from here */
3346         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3347         u32 reserved_u32[4];
3348
3349 } __packed;
3350
3351 /**
3352  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3353  * @mdev:       DRBD device.
3354  */
3355 void drbd_md_sync(struct drbd_conf *mdev)
3356 {
3357         struct meta_data_on_disk *buffer;
3358         sector_t sector;
3359         int i;
3360
3361         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3362                 return;
3363         del_timer(&mdev->md_sync_timer);
3364
3365         /* We use D_FAILED here, and not D_ATTACHING, because we try to write
3366          * metadata even if we detach due to a disk failure! */
3367         if (!get_ldev_if_state(mdev, D_FAILED))
3368                 return;
3369
3370         mutex_lock(&mdev->md_io_mutex);
3371         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3372         memset(buffer, 0, 512);
3373
3374         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3375         for (i = UI_CURRENT; i < UI_SIZE; i++)
3376                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3377         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3378         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3379
3380         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3381         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3382         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3383         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3384         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3385
3386         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3387
3388         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3389         sector = mdev->ldev->md.md_offset;
3390
3391         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3392                 clear_bit(MD_DIRTY, &mdev->flags);
3393         } else {
3394                 /* this was a try anyway ... */
3395                 dev_err(DEV, "meta data update failed!\n");
3396
3397                 drbd_chk_io_error(mdev, 1, TRUE);
3398         }
3399
3400         /* Update mdev->ldev->md.la_size_sect,
3401          * since we updated it on metadata. */
3402         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3403
3404         mutex_unlock(&mdev->md_io_mutex);
3405         put_ldev(mdev);
3406 }
3407
3408 /**
3409  * drbd_md_read() - Reads in the meta data super block
3410  * @mdev:       DRBD device.
3411  * @bdev:       Device from which the meta data should be read in.
3412  *
3413  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3414  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3415  */
3416 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3417 {
3418         struct meta_data_on_disk *buffer;
3419         int i, rv = NO_ERROR;
3420
3421         if (!get_ldev_if_state(mdev, D_ATTACHING))
3422                 return ERR_IO_MD_DISK;
3423
3424         mutex_lock(&mdev->md_io_mutex);
3425         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3426
3427         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3428                 /* NOTE: can't do normal error processing here as this is
3429                    called BEFORE disk is attached */
3430                 dev_err(DEV, "Error while reading metadata.\n");
3431                 rv = ERR_IO_MD_DISK;
3432                 goto err;
3433         }
3434
3435         if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3436                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
3437                 rv = ERR_MD_INVALID;
3438                 goto err;
3439         }
3440         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3441                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3442                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3443                 rv = ERR_MD_INVALID;
3444                 goto err;
3445         }
3446         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3447                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3448                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3449                 rv = ERR_MD_INVALID;
3450                 goto err;
3451         }
3452         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3453                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3454                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3455                 rv = ERR_MD_INVALID;
3456                 goto err;
3457         }
3458
3459         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3460                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3461                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3462                 rv = ERR_MD_INVALID;
3463                 goto err;
3464         }
3465
3466         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3467         for (i = UI_CURRENT; i < UI_SIZE; i++)
3468                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3469         bdev->md.flags = be32_to_cpu(buffer->flags);
3470         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3471         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3472
3473         if (mdev->sync_conf.al_extents < 7)
3474                 mdev->sync_conf.al_extents = 127;
3475
3476  err:
3477         mutex_unlock(&mdev->md_io_mutex);
3478         put_ldev(mdev);
3479
3480         return rv;
3481 }
3482
3483 /**
3484  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3485  * @mdev:       DRBD device.
3486  *
3487  * Call this function if you change anything that should be written to
3488  * the meta-data super block. This function sets MD_DIRTY, and starts a
3489  * timer that ensures that within five seconds you have to call drbd_md_sync().
3490  */
3491 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3492 {
3493         set_bit(MD_DIRTY, &mdev->flags);
3494         mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3495 }
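
/* A minimal usage sketch (illustrative only) of the dirty/sync cycle:
 *
 *     _drbd_uuid_set(mdev, UI_CURRENT, val);  (marks the meta data dirty)
 *     drbd_md_sync(mdev);                     (or let md_sync_timer fire)
 */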
3496
3497
3498 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3499 {
3500         int i;
3501
3502         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3503                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3504 }
3505
3506 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3507 {
3508         if (idx == UI_CURRENT) {
3509                 if (mdev->state.role == R_PRIMARY)
3510                         val |= 1;
3511                 else
3512                         val &= ~((u64)1);
3513
3514                 drbd_set_ed_uuid(mdev, val);
3515         }
3516
3517         mdev->ldev->md.uuid[idx] = val;
3518         drbd_md_mark_dirty(mdev);
3519 }
3520
3521
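/* Like _drbd_uuid_set(), but a previously set value of this slot is first
 * rotated into the history UUIDs before it is overwritten. */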
3522 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3523 {
3524         if (mdev->ldev->md.uuid[idx]) {
3525                 drbd_uuid_move_history(mdev);
3526                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3527         }
3528         _drbd_uuid_set(mdev, idx, val);
3529 }
3530
3531 /**
3532  * drbd_uuid_new_current() - Creates a new current UUID
3533  * @mdev:       DRBD device.
3534  *
3535  * Creates a new current UUID, and rotates the old current UUID into
3536  * the bitmap slot. Causes an incremental resync upon next connect.
3537  */
3538 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3539 {
3540         u64 val;
3541
3542         dev_info(DEV, "Creating new current UUID\n");
3543         D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3544         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3545
3546         get_random_bytes(&val, sizeof(u64));
3547         _drbd_uuid_set(mdev, UI_CURRENT, val);
3548 }
3549
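/* Set or clear the bitmap UUID.  Clearing (val == 0) rotates the old bitmap
 * UUID into the history; setting stores val with its lowest bit cleared. */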
3550 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3551 {
3552         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3553                 return;
3554
3555         if (val == 0) {
3556                 drbd_uuid_move_history(mdev);
3557                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3558                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3559         } else {
3560                 if (mdev->ldev->md.uuid[UI_BITMAP])
3561                         dev_warn(DEV, "bm UUID already set\n");
3562
3563                 mdev->ldev->md.uuid[UI_BITMAP] = val;
3564                 mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3565
3566         }
3567         drbd_md_mark_dirty(mdev);
3568 }
3569
3570 /**
3571  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3572  * @mdev:       DRBD device.
3573  *
3574  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3575  */
3576 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3577 {
3578         int rv = -EIO;
3579
3580         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3581                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3582                 drbd_md_sync(mdev);
3583                 drbd_bm_set_all(mdev);
3584
3585                 rv = drbd_bm_write(mdev);
3586
3587                 if (!rv) {
3588                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3589                         drbd_md_sync(mdev);
3590                 }
3591
3592                 put_ldev(mdev);
3593         }
3594
3595         return rv;
3596 }
3597
3598 /**
3599  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3600  * @mdev:       DRBD device.
3601  *
3602  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3603  */
3604 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3605 {
3606         int rv = -EIO;
3607
3608         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3609                 drbd_bm_clear_all(mdev);
3610                 rv = drbd_bm_write(mdev);
3611                 put_ldev(mdev);
3612         }
3613
3614         return rv;
3615 }
3616
3617 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3618 {
3619         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3620         int rv;
3621
3622         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3623
3624         drbd_bm_lock(mdev, work->why);
3625         rv = work->io_fn(mdev);
3626         drbd_bm_unlock(mdev);
3627
3628         clear_bit(BITMAP_IO, &mdev->flags);
3629         wake_up(&mdev->misc_wait);
3630
3631         if (work->done)
3632                 work->done(mdev, rv);
3633
3634         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3635         work->why = NULL;
3636
3637         return 1;
3638 }
3639
3640 /**
3641  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3642  * @mdev:       DRBD device.
3643  * @io_fn:      IO callback to be called when bitmap IO is possible
3644  * @done:       callback to be called after the bitmap IO was performed
3645  * @why:        Descriptive text of the reason for doing the IO
3646  *
3647  * While IO on the bitmap happens we freeze application IO, thus ensuring
3648  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3649  * called from worker context. It MUST NOT be used while a previous such
3650  * work is still pending!
3651  */
3652 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3653                           int (*io_fn)(struct drbd_conf *),
3654                           void (*done)(struct drbd_conf *, int),
3655                           char *why)
3656 {
3657         D_ASSERT(current == mdev->worker.task);
3658
3659         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3660         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3661         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3662         if (mdev->bm_io_work.why)
3663                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3664                         why, mdev->bm_io_work.why);
3665
3666         mdev->bm_io_work.io_fn = io_fn;
3667         mdev->bm_io_work.done = done;
3668         mdev->bm_io_work.why = why;
3669
3670         set_bit(BITMAP_IO, &mdev->flags);
3671         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3672                 if (list_empty(&mdev->bm_io_work.w.list)) {
3673                         set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3674                         drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3675                 } else
3676                         dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3677         }
3678 }
3679
3680 /**
3681  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3682  * @mdev:       DRBD device.
3683  * @io_fn:      IO callback to be called when bitmap IO is possible
3684  * @why:        Descriptive text of the reason for doing the IO
3685  *
3686  * Freezes application IO while the actual IO operation runs. This
3687  * function MAY NOT be called from worker context.
3688  */
3689 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3690 {
3691         int rv;
3692
3693         D_ASSERT(current != mdev->worker.task);
3694
3695         drbd_suspend_io(mdev);
3696
3697         drbd_bm_lock(mdev, why);
3698         rv = io_fn(mdev);
3699         drbd_bm_unlock(mdev);
3700
3701         drbd_resume_io(mdev);
3702
3703         return rv;
3704 }
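
/* A minimal usage sketch (illustrative only), combining this with one of the
 * io_fn helpers above to force a full sync marking from a non-worker context:
 *
 *     rv = drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write");
 *
 * From worker context the same io_fn would instead be handed to
 * drbd_queue_bitmap_io() together with a completion callback. */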
3705
3706 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3707 {
3708         if ((mdev->ldev->md.flags & flag) != flag) {
3709                 drbd_md_mark_dirty(mdev);
3710                 mdev->ldev->md.flags |= flag;
3711         }
3712 }
3713
3714 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3715 {
3716         if ((mdev->ldev->md.flags & flag) != 0) {
3717                 drbd_md_mark_dirty(mdev);
3718                 mdev->ldev->md.flags &= ~flag;
3719         }
3720 }
3721 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3722 {
3723         return (bdev->md.flags & flag) != 0;
3724 }
3725
3726 static void md_sync_timer_fn(unsigned long data)
3727 {
3728         struct drbd_conf *mdev = (struct drbd_conf *) data;
3729
3730         drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3731 }
3732
3733 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3734 {
3735         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3736         drbd_md_sync(mdev);
3737
3738         return 1;
3739 }
3740
3741 #ifdef CONFIG_DRBD_FAULT_INJECTION
3742 /* Fault insertion support including random number generator shamelessly
3743  * stolen from kernel/rcutorture.c */
3744 struct fault_random_state {
3745         unsigned long state;
3746         unsigned long count;
3747 };
3748
3749 #define FAULT_RANDOM_MULT 39916801  /* prime */
3750 #define FAULT_RANDOM_ADD        479001701 /* prime */
3751 #define FAULT_RANDOM_REFRESH 10000
3752
3753 /*
3754  * Crude but fast random-number generator.  Uses a linear congruential
3755  * generator, with occasional help from get_random_bytes().
3756  */
3757 static unsigned long
3758 _drbd_fault_random(struct fault_random_state *rsp)
3759 {
3760         long refresh;
3761
3762         if (!rsp->count--) {
3763                 get_random_bytes(&refresh, sizeof(refresh));
3764                 rsp->state += refresh;
3765                 rsp->count = FAULT_RANDOM_REFRESH;
3766         }
3767         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3768         return swahw32(rsp->state);
3769 }
3770
3771 static char *
3772 _drbd_fault_str(unsigned int type) {
3773         static char *_faults[] = {
3774                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3775                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3776                 [DRBD_FAULT_RS_WR] = "Resync write",
3777                 [DRBD_FAULT_RS_RD] = "Resync read",
3778                 [DRBD_FAULT_DT_WR] = "Data write",
3779                 [DRBD_FAULT_DT_RD] = "Data read",
3780                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3781                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3782                 [DRBD_FAULT_AL_EE] = "EE allocation",
3783                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3784         };
3785
3786         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3787 }
3788
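/* Decide whether to inject a fault of the given type: only when the random
 * draw hits fault_rate (a percentage), and, if fault_devs is non-zero, only
 * for minors whose bit is set in that mask.  Each injected fault is counted
 * in fault_count and reported rate-limited. */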
3789 unsigned int
3790 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3791 {
3792         static struct fault_random_state rrs = {0, 0};
3793
3794         unsigned int ret = (
3795                 (fault_devs == 0 ||
3796                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3797                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3798
3799         if (ret) {
3800                 fault_count++;
3801
3802                 if (printk_ratelimit())
3803                         dev_warn(DEV, "***Simulating %s failure\n",
3804                                 _drbd_fault_str(type));
3805         }
3806
3807         return ret;
3808 }
3809 #endif
3810
3811 const char *drbd_buildtag(void)
3812 {
3813         /* DRBD built from external sources carries a reference to the
3814            git hash of the source code here. */
3815
3816         static char buildtag[38] = "\0uilt-in";
3817
3818         if (buildtag[0] == 0) {
3819 #ifdef CONFIG_MODULES
3820                 if (THIS_MODULE != NULL)
3821                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3822                 else
3823 #endif
3824                         buildtag[0] = 'b';
3825         }
3826
3827         return buildtag;
3828 }
3829
3830 module_init(drbd_init)
3831 module_exit(drbd_cleanup)
3832
3833 EXPORT_SYMBOL(drbd_conn_str);
3834 EXPORT_SYMBOL(drbd_role_str);
3835 EXPORT_SYMBOL(drbd_disk_str);
3836 EXPORT_SYMBOL(drbd_set_st_err_str);