drbd: drop wrong debug asserts, fix recently introduced race
[pandora-kernel.git] / drivers / block / drbd / drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/smp_lock.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 struct after_state_chg_work {
60         struct drbd_work w;
61         union drbd_state os;
62         union drbd_state ns;
63         enum chg_state_flags flags;
64         struct completion *done;
65 };
66
67 int drbdd_init(struct drbd_thread *);
68 int drbd_worker(struct drbd_thread *);
69 int drbd_asender(struct drbd_thread *);
70
71 int drbd_init(void);
72 static int drbd_open(struct block_device *bdev, fmode_t mode);
73 static int drbd_release(struct gendisk *gd, fmode_t mode);
74 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
75 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
76                            union drbd_state ns, enum chg_state_flags flags);
77 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
78 static void md_sync_timer_fn(unsigned long data);
79 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
80 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
81
82 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
83               "Lars Ellenberg <lars@linbit.com>");
84 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
85 MODULE_VERSION(REL_VERSION);
86 MODULE_LICENSE("GPL");
87 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
88 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
89
90 #include <linux/moduleparam.h>
91 /* allow_open_on_secondary */
92 MODULE_PARM_DESC(allow_oos, "DONT USE!");
93 /* thanks to these macros, if compiled into the kernel (not-module),
94  * this becomes the boot parameter drbd.minor_count */
95 module_param(minor_count, uint, 0444);
96 module_param(disable_sendpage, bool, 0644);
97 module_param(allow_oos, bool, 0);
98 module_param(cn_idx, uint, 0444);
99 module_param(proc_details, int, 0644);
100
101 #ifdef CONFIG_DRBD_FAULT_INJECTION
102 int enable_faults;
103 int fault_rate;
104 static int fault_count;
105 int fault_devs;
106 /* bitmap of enabled faults */
107 module_param(enable_faults, int, 0664);
108 /* fault rate % value - applies to all enabled faults */
109 module_param(fault_rate, int, 0664);
110 /* count of faults inserted */
111 module_param(fault_count, int, 0664);
112 /* bitmap of devices to insert faults on */
113 module_param(fault_devs, int, 0644);
114 #endif
115
116 /* module parameter, defined */
117 unsigned int minor_count = 32;
118 int disable_sendpage;
119 int allow_oos;
120 unsigned int cn_idx = CN_IDX_DRBD;
121 int proc_details;       /* Detail level in proc drbd*/
122
123 /* Module parameter for setting the user mode helper program
124  * to run. Default is /sbin/drbdadm */
125 char usermode_helper[80] = "/sbin/drbdadm";
126
127 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
128
129 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
130  * as member "struct gendisk *vdisk;"
131  */
132 struct drbd_conf **minor_table;
133
134 struct kmem_cache *drbd_request_cache;
135 struct kmem_cache *drbd_ee_cache;       /* epoch entries */
136 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
137 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
138 mempool_t *drbd_request_mempool;
139 mempool_t *drbd_ee_mempool;
140
141 /* I do not use a standard mempool, because:
142    1) I want to hand out the pre-allocated objects first.
143    2) I want to be able to interrupt sleeping allocation with a signal.
144    Note: This is a single linked list, the next pointer is the private
145          member of struct page.
146  */
147 struct page *drbd_pp_pool;
148 spinlock_t   drbd_pp_lock;
149 int          drbd_pp_vacant;
150 wait_queue_head_t drbd_pp_wait;
151
152 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
153
154 static const struct block_device_operations drbd_ops = {
155         .owner =   THIS_MODULE,
156         .open =    drbd_open,
157         .release = drbd_release,
158 };
159
/*
 * Number of elements of the array A.
 * Only valid for real arrays, not for pointers.  The argument is
 * parenthesized before indexing so that any array-valued expression
 * (e.g. *ptr_to_array) expands as intended.
 */
#define ARRY_SIZE(A) (sizeof(A) / sizeof((A)[0]))
161
#ifdef __CHECKER__
/*
 * Out-of-line variant for sparse runs only: as an inline function this
 * produces lots of false positives, as a real function sparse copes.
 *
 * Takes a reference on mdev->local_cnt first, then checks whether the
 * disk state is at least @mins.  If it is not, the reference is dropped
 * again, waking mdev->misc_wait when the count reaches zero.
 *
 * Returns 1 if the reference was kept (disk state >= @mins), 0 otherwise.
 */
int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
{
        atomic_inc(&mdev->local_cnt);
        if (mdev->state.disk >= mins)
                return 1;

        /* not allowed: give the reference back */
        if (atomic_dec_and_test(&mdev->local_cnt))
                wake_up(&mdev->misc_wait);
        return 0;
}

#endif
180
181 /**
182  * DOC: The transfer log
183  *
184  * The transfer log is a single linked list of &struct drbd_tl_epoch objects.
185  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
186  * of the list. There is always at least one &struct drbd_tl_epoch object.
187  *
188  * Each &struct drbd_tl_epoch has a circular double linked list of requests
189  * attached.
190  */
191 static int tl_init(struct drbd_conf *mdev)
192 {
193         struct drbd_tl_epoch *b;
194
195         /* during device minor initialization, we may well use GFP_KERNEL */
196         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
197         if (!b)
198                 return 0;
199         INIT_LIST_HEAD(&b->requests);
200         INIT_LIST_HEAD(&b->w.list);
201         b->next = NULL;
202         b->br_number = 4711;
203         b->n_writes = 0;
204         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
205
206         mdev->oldest_tle = b;
207         mdev->newest_tle = b;
208         INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
209
210         mdev->tl_hash = NULL;
211         mdev->tl_hash_s = 0;
212
213         return 1;
214 }
215
216 static void tl_cleanup(struct drbd_conf *mdev)
217 {
218         D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
219         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
220         kfree(mdev->oldest_tle);
221         mdev->oldest_tle = NULL;
222         kfree(mdev->unused_spare_tle);
223         mdev->unused_spare_tle = NULL;
224         kfree(mdev->tl_hash);
225         mdev->tl_hash = NULL;
226         mdev->tl_hash_s = 0;
227 }
228
229 /**
230  * _tl_add_barrier() - Adds a barrier to the transfer log
231  * @mdev:       DRBD device.
232  * @new:        Barrier to be added before the current head of the TL.
233  *
234  * The caller must hold the req_lock.
235  */
236 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
237 {
238         struct drbd_tl_epoch *newest_before;
239
240         INIT_LIST_HEAD(&new->requests);
241         INIT_LIST_HEAD(&new->w.list);
242         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
243         new->next = NULL;
244         new->n_writes = 0;
245
246         newest_before = mdev->newest_tle;
247         /* never send a barrier number == 0, because that is special-cased
248          * when using TCQ for our write ordering code */
249         new->br_number = (newest_before->br_number+1) ?: 1;
250         if (mdev->newest_tle != new) {
251                 mdev->newest_tle->next = new;
252                 mdev->newest_tle = new;
253         }
254 }
255
256 /**
257  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
258  * @mdev:       DRBD device.
259  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
260  * @set_size:   Expected number of requests before that barrier.
261  *
262  * In case the passed barrier_nr or set_size does not match the oldest
263  * &struct drbd_tl_epoch objects this function will cause a termination
264  * of the connection.
265  */
266 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
267                        unsigned int set_size)
268 {
269         struct drbd_tl_epoch *b, *nob; /* next old barrier */
270         struct list_head *le, *tle;
271         struct drbd_request *r;
272
273         spin_lock_irq(&mdev->req_lock);
274
275         b = mdev->oldest_tle;
276
277         /* first some paranoia code */
278         if (b == NULL) {
279                 dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
280                         barrier_nr);
281                 goto bail;
282         }
283         if (b->br_number != barrier_nr) {
284                 dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
285                         barrier_nr, b->br_number);
286                 goto bail;
287         }
288         if (b->n_writes != set_size) {
289                 dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
290                         barrier_nr, set_size, b->n_writes);
291                 goto bail;
292         }
293
294         /* Clean up list of requests processed during current epoch */
295         list_for_each_safe(le, tle, &b->requests) {
296                 r = list_entry(le, struct drbd_request, tl_requests);
297                 _req_mod(r, barrier_acked);
298         }
299         /* There could be requests on the list waiting for completion
300            of the write to the local disk. To avoid corruptions of
301            slab's data structures we have to remove the lists head.
302
303            Also there could have been a barrier ack out of sequence, overtaking
304            the write acks - which would be a bug and violating write ordering.
305            To not deadlock in case we lose connection while such requests are
306            still pending, we need some way to find them for the
307            _req_mode(connection_lost_while_pending).
308
309            These have been list_move'd to the out_of_sequence_requests list in
310            _req_mod(, barrier_acked) above.
311            */
312         list_del_init(&b->requests);
313
314         nob = b->next;
315         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
316                 _tl_add_barrier(mdev, b);
317                 if (nob)
318                         mdev->oldest_tle = nob;
319                 /* if nob == NULL b was the only barrier, and becomes the new
320                    barrier. Therefore mdev->oldest_tle points already to b */
321         } else {
322                 D_ASSERT(nob != NULL);
323                 mdev->oldest_tle = nob;
324                 kfree(b);
325         }
326
327         spin_unlock_irq(&mdev->req_lock);
328         dec_ap_pending(mdev);
329
330         return;
331
332 bail:
333         spin_unlock_irq(&mdev->req_lock);
334         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
335 }
336
337 /**
338  * _tl_restart() - Walks the transfer log, and applies an action to all requests
339  * @mdev:       DRBD device.
340  * @what:       The action/event to perform with all request objects
341  *
342  * @what might be one of connection_lost_while_pending, resend, fail_frozen_disk_io,
343  * restart_frozen_disk_io.
344  */
345 static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
346 {
347         struct drbd_tl_epoch *b, *tmp, **pn;
348         struct list_head *le, *tle, carry_reads;
349         struct drbd_request *req;
350         int rv, n_writes, n_reads;
351
352         b = mdev->oldest_tle;
353         pn = &mdev->oldest_tle;
354         while (b) {
355                 n_writes = 0;
356                 n_reads = 0;
357                 INIT_LIST_HEAD(&carry_reads);
358                 list_for_each_safe(le, tle, &b->requests) {
359                         req = list_entry(le, struct drbd_request, tl_requests);
360                         rv = _req_mod(req, what);
361
362                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
363                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
364                 }
365                 tmp = b->next;
366
367                 if (n_writes) {
368                         if (what == resend) {
369                                 b->n_writes = n_writes;
370                                 if (b->w.cb == NULL) {
371                                         b->w.cb = w_send_barrier;
372                                         inc_ap_pending(mdev);
373                                         set_bit(CREATE_BARRIER, &mdev->flags);
374                                 }
375
376                                 drbd_queue_work(&mdev->data.work, &b->w);
377                         }
378                         pn = &b->next;
379                 } else {
380                         if (n_reads)
381                                 list_add(&carry_reads, &b->requests);
382                         /* there could still be requests on that ring list,
383                          * in case local io is still pending */
384                         list_del(&b->requests);
385
386                         /* dec_ap_pending corresponding to queue_barrier.
387                          * the newest barrier may not have been queued yet,
388                          * in which case w.cb is still NULL. */
389                         if (b->w.cb != NULL)
390                                 dec_ap_pending(mdev);
391
392                         if (b == mdev->newest_tle) {
393                                 /* recycle, but reinit! */
394                                 D_ASSERT(tmp == NULL);
395                                 INIT_LIST_HEAD(&b->requests);
396                                 list_splice(&carry_reads, &b->requests);
397                                 INIT_LIST_HEAD(&b->w.list);
398                                 b->w.cb = NULL;
399                                 b->br_number = net_random();
400                                 b->n_writes = 0;
401
402                                 *pn = b;
403                                 break;
404                         }
405                         *pn = tmp;
406                         kfree(b);
407                 }
408                 b = tmp;
409                 list_splice(&carry_reads, &b->requests);
410         }
411 }
412
413
414 /**
415  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
416  * @mdev:       DRBD device.
417  *
418  * This is called after the connection to the peer was lost. The storage covered
419  * by the requests on the transfer gets marked as our of sync. Called from the
420  * receiver thread and the worker thread.
421  */
422 void tl_clear(struct drbd_conf *mdev)
423 {
424         struct list_head *le, *tle;
425         struct drbd_request *r;
426
427         spin_lock_irq(&mdev->req_lock);
428
429         _tl_restart(mdev, connection_lost_while_pending);
430
431         /* we expect this list to be empty. */
432         D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
433
434         /* but just in case, clean it up anyways! */
435         list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
436                 r = list_entry(le, struct drbd_request, tl_requests);
437                 /* It would be nice to complete outside of spinlock.
438                  * But this is easier for now. */
439                 _req_mod(r, connection_lost_while_pending);
440         }
441
442         /* ensure bit indicating barrier is required is clear */
443         clear_bit(CREATE_BARRIER, &mdev->flags);
444
445         memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
446
447         spin_unlock_irq(&mdev->req_lock);
448 }
449
450 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
451 {
452         spin_lock_irq(&mdev->req_lock);
453         _tl_restart(mdev, what);
454         spin_unlock_irq(&mdev->req_lock);
455 }
456
457 /**
458  * cl_wide_st_chg() - TRUE if the state change is a cluster wide one
459  * @mdev:       DRBD device.
460  * @os:         old (current) state.
461  * @ns:         new (wanted) state.
462  */
463 static int cl_wide_st_chg(struct drbd_conf *mdev,
464                           union drbd_state os, union drbd_state ns)
465 {
466         return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
467                  ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
468                   (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
469                   (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
470                   (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
471                 (os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
472                 (os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
473 }
474
475 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
476                       union drbd_state mask, union drbd_state val)
477 {
478         unsigned long flags;
479         union drbd_state os, ns;
480         int rv;
481
482         spin_lock_irqsave(&mdev->req_lock, flags);
483         os = mdev->state;
484         ns.i = (os.i & ~mask.i) | val.i;
485         rv = _drbd_set_state(mdev, ns, f, NULL);
486         ns = mdev->state;
487         spin_unlock_irqrestore(&mdev->req_lock, flags);
488
489         return rv;
490 }
491
492 /**
493  * drbd_force_state() - Impose a change which happens outside our control on our state
494  * @mdev:       DRBD device.
495  * @mask:       mask of state bits to change.
496  * @val:        value of new state bits.
497  */
498 void drbd_force_state(struct drbd_conf *mdev,
499         union drbd_state mask, union drbd_state val)
500 {
501         drbd_change_state(mdev, CS_HARD, mask, val);
502 }
503
/* State validation helpers, defined further down in this file. */
static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
static int is_valid_state_transition(struct drbd_conf *,
                                     union drbd_state, union drbd_state);
static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
                                       union drbd_state ns, const char **warn_sync_abort);

/* Sends a state change request (mask, val); not defined in this section. */
int drbd_send_state_req(struct drbd_conf *,
                        union drbd_state, union drbd_state);
511
512 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
513                                     union drbd_state mask, union drbd_state val)
514 {
515         union drbd_state os, ns;
516         unsigned long flags;
517         int rv;
518
519         if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
520                 return SS_CW_SUCCESS;
521
522         if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
523                 return SS_CW_FAILED_BY_PEER;
524
525         rv = 0;
526         spin_lock_irqsave(&mdev->req_lock, flags);
527         os = mdev->state;
528         ns.i = (os.i & ~mask.i) | val.i;
529         ns = sanitize_state(mdev, os, ns, NULL);
530
531         if (!cl_wide_st_chg(mdev, os, ns))
532                 rv = SS_CW_NO_NEED;
533         if (!rv) {
534                 rv = is_valid_state(mdev, ns);
535                 if (rv == SS_SUCCESS) {
536                         rv = is_valid_state_transition(mdev, ns, os);
537                         if (rv == SS_SUCCESS)
538                                 rv = 0; /* cont waiting, otherwise fail. */
539                 }
540         }
541         spin_unlock_irqrestore(&mdev->req_lock, flags);
542
543         return rv;
544 }
545
546 /**
547  * drbd_req_state() - Perform an eventually cluster wide state change
548  * @mdev:       DRBD device.
549  * @mask:       mask of state bits to change.
550  * @val:        value of new state bits.
551  * @f:          flags
552  *
553  * Should not be called directly, use drbd_request_state() or
554  * _drbd_request_state().
555  */
556 static int drbd_req_state(struct drbd_conf *mdev,
557                           union drbd_state mask, union drbd_state val,
558                           enum chg_state_flags f)
559 {
560         struct completion done;
561         unsigned long flags;
562         union drbd_state os, ns;
563         int rv;
564
565         init_completion(&done);
566
567         if (f & CS_SERIALIZE)
568                 mutex_lock(&mdev->state_mutex);
569
570         spin_lock_irqsave(&mdev->req_lock, flags);
571         os = mdev->state;
572         ns.i = (os.i & ~mask.i) | val.i;
573         ns = sanitize_state(mdev, os, ns, NULL);
574
575         if (cl_wide_st_chg(mdev, os, ns)) {
576                 rv = is_valid_state(mdev, ns);
577                 if (rv == SS_SUCCESS)
578                         rv = is_valid_state_transition(mdev, ns, os);
579                 spin_unlock_irqrestore(&mdev->req_lock, flags);
580
581                 if (rv < SS_SUCCESS) {
582                         if (f & CS_VERBOSE)
583                                 print_st_err(mdev, os, ns, rv);
584                         goto abort;
585                 }
586
587                 drbd_state_lock(mdev);
588                 if (!drbd_send_state_req(mdev, mask, val)) {
589                         drbd_state_unlock(mdev);
590                         rv = SS_CW_FAILED_BY_PEER;
591                         if (f & CS_VERBOSE)
592                                 print_st_err(mdev, os, ns, rv);
593                         goto abort;
594                 }
595
596                 wait_event(mdev->state_wait,
597                         (rv = _req_st_cond(mdev, mask, val)));
598
599                 if (rv < SS_SUCCESS) {
600                         drbd_state_unlock(mdev);
601                         if (f & CS_VERBOSE)
602                                 print_st_err(mdev, os, ns, rv);
603                         goto abort;
604                 }
605                 spin_lock_irqsave(&mdev->req_lock, flags);
606                 os = mdev->state;
607                 ns.i = (os.i & ~mask.i) | val.i;
608                 rv = _drbd_set_state(mdev, ns, f, &done);
609                 drbd_state_unlock(mdev);
610         } else {
611                 rv = _drbd_set_state(mdev, ns, f, &done);
612         }
613
614         spin_unlock_irqrestore(&mdev->req_lock, flags);
615
616         if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
617                 D_ASSERT(current != mdev->worker.task);
618                 wait_for_completion(&done);
619         }
620
621 abort:
622         if (f & CS_SERIALIZE)
623                 mutex_unlock(&mdev->state_mutex);
624
625         return rv;
626 }
627
628 /**
629  * _drbd_request_state() - Request a state change (with flags)
630  * @mdev:       DRBD device.
631  * @mask:       mask of state bits to change.
632  * @val:        value of new state bits.
633  * @f:          flags
634  *
635  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
636  * flag, or when logging of failed state change requests is not desired.
637  */
638 int _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
639                         union drbd_state val,   enum chg_state_flags f)
640 {
641         int rv;
642
643         wait_event(mdev->state_wait,
644                    (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
645
646         return rv;
647 }
648
649 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
650 {
651         dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
652             name,
653             drbd_conn_str(ns.conn),
654             drbd_role_str(ns.role),
655             drbd_role_str(ns.peer),
656             drbd_disk_str(ns.disk),
657             drbd_disk_str(ns.pdsk),
658             is_susp(ns) ? 's' : 'r',
659             ns.aftr_isp ? 'a' : '-',
660             ns.peer_isp ? 'p' : '-',
661             ns.user_isp ? 'u' : '-'
662             );
663 }
664
665 void print_st_err(struct drbd_conf *mdev,
666         union drbd_state os, union drbd_state ns, int err)
667 {
668         if (err == SS_IN_TRANSIENT_STATE)
669                 return;
670         dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
671         print_st(mdev, " state", os);
672         print_st(mdev, "wanted", ns);
673 }
674
675
/*
 * String helpers for PSC() below: one drbd_<field>_str per state field.
 * peer and pdsk reuse the role and disk strings; the one-bit flags are
 * printed as "1" or "0".
 */
#define drbd_peer_str drbd_role_str
#define drbd_pdsk_str drbd_disk_str

#define drbd_susp_str(A)     ((A) ? "1" : "0")
#define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
#define drbd_peer_isp_str(A) ((A) ? "1" : "0")
#define drbd_user_isp_str(A) ((A) ? "1" : "0")

/*
 * PSC(field): if the field differs between os and ns, append
 * "field( old -> new ) " at pbp and advance pbp.
 * Relies on os, ns and pbp being in scope where the macro is expanded.
 * NOTE(review): sprintf() is unbounded here; the size of the buffer
 * behind pbp is not visible at this point -- confirm at the user.
 */
#define PSC(A) \
        ({ if (ns.A != os.A) { \
                pbp += sprintf(pbp, #A "( %s -> %s ) ", \
                              drbd_##A##_str(os.A), \
                              drbd_##A##_str(ns.A)); \
        } })
690
691 /**
692  * is_valid_state() - Returns an SS_ error code if ns is not valid
693  * @mdev:       DRBD device.
694  * @ns:         State to consider.
695  */
696 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
697 {
698         /* See drbd_state_sw_errors in drbd_strings.c */
699
700         enum drbd_fencing_p fp;
701         int rv = SS_SUCCESS;
702
703         fp = FP_DONT_CARE;
704         if (get_ldev(mdev)) {
705                 fp = mdev->ldev->dc.fencing;
706                 put_ldev(mdev);
707         }
708
709         if (get_net_conf(mdev)) {
710                 if (!mdev->net_conf->two_primaries &&
711                     ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
712                         rv = SS_TWO_PRIMARIES;
713                 put_net_conf(mdev);
714         }
715
716         if (rv <= 0)
717                 /* already found a reason to abort */;
718         else if (ns.role == R_SECONDARY && mdev->open_cnt)
719                 rv = SS_DEVICE_IN_USE;
720
721         else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
722                 rv = SS_NO_UP_TO_DATE_DISK;
723
724         else if (fp >= FP_RESOURCE &&
725                  ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
726                 rv = SS_PRIMARY_NOP;
727
728         else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
729                 rv = SS_NO_UP_TO_DATE_DISK;
730
731         else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
732                 rv = SS_NO_LOCAL_DISK;
733
734         else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
735                 rv = SS_NO_REMOTE_DISK;
736
737         else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
738                 rv = SS_NO_UP_TO_DATE_DISK;
739
740         else if ((ns.conn == C_CONNECTED ||
741                   ns.conn == C_WF_BITMAP_S ||
742                   ns.conn == C_SYNC_SOURCE ||
743                   ns.conn == C_PAUSED_SYNC_S) &&
744                   ns.disk == D_OUTDATED)
745                 rv = SS_CONNECTED_OUTDATES;
746
747         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
748                  (mdev->sync_conf.verify_alg[0] == 0))
749                 rv = SS_NO_VERIFY_ALG;
750
751         else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
752                   mdev->agreed_pro_version < 88)
753                 rv = SS_NOT_SUPPORTED;
754
755         return rv;
756 }
757
758 /**
759  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
760  * @mdev:       DRBD device.
761  * @ns:         new state.
762  * @os:         old state.
763  */
764 static int is_valid_state_transition(struct drbd_conf *mdev,
765                                      union drbd_state ns, union drbd_state os)
766 {
767         int rv = SS_SUCCESS;
768
769         if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
770             os.conn > C_CONNECTED)
771                 rv = SS_RESYNC_RUNNING;
772
773         if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
774                 rv = SS_ALREADY_STANDALONE;
775
776         if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
777                 rv = SS_IS_DISKLESS;
778
779         if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
780                 rv = SS_NO_NET_CONFIG;
781
782         if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
783                 rv = SS_LOWER_THAN_OUTDATED;
784
785         if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
786                 rv = SS_IN_TRANSIENT_STATE;
787
788         if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
789                 rv = SS_IN_TRANSIENT_STATE;
790
791         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
792                 rv = SS_NEED_CONNECTION;
793
794         if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
795             ns.conn != os.conn && os.conn > C_CONNECTED)
796                 rv = SS_RESYNC_RUNNING;
797
798         if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
799             os.conn < C_CONNECTED)
800                 rv = SS_NEED_CONNECTION;
801
802         return rv;
803 }
804
805 /**
806  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
807  * @mdev:       DRBD device.
808  * @os:         old state.
809  * @ns:         new state.
810  * @warn_sync_abort:
811  *
812  * When we loose connection, we have to set the state of the peers disk (pdsk)
813  * to D_UNKNOWN. This rule and many more along those lines are in this function.
814  */
815 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
816                                        union drbd_state ns, const char **warn_sync_abort)
817 {
818         enum drbd_fencing_p fp;
819
820         fp = FP_DONT_CARE;
821         if (get_ldev(mdev)) {
822                 fp = mdev->ldev->dc.fencing;
823                 put_ldev(mdev);
824         }
825
826         /* Disallow Network errors to configure a device's network part */
827         if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
828             os.conn <= C_DISCONNECTING)
829                 ns.conn = os.conn;
830
831         /* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
832          * If you try to go into some Sync* state, that shall fail (elsewhere). */
833         if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
834             ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_TEAR_DOWN)
835                 ns.conn = os.conn;
836
837         /* After C_DISCONNECTING only C_STANDALONE may follow */
838         if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
839                 ns.conn = os.conn;
840
841         if (ns.conn < C_CONNECTED) {
842                 ns.peer_isp = 0;
843                 ns.peer = R_UNKNOWN;
844                 if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
845                         ns.pdsk = D_UNKNOWN;
846         }
847
848         /* Clear the aftr_isp when becoming unconfigured */
849         if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
850                 ns.aftr_isp = 0;
851
852         /* Abort resync if a disk fails/detaches */
853         if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
854             (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
855                 if (warn_sync_abort)
856                         *warn_sync_abort =
857                                 os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
858                                 "Online-verify" : "Resync";
859                 ns.conn = C_CONNECTED;
860         }
861
862         if (ns.conn >= C_CONNECTED &&
863             ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
864              (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
865                 switch (ns.conn) {
866                 case C_WF_BITMAP_T:
867                 case C_PAUSED_SYNC_T:
868                         ns.disk = D_OUTDATED;
869                         break;
870                 case C_CONNECTED:
871                 case C_WF_BITMAP_S:
872                 case C_SYNC_SOURCE:
873                 case C_PAUSED_SYNC_S:
874                         ns.disk = D_UP_TO_DATE;
875                         break;
876                 case C_SYNC_TARGET:
877                         ns.disk = D_INCONSISTENT;
878                         dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
879                         break;
880                 }
881                 if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
882                         dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
883         }
884
885         if (ns.conn >= C_CONNECTED &&
886             (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
887                 switch (ns.conn) {
888                 case C_CONNECTED:
889                 case C_WF_BITMAP_T:
890                 case C_PAUSED_SYNC_T:
891                 case C_SYNC_TARGET:
892                         ns.pdsk = D_UP_TO_DATE;
893                         break;
894                 case C_WF_BITMAP_S:
895                 case C_PAUSED_SYNC_S:
896                         /* remap any consistent state to D_OUTDATED,
897                          * but disallow "upgrade" of not even consistent states.
898                          */
899                         ns.pdsk =
900                                 (D_DISKLESS < os.pdsk && os.pdsk < D_OUTDATED)
901                                 ? os.pdsk : D_OUTDATED;
902                         break;
903                 case C_SYNC_SOURCE:
904                         ns.pdsk = D_INCONSISTENT;
905                         dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
906                         break;
907                 }
908                 if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
909                         dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
910         }
911
912         /* Connection breaks down before we finished "Negotiating" */
913         if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
914             get_ldev_if_state(mdev, D_NEGOTIATING)) {
915                 if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
916                         ns.disk = mdev->new_state_tmp.disk;
917                         ns.pdsk = mdev->new_state_tmp.pdsk;
918                 } else {
919                         dev_alert(DEV, "Connection lost while negotiating, no data!\n");
920                         ns.disk = D_DISKLESS;
921                         ns.pdsk = D_UNKNOWN;
922                 }
923                 put_ldev(mdev);
924         }
925
926         if (fp == FP_STONITH &&
927             (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
928             !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
929                 ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
930
931         if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
932             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
933             !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
934                 ns.susp_nod = 1; /* Suspend IO while no data available (no accessible data available) */
935
936         if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
937                 if (ns.conn == C_SYNC_SOURCE)
938                         ns.conn = C_PAUSED_SYNC_S;
939                 if (ns.conn == C_SYNC_TARGET)
940                         ns.conn = C_PAUSED_SYNC_T;
941         } else {
942                 if (ns.conn == C_PAUSED_SYNC_S)
943                         ns.conn = C_SYNC_SOURCE;
944                 if (ns.conn == C_PAUSED_SYNC_T)
945                         ns.conn = C_SYNC_TARGET;
946         }
947
948         return ns;
949 }
950
951 /* helper for __drbd_set_state */
952 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
953 {
954         if (cs == C_VERIFY_T) {
955                 /* starting online verify from an arbitrary position
956                  * does not fit well into the existing protocol.
957                  * on C_VERIFY_T, we initialize ov_left and friends
958                  * implicitly in receive_DataRequest once the
959                  * first P_OV_REQUEST is received */
960                 mdev->ov_start_sector = ~(sector_t)0;
961         } else {
962                 unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
963                 if (bit >= mdev->rs_total)
964                         mdev->ov_start_sector =
965                                 BM_BIT_TO_SECT(mdev->rs_total - 1);
966                 mdev->ov_position = mdev->ov_start_sector;
967         }
968 }
969
970 static void drbd_resume_al(struct drbd_conf *mdev)
971 {
972         if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
973                 dev_info(DEV, "Resumed AL updates\n");
974 }
975
976 /**
977  * __drbd_set_state() - Set a new DRBD state
978  * @mdev:       DRBD device.
979  * @ns:         new state.
980  * @flags:      Flags
981  * @done:       Optional completion, that will get completed after the after_state_ch() finished
982  *
983  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
984  */
985 int __drbd_set_state(struct drbd_conf *mdev,
986                     union drbd_state ns, enum chg_state_flags flags,
987                     struct completion *done)
988 {
989         union drbd_state os;
990         int rv = SS_SUCCESS;
991         const char *warn_sync_abort = NULL;
992         struct after_state_chg_work *ascw;
993
994         os = mdev->state;
995
996         ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
997
998         if (ns.i == os.i)
999                 return SS_NOTHING_TO_DO;
1000
1001         if (!(flags & CS_HARD)) {
1002                 /*  pre-state-change checks ; only look at ns  */
1003                 /* See drbd_state_sw_errors in drbd_strings.c */
1004
1005                 rv = is_valid_state(mdev, ns);
1006                 if (rv < SS_SUCCESS) {
1007                         /* If the old state was illegal as well, then let
1008                            this happen...*/
1009
1010                         if (is_valid_state(mdev, os) == rv)
1011                                 rv = is_valid_state_transition(mdev, ns, os);
1012                 } else
1013                         rv = is_valid_state_transition(mdev, ns, os);
1014         }
1015
1016         if (rv < SS_SUCCESS) {
1017                 if (flags & CS_VERBOSE)
1018                         print_st_err(mdev, os, ns, rv);
1019                 return rv;
1020         }
1021
1022         if (warn_sync_abort)
1023                 dev_warn(DEV, "%s aborted.\n", warn_sync_abort);
1024
1025         {
1026                 char *pbp, pb[300];
1027                 pbp = pb;
1028                 *pbp = 0;
1029                 PSC(role);
1030                 PSC(peer);
1031                 PSC(conn);
1032                 PSC(disk);
1033                 PSC(pdsk);
1034                 if (is_susp(ns) != is_susp(os))
1035                         pbp += sprintf(pbp, "susp( %s -> %s ) ",
1036                                        drbd_susp_str(is_susp(os)),
1037                                        drbd_susp_str(is_susp(ns)));
1038                 PSC(aftr_isp);
1039                 PSC(peer_isp);
1040                 PSC(user_isp);
1041                 dev_info(DEV, "%s\n", pb);
1042         }
1043
1044         /* solve the race between becoming unconfigured,
1045          * worker doing the cleanup, and
1046          * admin reconfiguring us:
1047          * on (re)configure, first set CONFIG_PENDING,
1048          * then wait for a potentially exiting worker,
1049          * start the worker, and schedule one no_op.
1050          * then proceed with configuration.
1051          */
1052         if (ns.disk == D_DISKLESS &&
1053             ns.conn == C_STANDALONE &&
1054             ns.role == R_SECONDARY &&
1055             !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1056                 set_bit(DEVICE_DYING, &mdev->flags);
1057
1058         mdev->state.i = ns.i;
1059         wake_up(&mdev->misc_wait);
1060         wake_up(&mdev->state_wait);
1061
1062         /* aborted verify run. log the last position */
1063         if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1064             ns.conn < C_CONNECTED) {
1065                 mdev->ov_start_sector =
1066                         BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1067                 dev_info(DEV, "Online Verify reached sector %llu\n",
1068                         (unsigned long long)mdev->ov_start_sector);
1069         }
1070
1071         if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1072             (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1073                 dev_info(DEV, "Syncer continues.\n");
1074                 mdev->rs_paused += (long)jiffies
1075                                   -(long)mdev->rs_mark_time[mdev->rs_last_mark];
1076                 if (ns.conn == C_SYNC_TARGET)
1077                         mod_timer(&mdev->resync_timer, jiffies);
1078         }
1079
1080         if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1081             (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1082                 dev_info(DEV, "Resync suspended\n");
1083                 mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
1084         }
1085
1086         if (os.conn == C_CONNECTED &&
1087             (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1088                 unsigned long now = jiffies;
1089                 int i;
1090
1091                 mdev->ov_position = 0;
1092                 mdev->rs_total = drbd_bm_bits(mdev);
1093                 if (mdev->agreed_pro_version >= 90)
1094                         set_ov_position(mdev, ns.conn);
1095                 else
1096                         mdev->ov_start_sector = 0;
1097                 mdev->ov_left = mdev->rs_total
1098                               - BM_SECT_TO_BIT(mdev->ov_position);
1099                 mdev->rs_start = now;
1100                 mdev->rs_last_events = 0;
1101                 mdev->rs_last_sect_ev = 0;
1102                 mdev->ov_last_oos_size = 0;
1103                 mdev->ov_last_oos_start = 0;
1104
1105                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1106                         mdev->rs_mark_left[i] = mdev->rs_total;
1107                         mdev->rs_mark_time[i] = now;
1108                 }
1109
1110                 if (ns.conn == C_VERIFY_S) {
1111                         dev_info(DEV, "Starting Online Verify from sector %llu\n",
1112                                         (unsigned long long)mdev->ov_position);
1113                         mod_timer(&mdev->resync_timer, jiffies);
1114                 }
1115         }
1116
1117         if (get_ldev(mdev)) {
1118                 u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1119                                                  MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1120                                                  MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1121
1122                 if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1123                         mdf |= MDF_CRASHED_PRIMARY;
1124                 if (mdev->state.role == R_PRIMARY ||
1125                     (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1126                         mdf |= MDF_PRIMARY_IND;
1127                 if (mdev->state.conn > C_WF_REPORT_PARAMS)
1128                         mdf |= MDF_CONNECTED_IND;
1129                 if (mdev->state.disk > D_INCONSISTENT)
1130                         mdf |= MDF_CONSISTENT;
1131                 if (mdev->state.disk > D_OUTDATED)
1132                         mdf |= MDF_WAS_UP_TO_DATE;
1133                 if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1134                         mdf |= MDF_PEER_OUT_DATED;
1135                 if (mdf != mdev->ldev->md.flags) {
1136                         mdev->ldev->md.flags = mdf;
1137                         drbd_md_mark_dirty(mdev);
1138                 }
1139                 if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1140                         drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1141                 put_ldev(mdev);
1142         }
1143
1144         /* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider to resync */
1145         if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1146             os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1147                 set_bit(CONSIDER_RESYNC, &mdev->flags);
1148
1149         /* Receiver should clean up itself */
1150         if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1151                 drbd_thread_stop_nowait(&mdev->receiver);
1152
1153         /* Now the receiver finished cleaning up itself, it should die */
1154         if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1155                 drbd_thread_stop_nowait(&mdev->receiver);
1156
1157         /* Upon network failure, we need to restart the receiver. */
1158         if (os.conn > C_TEAR_DOWN &&
1159             ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1160                 drbd_thread_restart_nowait(&mdev->receiver);
1161
1162         /* Resume AL writing if we get a connection */
1163         if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1164                 drbd_resume_al(mdev);
1165
1166         ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1167         if (ascw) {
1168                 ascw->os = os;
1169                 ascw->ns = ns;
1170                 ascw->flags = flags;
1171                 ascw->w.cb = w_after_state_ch;
1172                 ascw->done = done;
1173                 drbd_queue_work(&mdev->data.work, &ascw->w);
1174         } else {
1175                 dev_warn(DEV, "Could not kmalloc an ascw\n");
1176         }
1177
1178         return rv;
1179 }
1180
1181 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1182 {
1183         struct after_state_chg_work *ascw =
1184                 container_of(w, struct after_state_chg_work, w);
1185         after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1186         if (ascw->flags & CS_WAIT_COMPLETE) {
1187                 D_ASSERT(ascw->done != NULL);
1188                 complete(ascw->done);
1189         }
1190         kfree(ascw);
1191
1192         return 1;
1193 }
1194
1195 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1196 {
1197         if (rv) {
1198                 dev_err(DEV, "Writing the bitmap failed not starting resync.\n");
1199                 _drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1200                 return;
1201         }
1202
1203         switch (mdev->state.conn) {
1204         case C_STARTING_SYNC_T:
1205                 _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1206                 break;
1207         case C_STARTING_SYNC_S:
1208                 drbd_start_resync(mdev, C_SYNC_SOURCE);
1209                 break;
1210         }
1211 }
1212
1213 /**
1214  * after_state_ch() - Perform after state change actions that may sleep
1215  * @mdev:       DRBD device.
1216  * @os:         old state.
1217  * @ns:         new state.
1218  * @flags:      Flags
1219  */
1220 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1221                            union drbd_state ns, enum chg_state_flags flags)
1222 {
1223         enum drbd_fencing_p fp;
1224         enum drbd_req_event what = nothing;
1225         union drbd_state nsm = (union drbd_state){ .i = -1 };
1226
1227         if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1228                 clear_bit(CRASHED_PRIMARY, &mdev->flags);
1229                 if (mdev->p_uuid)
1230                         mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1231         }
1232
1233         fp = FP_DONT_CARE;
1234         if (get_ldev(mdev)) {
1235                 fp = mdev->ldev->dc.fencing;
1236                 put_ldev(mdev);
1237         }
1238
1239         /* Inform userspace about the change... */
1240         drbd_bcast_state(mdev, ns);
1241
1242         if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1243             (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1244                 drbd_khelper(mdev, "pri-on-incon-degr");
1245
1246         /* Here we have the actions that are performed after a
1247            state change. This function might sleep */
1248
1249         nsm.i = -1;
1250         if (ns.susp_nod) {
1251                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1252                         if (ns.conn == C_CONNECTED)
1253                                 what = resend, nsm.susp_nod = 0;
1254                         else /* ns.conn > C_CONNECTED */
1255                                 dev_err(DEV, "Unexpected Resynd going on!\n");
1256                 }
1257
1258                 if (os.disk == D_ATTACHING && ns.disk > D_ATTACHING)
1259                         what = restart_frozen_disk_io, nsm.susp_nod = 0;
1260
1261         }
1262
1263         if (ns.susp_fen) {
1264                 /* case1: The outdate peer handler is successful: */
1265                 if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
1266                         tl_clear(mdev);
1267                         if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
1268                                 drbd_uuid_new_current(mdev);
1269                                 clear_bit(NEW_CUR_UUID, &mdev->flags);
1270                                 drbd_md_sync(mdev);
1271                         }
1272                         spin_lock_irq(&mdev->req_lock);
1273                         _drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
1274                         spin_unlock_irq(&mdev->req_lock);
1275                 }
1276                 /* case2: The connection was established again: */
1277                 if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1278                         clear_bit(NEW_CUR_UUID, &mdev->flags);
1279                         what = resend;
1280                         nsm.susp_fen = 0;
1281                 }
1282         }
1283
1284         if (what != nothing) {
1285                 spin_lock_irq(&mdev->req_lock);
1286                 _tl_restart(mdev, what);
1287                 nsm.i &= mdev->state.i;
1288                 _drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
1289                 spin_unlock_irq(&mdev->req_lock);
1290         }
1291
1292         /* Do not change the order of the if above and the two below... */
1293         if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1294                 drbd_send_uuids(mdev);
1295                 drbd_send_state(mdev);
1296         }
1297         if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1298                 drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1299
1300         /* Lost contact to peer's copy of the data */
1301         if ((os.pdsk >= D_INCONSISTENT &&
1302              os.pdsk != D_UNKNOWN &&
1303              os.pdsk != D_OUTDATED)
1304         &&  (ns.pdsk < D_INCONSISTENT ||
1305              ns.pdsk == D_UNKNOWN ||
1306              ns.pdsk == D_OUTDATED)) {
1307                 if (get_ldev(mdev)) {
1308                         if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1309                             mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1310                                 if (is_susp(mdev->state)) {
1311                                         set_bit(NEW_CUR_UUID, &mdev->flags);
1312                                 } else {
1313                                         drbd_uuid_new_current(mdev);
1314                                         drbd_send_uuids(mdev);
1315                                 }
1316                         }
1317                         put_ldev(mdev);
1318                 }
1319         }
1320
1321         if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1322                 if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0) {
1323                         drbd_uuid_new_current(mdev);
1324                         drbd_send_uuids(mdev);
1325                 }
1326
1327                 /* D_DISKLESS Peer becomes secondary */
1328                 if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1329                         drbd_al_to_on_disk_bm(mdev);
1330                 put_ldev(mdev);
1331         }
1332
1333         /* Last part of the attaching process ... */
1334         if (ns.conn >= C_CONNECTED &&
1335             os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1336                 drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1337                 drbd_send_uuids(mdev);
1338                 drbd_send_state(mdev);
1339         }
1340
1341         /* We want to pause/continue resync, tell peer. */
1342         if (ns.conn >= C_CONNECTED &&
1343              ((os.aftr_isp != ns.aftr_isp) ||
1344               (os.user_isp != ns.user_isp)))
1345                 drbd_send_state(mdev);
1346
1347         /* In case one of the isp bits got set, suspend other devices. */
1348         if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1349             (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1350                 suspend_other_sg(mdev);
1351
1352         /* Make sure the peer gets informed about eventual state
1353            changes (ISP bits) while we were in WFReportParams. */
1354         if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1355                 drbd_send_state(mdev);
1356
1357         /* We are in the progress to start a full sync... */
1358         if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1359             (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1360                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1361
1362         /* We are invalidating our self... */
1363         if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1364             os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1365                 drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1366
1367         /* first half of local IO error */
1368         if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1369                 enum drbd_io_error_p eh = EP_PASS_ON;
1370
1371                 if (drbd_send_state(mdev))
1372                         dev_warn(DEV, "Notified peer that my disk is broken.\n");
1373                 else
1374                         dev_err(DEV, "Sending state for drbd_io_error() failed\n");
1375
1376                 drbd_rs_cancel_all(mdev);
1377
1378                 if (get_ldev_if_state(mdev, D_FAILED)) {
1379                         eh = mdev->ldev->dc.on_io_error;
1380                         put_ldev(mdev);
1381                 }
1382                 if (eh == EP_CALL_HELPER)
1383                         drbd_khelper(mdev, "local-io-error");
1384         }
1385
1386
1387         /* second half of local IO error handling,
1388          * after local_cnt references have reached zero: */
1389         if (os.disk == D_FAILED && ns.disk == D_DISKLESS) {
1390                 mdev->rs_total = 0;
1391                 mdev->rs_failed = 0;
1392                 atomic_set(&mdev->rs_pending_cnt, 0);
1393         }
1394
1395         if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1396                 /* We must still be diskless,
1397                  * re-attach has to be serialized with this! */
1398                 if (mdev->state.disk != D_DISKLESS)
1399                         dev_err(DEV,
1400                                 "ASSERT FAILED: disk is %s while going diskless\n",
1401                                 drbd_disk_str(mdev->state.disk));
1402
1403                 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state
1404                  * will inc/dec it frequently. Since we became D_DISKLESS, no
1405                  * one has touched the protected members anymore, though, so we
1406                  * are safe to free them here. */
1407                 if (drbd_send_state(mdev))
1408                         dev_warn(DEV, "Notified peer that I detached my disk.\n");
1409                 else
1410                         dev_err(DEV, "Sending state for detach failed\n");
1411
1412                 lc_destroy(mdev->resync);
1413                 mdev->resync = NULL;
1414                 lc_destroy(mdev->act_log);
1415                 mdev->act_log = NULL;
1416                 __no_warn(local,
1417                         drbd_free_bc(mdev->ldev);
1418                         mdev->ldev = NULL;);
1419
1420                 if (mdev->md_io_tmpp) {
1421                         __free_page(mdev->md_io_tmpp);
1422                         mdev->md_io_tmpp = NULL;
1423                 }
1424         }
1425
1426         /* Disks got bigger while they were detached */
1427         if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1428             test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1429                 if (ns.conn == C_CONNECTED)
1430                         resync_after_online_grow(mdev);
1431         }
1432
1433         /* A resync finished or aborted, wake paused devices... */
1434         if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1435             (os.peer_isp && !ns.peer_isp) ||
1436             (os.user_isp && !ns.user_isp))
1437                 resume_next_sg(mdev);
1438
1439         /* sync target done with resync.  Explicitly notify peer, even though
1440          * it should (at least for non-empty resyncs) already know itself. */
1441         if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
1442                 drbd_send_state(mdev);
1443
1444         /* free tl_hash if we Got thawed and are C_STANDALONE */
1445         if (ns.conn == C_STANDALONE && !is_susp(ns) && mdev->tl_hash)
1446                 drbd_free_tl_hash(mdev);
1447
1448         /* Upon network connection, we need to start the receiver */
1449         if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1450                 drbd_thread_start(&mdev->receiver);
1451
1452         /* Terminate worker thread if we are unconfigured - it will be
1453            restarted as needed... */
1454         if (ns.disk == D_DISKLESS &&
1455             ns.conn == C_STANDALONE &&
1456             ns.role == R_SECONDARY) {
1457                 if (os.aftr_isp != ns.aftr_isp)
1458                         resume_next_sg(mdev);
1459                 /* set in __drbd_set_state, unless CONFIG_PENDING was set */
1460                 if (test_bit(DEVICE_DYING, &mdev->flags))
1461                         drbd_thread_stop_nowait(&mdev->worker);
1462         }
1463
1464         drbd_md_sync(mdev);
1465 }
1466
1467
1468 static int drbd_thread_setup(void *arg)
1469 {
1470         struct drbd_thread *thi = (struct drbd_thread *) arg;
1471         struct drbd_conf *mdev = thi->mdev;
1472         unsigned long flags;
1473         int retval;
1474
1475 restart:
1476         retval = thi->function(thi);
1477
1478         spin_lock_irqsave(&thi->t_lock, flags);
1479
1480         /* if the receiver has been "Exiting", the last thing it did
1481          * was set the conn state to "StandAlone",
1482          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1483          * and receiver thread will be "started".
1484          * drbd_thread_start needs to set "Restarting" in that case.
1485          * t_state check and assignment needs to be within the same spinlock,
1486          * so either thread_start sees Exiting, and can remap to Restarting,
1487          * or thread_start see None, and can proceed as normal.
1488          */
1489
1490         if (thi->t_state == Restarting) {
1491                 dev_info(DEV, "Restarting %s\n", current->comm);
1492                 thi->t_state = Running;
1493                 spin_unlock_irqrestore(&thi->t_lock, flags);
1494                 goto restart;
1495         }
1496
1497         thi->task = NULL;
1498         thi->t_state = None;
1499         smp_mb();
1500         complete(&thi->stop);
1501         spin_unlock_irqrestore(&thi->t_lock, flags);
1502
1503         dev_info(DEV, "Terminating %s\n", current->comm);
1504
1505         /* Release mod reference taken when thread was started */
1506         module_put(THIS_MODULE);
1507         return retval;
1508 }
1509
1510 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1511                       int (*func) (struct drbd_thread *))
1512 {
1513         spin_lock_init(&thi->t_lock);
1514         thi->task    = NULL;
1515         thi->t_state = None;
1516         thi->function = func;
1517         thi->mdev = mdev;
1518 }
1519
1520 int drbd_thread_start(struct drbd_thread *thi)
1521 {
1522         struct drbd_conf *mdev = thi->mdev;
1523         struct task_struct *nt;
1524         unsigned long flags;
1525
1526         const char *me =
1527                 thi == &mdev->receiver ? "receiver" :
1528                 thi == &mdev->asender  ? "asender"  :
1529                 thi == &mdev->worker   ? "worker"   : "NONSENSE";
1530
1531         /* is used from state engine doing drbd_thread_stop_nowait,
1532          * while holding the req lock irqsave */
1533         spin_lock_irqsave(&thi->t_lock, flags);
1534
1535         switch (thi->t_state) {
1536         case None:
1537                 dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1538                                 me, current->comm, current->pid);
1539
1540                 /* Get ref on module for thread - this is released when thread exits */
1541                 if (!try_module_get(THIS_MODULE)) {
1542                         dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1543                         spin_unlock_irqrestore(&thi->t_lock, flags);
1544                         return FALSE;
1545                 }
1546
1547                 init_completion(&thi->stop);
1548                 D_ASSERT(thi->task == NULL);
1549                 thi->reset_cpu_mask = 1;
1550                 thi->t_state = Running;
1551                 spin_unlock_irqrestore(&thi->t_lock, flags);
1552                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1553
1554                 nt = kthread_create(drbd_thread_setup, (void *) thi,
1555                                     "drbd%d_%s", mdev_to_minor(mdev), me);
1556
1557                 if (IS_ERR(nt)) {
1558                         dev_err(DEV, "Couldn't start thread\n");
1559
1560                         module_put(THIS_MODULE);
1561                         return FALSE;
1562                 }
1563                 spin_lock_irqsave(&thi->t_lock, flags);
1564                 thi->task = nt;
1565                 thi->t_state = Running;
1566                 spin_unlock_irqrestore(&thi->t_lock, flags);
1567                 wake_up_process(nt);
1568                 break;
1569         case Exiting:
1570                 thi->t_state = Restarting;
1571                 dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1572                                 me, current->comm, current->pid);
1573                 /* fall through */
1574         case Running:
1575         case Restarting:
1576         default:
1577                 spin_unlock_irqrestore(&thi->t_lock, flags);
1578                 break;
1579         }
1580
1581         return TRUE;
1582 }
1583
1584
1585 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1586 {
1587         unsigned long flags;
1588
1589         enum drbd_thread_state ns = restart ? Restarting : Exiting;
1590
1591         /* may be called from state engine, holding the req lock irqsave */
1592         spin_lock_irqsave(&thi->t_lock, flags);
1593
1594         if (thi->t_state == None) {
1595                 spin_unlock_irqrestore(&thi->t_lock, flags);
1596                 if (restart)
1597                         drbd_thread_start(thi);
1598                 return;
1599         }
1600
1601         if (thi->t_state != ns) {
1602                 if (thi->task == NULL) {
1603                         spin_unlock_irqrestore(&thi->t_lock, flags);
1604                         return;
1605                 }
1606
1607                 thi->t_state = ns;
1608                 smp_mb();
1609                 init_completion(&thi->stop);
1610                 if (thi->task != current)
1611                         force_sig(DRBD_SIGKILL, thi->task);
1612
1613         }
1614
1615         spin_unlock_irqrestore(&thi->t_lock, flags);
1616
1617         if (wait)
1618                 wait_for_completion(&thi->stop);
1619 }
1620
1621 #ifdef CONFIG_SMP
1622 /**
1623  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1624  * @mdev:       DRBD device.
1625  *
1626  * Forces all threads of a device onto the same CPU. This is beneficial for
1627  * DRBD's performance. May be overwritten by user's configuration.
1628  */
1629 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1630 {
1631         int ord, cpu;
1632
1633         /* user override. */
1634         if (cpumask_weight(mdev->cpu_mask))
1635                 return;
1636
1637         ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1638         for_each_online_cpu(cpu) {
1639                 if (ord-- == 0) {
1640                         cpumask_set_cpu(cpu, mdev->cpu_mask);
1641                         return;
1642                 }
1643         }
1644         /* should not be reached */
1645         cpumask_setall(mdev->cpu_mask);
1646 }
1647
1648 /**
1649  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1650  * @mdev:       DRBD device.
1651  *
1652  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
1653  * prematurely.
1654  */
1655 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1656 {
1657         struct task_struct *p = current;
1658         struct drbd_thread *thi =
1659                 p == mdev->asender.task  ? &mdev->asender  :
1660                 p == mdev->receiver.task ? &mdev->receiver :
1661                 p == mdev->worker.task   ? &mdev->worker   :
1662                 NULL;
1663         ERR_IF(thi == NULL)
1664                 return;
1665         if (!thi->reset_cpu_mask)
1666                 return;
1667         thi->reset_cpu_mask = 0;
1668         set_cpus_allowed_ptr(p, mdev->cpu_mask);
1669 }
1670 #endif
1671
1672 /* the appropriate socket mutex must be held already */
1673 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1674                           enum drbd_packets cmd, struct p_header80 *h,
1675                           size_t size, unsigned msg_flags)
1676 {
1677         int sent, ok;
1678
1679         ERR_IF(!h) return FALSE;
1680         ERR_IF(!size) return FALSE;
1681
1682         h->magic   = BE_DRBD_MAGIC;
1683         h->command = cpu_to_be16(cmd);
1684         h->length  = cpu_to_be16(size-sizeof(struct p_header80));
1685
1686         sent = drbd_send(mdev, sock, h, size, msg_flags);
1687
1688         ok = (sent == size);
1689         if (!ok)
1690                 dev_err(DEV, "short sent %s size=%d sent=%d\n",
1691                     cmdname(cmd), (int)size, sent);
1692         return ok;
1693 }
1694
1695 /* don't pass the socket. we may only look at it
1696  * when we hold the appropriate socket mutex.
1697  */
1698 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1699                   enum drbd_packets cmd, struct p_header80 *h, size_t size)
1700 {
1701         int ok = 0;
1702         struct socket *sock;
1703
1704         if (use_data_socket) {
1705                 mutex_lock(&mdev->data.mutex);
1706                 sock = mdev->data.socket;
1707         } else {
1708                 mutex_lock(&mdev->meta.mutex);
1709                 sock = mdev->meta.socket;
1710         }
1711
1712         /* drbd_disconnect() could have called drbd_free_sock()
1713          * while we were waiting in down()... */
1714         if (likely(sock != NULL))
1715                 ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1716
1717         if (use_data_socket)
1718                 mutex_unlock(&mdev->data.mutex);
1719         else
1720                 mutex_unlock(&mdev->meta.mutex);
1721         return ok;
1722 }
1723
1724 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1725                    size_t size)
1726 {
1727         struct p_header80 h;
1728         int ok;
1729
1730         h.magic   = BE_DRBD_MAGIC;
1731         h.command = cpu_to_be16(cmd);
1732         h.length  = cpu_to_be16(size);
1733
1734         if (!drbd_get_data_sock(mdev))
1735                 return 0;
1736
1737         ok = (sizeof(h) ==
1738                 drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1739         ok = ok && (size ==
1740                 drbd_send(mdev, mdev->data.socket, data, size, 0));
1741
1742         drbd_put_data_sock(mdev);
1743
1744         return ok;
1745 }
1746
1747 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1748 {
1749         struct p_rs_param_95 *p;
1750         struct socket *sock;
1751         int size, rv;
1752         const int apv = mdev->agreed_pro_version;
1753
1754         size = apv <= 87 ? sizeof(struct p_rs_param)
1755                 : apv == 88 ? sizeof(struct p_rs_param)
1756                         + strlen(mdev->sync_conf.verify_alg) + 1
1757                 : apv <= 94 ? sizeof(struct p_rs_param_89)
1758                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
1759
1760         /* used from admin command context and receiver/worker context.
1761          * to avoid kmalloc, grab the socket right here,
1762          * then use the pre-allocated sbuf there */
1763         mutex_lock(&mdev->data.mutex);
1764         sock = mdev->data.socket;
1765
1766         if (likely(sock != NULL)) {
1767                 enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1768
1769                 p = &mdev->data.sbuf.rs_param_95;
1770
1771                 /* initialize verify_alg and csums_alg */
1772                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1773
1774                 p->rate = cpu_to_be32(sc->rate);
1775                 p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
1776                 p->c_delay_target = cpu_to_be32(sc->c_delay_target);
1777                 p->c_fill_target = cpu_to_be32(sc->c_fill_target);
1778                 p->c_max_rate = cpu_to_be32(sc->c_max_rate);
1779
1780                 if (apv >= 88)
1781                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1782                 if (apv >= 89)
1783                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1784
1785                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1786         } else
1787                 rv = 0; /* not ok */
1788
1789         mutex_unlock(&mdev->data.mutex);
1790
1791         return rv;
1792 }
1793
1794 int drbd_send_protocol(struct drbd_conf *mdev)
1795 {
1796         struct p_protocol *p;
1797         int size, cf, rv;
1798
1799         size = sizeof(struct p_protocol);
1800
1801         if (mdev->agreed_pro_version >= 87)
1802                 size += strlen(mdev->net_conf->integrity_alg) + 1;
1803
1804         /* we must not recurse into our own queue,
1805          * as that is blocked during handshake */
1806         p = kmalloc(size, GFP_NOIO);
1807         if (p == NULL)
1808                 return 0;
1809
1810         p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1811         p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1812         p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1813         p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1814         p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1815
1816         cf = 0;
1817         if (mdev->net_conf->want_lose)
1818                 cf |= CF_WANT_LOSE;
1819         if (mdev->net_conf->dry_run) {
1820                 if (mdev->agreed_pro_version >= 92)
1821                         cf |= CF_DRY_RUN;
1822                 else {
1823                         dev_err(DEV, "--dry-run is not supported by peer");
1824                         kfree(p);
1825                         return 0;
1826                 }
1827         }
1828         p->conn_flags    = cpu_to_be32(cf);
1829
1830         if (mdev->agreed_pro_version >= 87)
1831                 strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1832
1833         rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1834                            (struct p_header80 *)p, size);
1835         kfree(p);
1836         return rv;
1837 }
1838
1839 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1840 {
1841         struct p_uuids p;
1842         int i;
1843
1844         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1845                 return 1;
1846
1847         for (i = UI_CURRENT; i < UI_SIZE; i++)
1848                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1849
1850         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1851         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1852         uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1853         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1854         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1855         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1856
1857         put_ldev(mdev);
1858
1859         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1860                              (struct p_header80 *)&p, sizeof(p));
1861 }
1862
1863 int drbd_send_uuids(struct drbd_conf *mdev)
1864 {
1865         return _drbd_send_uuids(mdev, 0);
1866 }
1867
1868 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1869 {
1870         return _drbd_send_uuids(mdev, 8);
1871 }
1872
1873
1874 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1875 {
1876         struct p_rs_uuid p;
1877
1878         p.uuid = cpu_to_be64(val);
1879
1880         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1881                              (struct p_header80 *)&p, sizeof(p));
1882 }
1883
1884 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1885 {
1886         struct p_sizes p;
1887         sector_t d_size, u_size;
1888         int q_order_type;
1889         int ok;
1890
1891         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1892                 D_ASSERT(mdev->ldev->backing_bdev);
1893                 d_size = drbd_get_max_capacity(mdev->ldev);
1894                 u_size = mdev->ldev->dc.disk_size;
1895                 q_order_type = drbd_queue_order_type(mdev);
1896                 put_ldev(mdev);
1897         } else {
1898                 d_size = 0;
1899                 u_size = 0;
1900                 q_order_type = QUEUE_ORDERED_NONE;
1901         }
1902
1903         p.d_size = cpu_to_be64(d_size);
1904         p.u_size = cpu_to_be64(u_size);
1905         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1906         p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1907         p.queue_order_type = cpu_to_be16(q_order_type);
1908         p.dds_flags = cpu_to_be16(flags);
1909
1910         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1911                            (struct p_header80 *)&p, sizeof(p));
1912         return ok;
1913 }
1914
1915 /**
1916  * drbd_send_state() - Sends the drbd state to the peer
1917  * @mdev:       DRBD device.
1918  */
1919 int drbd_send_state(struct drbd_conf *mdev)
1920 {
1921         struct socket *sock;
1922         struct p_state p;
1923         int ok = 0;
1924
1925         /* Grab state lock so we wont send state if we're in the middle
1926          * of a cluster wide state change on another thread */
1927         drbd_state_lock(mdev);
1928
1929         mutex_lock(&mdev->data.mutex);
1930
1931         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1932         sock = mdev->data.socket;
1933
1934         if (likely(sock != NULL)) {
1935                 ok = _drbd_send_cmd(mdev, sock, P_STATE,
1936                                     (struct p_header80 *)&p, sizeof(p), 0);
1937         }
1938
1939         mutex_unlock(&mdev->data.mutex);
1940
1941         drbd_state_unlock(mdev);
1942         return ok;
1943 }
1944
1945 int drbd_send_state_req(struct drbd_conf *mdev,
1946         union drbd_state mask, union drbd_state val)
1947 {
1948         struct p_req_state p;
1949
1950         p.mask    = cpu_to_be32(mask.i);
1951         p.val     = cpu_to_be32(val.i);
1952
1953         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1954                              (struct p_header80 *)&p, sizeof(p));
1955 }
1956
1957 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1958 {
1959         struct p_req_state_reply p;
1960
1961         p.retcode    = cpu_to_be32(retcode);
1962
1963         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1964                              (struct p_header80 *)&p, sizeof(p));
1965 }
1966
1967 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1968         struct p_compressed_bm *p,
1969         struct bm_xfer_ctx *c)
1970 {
1971         struct bitstream bs;
1972         unsigned long plain_bits;
1973         unsigned long tmp;
1974         unsigned long rl;
1975         unsigned len;
1976         unsigned toggle;
1977         int bits;
1978
1979         /* may we use this feature? */
1980         if ((mdev->sync_conf.use_rle == 0) ||
1981                 (mdev->agreed_pro_version < 90))
1982                         return 0;
1983
1984         if (c->bit_offset >= c->bm_bits)
1985                 return 0; /* nothing to do. */
1986
1987         /* use at most thus many bytes */
1988         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1989         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1990         /* plain bits covered in this code string */
1991         plain_bits = 0;
1992
1993         /* p->encoding & 0x80 stores whether the first run length is set.
1994          * bit offset is implicit.
1995          * start with toggle == 2 to be able to tell the first iteration */
1996         toggle = 2;
1997
1998         /* see how much plain bits we can stuff into one packet
1999          * using RLE and VLI. */
2000         do {
2001                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
2002                                     : _drbd_bm_find_next(mdev, c->bit_offset);
2003                 if (tmp == -1UL)
2004                         tmp = c->bm_bits;
2005                 rl = tmp - c->bit_offset;
2006
2007                 if (toggle == 2) { /* first iteration */
2008                         if (rl == 0) {
2009                                 /* the first checked bit was set,
2010                                  * store start value, */
2011                                 DCBP_set_start(p, 1);
2012                                 /* but skip encoding of zero run length */
2013                                 toggle = !toggle;
2014                                 continue;
2015                         }
2016                         DCBP_set_start(p, 0);
2017                 }
2018
2019                 /* paranoia: catch zero runlength.
2020                  * can only happen if bitmap is modified while we scan it. */
2021                 if (rl == 0) {
2022                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
2023                             "t:%u bo:%lu\n", toggle, c->bit_offset);
2024                         return -1;
2025                 }
2026
2027                 bits = vli_encode_bits(&bs, rl);
2028                 if (bits == -ENOBUFS) /* buffer full */
2029                         break;
2030                 if (bits <= 0) {
2031                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
2032                         return 0;
2033                 }
2034
2035                 toggle = !toggle;
2036                 plain_bits += rl;
2037                 c->bit_offset = tmp;
2038         } while (c->bit_offset < c->bm_bits);
2039
2040         len = bs.cur.b - p->code + !!bs.cur.bit;
2041
2042         if (plain_bits < (len << 3)) {
2043                 /* incompressible with this method.
2044                  * we need to rewind both word and bit position. */
2045                 c->bit_offset -= plain_bits;
2046                 bm_xfer_ctx_bit_to_word_offset(c);
2047                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2048                 return 0;
2049         }
2050
2051         /* RLE + VLI was able to compress it just fine.
2052          * update c->word_offset. */
2053         bm_xfer_ctx_bit_to_word_offset(c);
2054
2055         /* store pad_bits */
2056         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
2057
2058         return len;
2059 }
2060
2061 enum { OK, FAILED, DONE }
2062 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
2063         struct p_header80 *h, struct bm_xfer_ctx *c)
2064 {
2065         struct p_compressed_bm *p = (void*)h;
2066         unsigned long num_words;
2067         int len;
2068         int ok;
2069
2070         len = fill_bitmap_rle_bits(mdev, p, c);
2071
2072         if (len < 0)
2073                 return FAILED;
2074
2075         if (len) {
2076                 DCBP_set_code(p, RLE_VLI_Bits);
2077                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
2078                         sizeof(*p) + len, 0);
2079
2080                 c->packets[0]++;
2081                 c->bytes[0] += sizeof(*p) + len;
2082
2083                 if (c->bit_offset >= c->bm_bits)
2084                         len = 0; /* DONE */
2085         } else {
2086                 /* was not compressible.
2087                  * send a buffer full of plain text bits instead. */
2088                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2089                 len = num_words * sizeof(long);
2090                 if (len)
2091                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
2092                 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
2093                                    h, sizeof(struct p_header80) + len, 0);
2094                 c->word_offset += num_words;
2095                 c->bit_offset = c->word_offset * BITS_PER_LONG;
2096
2097                 c->packets[1]++;
2098                 c->bytes[1] += sizeof(struct p_header80) + len;
2099
2100                 if (c->bit_offset > c->bm_bits)
2101                         c->bit_offset = c->bm_bits;
2102         }
2103         ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
2104
2105         if (ok == DONE)
2106                 INFO_bm_xfer_stats(mdev, "send", c);
2107         return ok;
2108 }
2109
2110 /* See the comment at receive_bitmap() */
2111 int _drbd_send_bitmap(struct drbd_conf *mdev)
2112 {
2113         struct bm_xfer_ctx c;
2114         struct p_header80 *p;
2115         int ret;
2116
2117         ERR_IF(!mdev->bitmap) return FALSE;
2118
2119         /* maybe we should use some per thread scratch page,
2120          * and allocate that during initial device creation? */
2121         p = (struct p_header80 *) __get_free_page(GFP_NOIO);
2122         if (!p) {
2123                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2124                 return FALSE;
2125         }
2126
2127         if (get_ldev(mdev)) {
2128                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2129                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2130                         drbd_bm_set_all(mdev);
2131                         if (drbd_bm_write(mdev)) {
2132                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
2133                                  * but otherwise process as per normal - need to tell other
2134                                  * side that a full resync is required! */
2135                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
2136                         } else {
2137                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2138                                 drbd_md_sync(mdev);
2139                         }
2140                 }
2141                 put_ldev(mdev);
2142         }
2143
2144         c = (struct bm_xfer_ctx) {
2145                 .bm_bits = drbd_bm_bits(mdev),
2146                 .bm_words = drbd_bm_words(mdev),
2147         };
2148
2149         do {
2150                 ret = send_bitmap_rle_or_plain(mdev, p, &c);
2151         } while (ret == OK);
2152
2153         free_page((unsigned long) p);
2154         return (ret == DONE);
2155 }
2156
/*
 * Send the bitmap while holding the data socket.
 *
 * Note the inverted convention compared to _drbd_send_bitmap():
 * returns 0 on success, 1 if sending failed, and -1 if the data socket
 * could not be obtained.
 */
int drbd_send_bitmap(struct drbd_conf *mdev)
{
	int failed;

	if (!drbd_get_data_sock(mdev))
		return -1;

	failed = !_drbd_send_bitmap(mdev);
	drbd_put_data_sock(mdev);

	return failed;
}
2167
2168 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2169 {
2170         int ok;
2171         struct p_barrier_ack p;
2172
2173         p.barrier  = barrier_nr;
2174         p.set_size = cpu_to_be32(set_size);
2175
2176         if (mdev->state.conn < C_CONNECTED)
2177                 return FALSE;
2178         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2179                         (struct p_header80 *)&p, sizeof(p));
2180         return ok;
2181 }
2182
2183 /**
2184  * _drbd_send_ack() - Sends an ack packet
2185  * @mdev:       DRBD device.
2186  * @cmd:        Packet command code.
2187  * @sector:     sector, needs to be in big endian byte order
2188  * @blksize:    size in byte, needs to be in big endian byte order
2189  * @block_id:   Id, big endian byte order
2190  */
2191 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2192                           u64 sector,
2193                           u32 blksize,
2194                           u64 block_id)
2195 {
2196         int ok;
2197         struct p_block_ack p;
2198
2199         p.sector   = sector;
2200         p.block_id = block_id;
2201         p.blksize  = blksize;
2202         p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2203
2204         if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2205                 return FALSE;
2206         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2207                                 (struct p_header80 *)&p, sizeof(p));
2208         return ok;
2209 }
2210
2211 /* dp->sector and dp->block_id already/still in network byte order,
2212  * data_size is payload size according to dp->head,
2213  * and may need to be corrected for digest size. */
2214 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2215                      struct p_data *dp, int data_size)
2216 {
2217         data_size -= (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
2218                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
2219         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2220                               dp->block_id);
2221 }
2222
2223 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2224                      struct p_block_req *rp)
2225 {
2226         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2227 }
2228
2229 /**
2230  * drbd_send_ack() - Sends an ack packet
2231  * @mdev:       DRBD device.
2232  * @cmd:        Packet command code.
2233  * @e:          Epoch entry.
2234  */
2235 int drbd_send_ack(struct drbd_conf *mdev,
2236         enum drbd_packets cmd, struct drbd_epoch_entry *e)
2237 {
2238         return _drbd_send_ack(mdev, cmd,
2239                               cpu_to_be64(e->sector),
2240                               cpu_to_be32(e->size),
2241                               e->block_id);
2242 }
2243
2244 /* This function misuses the block_id field to signal if the blocks
2245  * are is sync or not. */
2246 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2247                      sector_t sector, int blksize, u64 block_id)
2248 {
2249         return _drbd_send_ack(mdev, cmd,
2250                               cpu_to_be64(sector),
2251                               cpu_to_be32(blksize),
2252                               cpu_to_be64(block_id));
2253 }
2254
2255 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2256                        sector_t sector, int size, u64 block_id)
2257 {
2258         int ok;
2259         struct p_block_req p;
2260
2261         p.sector   = cpu_to_be64(sector);
2262         p.block_id = block_id;
2263         p.blksize  = cpu_to_be32(size);
2264
2265         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2266                                 (struct p_header80 *)&p, sizeof(p));
2267         return ok;
2268 }
2269
2270 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2271                             sector_t sector, int size,
2272                             void *digest, int digest_size,
2273                             enum drbd_packets cmd)
2274 {
2275         int ok;
2276         struct p_block_req p;
2277
2278         p.sector   = cpu_to_be64(sector);
2279         p.block_id = BE_DRBD_MAGIC + 0xbeef;
2280         p.blksize  = cpu_to_be32(size);
2281
2282         p.head.magic   = BE_DRBD_MAGIC;
2283         p.head.command = cpu_to_be16(cmd);
2284         p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + digest_size);
2285
2286         mutex_lock(&mdev->data.mutex);
2287
2288         ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2289         ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2290
2291         mutex_unlock(&mdev->data.mutex);
2292
2293         return ok;
2294 }
2295
2296 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2297 {
2298         int ok;
2299         struct p_block_req p;
2300
2301         p.sector   = cpu_to_be64(sector);
2302         p.block_id = BE_DRBD_MAGIC + 0xbabe;
2303         p.blksize  = cpu_to_be32(size);
2304
2305         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2306                            (struct p_header80 *)&p, sizeof(p));
2307         return ok;
2308 }
2309
2310 /* called on sndtimeo
2311  * returns FALSE if we should retry,
2312  * TRUE if we think connection is dead
2313  */
2314 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2315 {
2316         int drop_it;
2317         /* long elapsed = (long)(jiffies - mdev->last_received); */
2318
2319         drop_it =   mdev->meta.socket == sock
2320                 || !mdev->asender.task
2321                 || get_t_state(&mdev->asender) != Running
2322                 || mdev->state.conn < C_CONNECTED;
2323
2324         if (drop_it)
2325                 return TRUE;
2326
2327         drop_it = !--mdev->ko_count;
2328         if (!drop_it) {
2329                 dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2330                        current->comm, current->pid, mdev->ko_count);
2331                 request_ping(mdev);
2332         }
2333
2334         return drop_it; /* && (mdev->state == R_PRIMARY) */;
2335 }
2336
2337 /* The idea of sendpage seems to be to put some kind of reference
2338  * to the page into the skb, and to hand it over to the NIC. In
2339  * this process get_page() gets called.
2340  *
2341  * As soon as the page was really sent over the network put_page()
2342  * gets called by some part of the network layer. [ NIC driver? ]
2343  *
2344  * [ get_page() / put_page() increment/decrement the count. If count
2345  *   reaches 0 the page will be freed. ]
2346  *
2347  * This works nicely with pages from FSs.
2348  * But this means that in protocol A we might signal IO completion too early!
2349  *
2350  * In order not to corrupt data during a resync we must make sure
2351  * that we do not reuse our own buffer pages (EEs) to early, therefore
2352  * we have the net_ee list.
2353  *
2354  * XFS seems to have problems, still, it submits pages with page_count == 0!
2355  * As a workaround, we disable sendpage on pages
2356  * with page_count == 0 or PageSlab.
2357  */
2358 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2359                    int offset, size_t size, unsigned msg_flags)
2360 {
2361         int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2362         kunmap(page);
2363         if (sent == size)
2364                 mdev->send_cnt += size>>9;
2365         return sent == size;
2366 }
2367
2368 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2369                     int offset, size_t size, unsigned msg_flags)
2370 {
2371         mm_segment_t oldfs = get_fs();
2372         int sent, ok;
2373         int len = size;
2374
2375         /* e.g. XFS meta- & log-data is in slab pages, which have a
2376          * page_count of 0 and/or have PageSlab() set.
2377          * we cannot use send_page for those, as that does get_page();
2378          * put_page(); and would cause either a VM_BUG directly, or
2379          * __page_cache_release a page that would actually still be referenced
2380          * by someone, leading to some obscure delayed Oops somewhere else. */
2381         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2382                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2383
2384         msg_flags |= MSG_NOSIGNAL;
2385         drbd_update_congested(mdev);
2386         set_fs(KERNEL_DS);
2387         do {
2388                 sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2389                                                         offset, len,
2390                                                         msg_flags);
2391                 if (sent == -EAGAIN) {
2392                         if (we_should_drop_the_connection(mdev,
2393                                                           mdev->data.socket))
2394                                 break;
2395                         else
2396                                 continue;
2397                 }
2398                 if (sent <= 0) {
2399                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2400                              __func__, (int)size, len, sent);
2401                         break;
2402                 }
2403                 len    -= sent;
2404                 offset += sent;
2405         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2406         set_fs(oldfs);
2407         clear_bit(NET_CONGESTED, &mdev->flags);
2408
2409         ok = (len == 0);
2410         if (likely(ok))
2411                 mdev->send_cnt += size>>9;
2412         return ok;
2413 }
2414
2415 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2416 {
2417         struct bio_vec *bvec;
2418         int i;
2419         /* hint all but last page with MSG_MORE */
2420         __bio_for_each_segment(bvec, bio, i, 0) {
2421                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
2422                                      bvec->bv_offset, bvec->bv_len,
2423                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2424                         return 0;
2425         }
2426         return 1;
2427 }
2428
2429 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2430 {
2431         struct bio_vec *bvec;
2432         int i;
2433         /* hint all but last page with MSG_MORE */
2434         __bio_for_each_segment(bvec, bio, i, 0) {
2435                 if (!_drbd_send_page(mdev, bvec->bv_page,
2436                                      bvec->bv_offset, bvec->bv_len,
2437                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2438                         return 0;
2439         }
2440         return 1;
2441 }
2442
2443 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2444 {
2445         struct page *page = e->pages;
2446         unsigned len = e->size;
2447         /* hint all but last page with MSG_MORE */
2448         page_chain_for_each(page) {
2449                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
2450                 if (!_drbd_send_page(mdev, page, 0, l,
2451                                 page_chain_next(page) ? MSG_MORE : 0))
2452                         return 0;
2453                 len -= l;
2454         }
2455         return 1;
2456 }
2457
2458 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
2459 {
2460         if (mdev->agreed_pro_version >= 95)
2461                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
2462                         (bi_rw & REQ_UNPLUG ? DP_UNPLUG : 0) |
2463                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
2464                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
2465                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
2466         else
2467                 return bi_rw & (REQ_SYNC | REQ_UNPLUG) ? DP_RW_SYNC : 0;
2468 }
2469
2470 /* Used to send write requests
2471  * R_PRIMARY -> Peer    (P_DATA)
2472  */
2473 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2474 {
2475         int ok = 1;
2476         struct p_data p;
2477         unsigned int dp_flags = 0;
2478         void *dgb;
2479         int dgs;
2480
2481         if (!drbd_get_data_sock(mdev))
2482                 return 0;
2483
2484         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2485                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2486
2487         if (req->size <= DRBD_MAX_SIZE_H80_PACKET) {
2488                 p.head.h80.magic   = BE_DRBD_MAGIC;
2489                 p.head.h80.command = cpu_to_be16(P_DATA);
2490                 p.head.h80.length  =
2491                         cpu_to_be16(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2492         } else {
2493                 p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2494                 p.head.h95.command = cpu_to_be16(P_DATA);
2495                 p.head.h95.length  =
2496                         cpu_to_be32(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2497         }
2498
2499         p.sector   = cpu_to_be64(req->sector);
2500         p.block_id = (unsigned long)req;
2501         p.seq_num  = cpu_to_be32(req->seq_num =
2502                                  atomic_add_return(1, &mdev->packet_seq));
2503
2504         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
2505
2506         if (mdev->state.conn >= C_SYNC_SOURCE &&
2507             mdev->state.conn <= C_PAUSED_SYNC_T)
2508                 dp_flags |= DP_MAY_SET_IN_SYNC;
2509
2510         p.dp_flags = cpu_to_be32(dp_flags);
2511         set_bit(UNPLUG_REMOTE, &mdev->flags);
2512         ok = (sizeof(p) ==
2513                 drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2514         if (ok && dgs) {
2515                 dgb = mdev->int_dig_out;
2516                 drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2517                 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2518         }
2519         if (ok) {
2520                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2521                         ok = _drbd_send_bio(mdev, req->master_bio);
2522                 else
2523                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
2524         }
2525
2526         drbd_put_data_sock(mdev);
2527
2528         return ok;
2529 }
2530
2531 /* answer packet, used to send data back for read requests:
2532  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2533  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2534  */
2535 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2536                     struct drbd_epoch_entry *e)
2537 {
2538         int ok;
2539         struct p_data p;
2540         void *dgb;
2541         int dgs;
2542
2543         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2544                 crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2545
2546         if (e->size <= DRBD_MAX_SIZE_H80_PACKET) {
2547                 p.head.h80.magic   = BE_DRBD_MAGIC;
2548                 p.head.h80.command = cpu_to_be16(cmd);
2549                 p.head.h80.length  =
2550                         cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2551         } else {
2552                 p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2553                 p.head.h95.command = cpu_to_be16(cmd);
2554                 p.head.h95.length  =
2555                         cpu_to_be32(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2556         }
2557
2558         p.sector   = cpu_to_be64(e->sector);
2559         p.block_id = e->block_id;
2560         /* p.seq_num  = 0;    No sequence numbers here.. */
2561
2562         /* Only called by our kernel thread.
2563          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2564          * in response to admin command or module unload.
2565          */
2566         if (!drbd_get_data_sock(mdev))
2567                 return 0;
2568
2569         ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
2570         if (ok && dgs) {
2571                 dgb = mdev->int_dig_out;
2572                 drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2573                 ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2574         }
2575         if (ok)
2576                 ok = _drbd_send_zc_ee(mdev, e);
2577
2578         drbd_put_data_sock(mdev);
2579
2580         return ok;
2581 }
2582
2583 /*
2584   drbd_send distinguishes two cases:
2585
2586   Packets sent via the data socket "sock"
2587   and packets sent via the meta data socket "msock"
2588
2589                     sock                      msock
2590   -----------------+-------------------------+------------------------------
2591   timeout           conf.timeout / 2          conf.timeout / 2
2592   timeout action    send a ping via msock     Abort communication
2593                                               and close all sockets
2594 */
2595
2596 /*
2597  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2598  */
2599 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2600               void *buf, size_t size, unsigned msg_flags)
2601 {
2602         struct kvec iov;
2603         struct msghdr msg;
2604         int rv, sent = 0;
2605
2606         if (!sock)
2607                 return -1000;
2608
2609         /* THINK  if (signal_pending) return ... ? */
2610
2611         iov.iov_base = buf;
2612         iov.iov_len  = size;
2613
2614         msg.msg_name       = NULL;
2615         msg.msg_namelen    = 0;
2616         msg.msg_control    = NULL;
2617         msg.msg_controllen = 0;
2618         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2619
2620         if (sock == mdev->data.socket) {
2621                 mdev->ko_count = mdev->net_conf->ko_count;
2622                 drbd_update_congested(mdev);
2623         }
2624         do {
2625                 /* STRANGE
2626                  * tcp_sendmsg does _not_ use its size parameter at all ?
2627                  *
2628                  * -EAGAIN on timeout, -EINTR on signal.
2629                  */
2630 /* THINK
2631  * do we need to block DRBD_SIG if sock == &meta.socket ??
2632  * otherwise wake_asender() might interrupt some send_*Ack !
2633  */
2634                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2635                 if (rv == -EAGAIN) {
2636                         if (we_should_drop_the_connection(mdev, sock))
2637                                 break;
2638                         else
2639                                 continue;
2640                 }
2641                 D_ASSERT(rv != 0);
2642                 if (rv == -EINTR) {
2643                         flush_signals(current);
2644                         rv = 0;
2645                 }
2646                 if (rv < 0)
2647                         break;
2648                 sent += rv;
2649                 iov.iov_base += rv;
2650                 iov.iov_len  -= rv;
2651         } while (sent < size);
2652
2653         if (sock == mdev->data.socket)
2654                 clear_bit(NET_CONGESTED, &mdev->flags);
2655
2656         if (rv <= 0) {
2657                 if (rv != -EAGAIN) {
2658                         dev_err(DEV, "%s_sendmsg returned %d\n",
2659                             sock == mdev->meta.socket ? "msock" : "sock",
2660                             rv);
2661                         drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2662                 } else
2663                         drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2664         }
2665
2666         return sent;
2667 }
2668
2669 static int drbd_open(struct block_device *bdev, fmode_t mode)
2670 {
2671         struct drbd_conf *mdev = bdev->bd_disk->private_data;
2672         unsigned long flags;
2673         int rv = 0;
2674
2675         lock_kernel();
2676         spin_lock_irqsave(&mdev->req_lock, flags);
2677         /* to have a stable mdev->state.role
2678          * and no race with updating open_cnt */
2679
2680         if (mdev->state.role != R_PRIMARY) {
2681                 if (mode & FMODE_WRITE)
2682                         rv = -EROFS;
2683                 else if (!allow_oos)
2684                         rv = -EMEDIUMTYPE;
2685         }
2686
2687         if (!rv)
2688                 mdev->open_cnt++;
2689         spin_unlock_irqrestore(&mdev->req_lock, flags);
2690         unlock_kernel();
2691
2692         return rv;
2693 }
2694
2695 static int drbd_release(struct gendisk *gd, fmode_t mode)
2696 {
2697         struct drbd_conf *mdev = gd->private_data;
2698         lock_kernel();
2699         mdev->open_cnt--;
2700         unlock_kernel();
2701         return 0;
2702 }
2703
2704 static void drbd_unplug_fn(struct request_queue *q)
2705 {
2706         struct drbd_conf *mdev = q->queuedata;
2707
2708         /* unplug FIRST */
2709         spin_lock_irq(q->queue_lock);
2710         blk_remove_plug(q);
2711         spin_unlock_irq(q->queue_lock);
2712
2713         /* only if connected */
2714         spin_lock_irq(&mdev->req_lock);
2715         if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2716                 D_ASSERT(mdev->state.role == R_PRIMARY);
2717                 if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2718                         /* add to the data.work queue,
2719                          * unless already queued.
2720                          * XXX this might be a good addition to drbd_queue_work
2721                          * anyways, to detect "double queuing" ... */
2722                         if (list_empty(&mdev->unplug_work.list))
2723                                 drbd_queue_work(&mdev->data.work,
2724                                                 &mdev->unplug_work);
2725                 }
2726         }
2727         spin_unlock_irq(&mdev->req_lock);
2728
2729         if (mdev->state.disk >= D_INCONSISTENT)
2730                 drbd_kick_lo(mdev);
2731 }
2732
/*
 * Reset mdev->sync_conf to the compiled-in DRBD_*_DEF defaults and
 * mdev->state to the initial state: R_SECONDARY / R_UNKNOWN,
 * C_STANDALONE, D_DISKLESS / D_UNKNOWN, not suspended.
 */
static void drbd_set_defaults(struct drbd_conf *mdev)
{
        /* This way we get a compile error when sync_conf grows,
           and we forgot to initialize it here */
        /* NOTE: the initializer below is positional on purpose; the
         * commented member names are annotations only and have to be
         * kept in sync with struct syncer_conf by hand. */
        mdev->sync_conf = (struct syncer_conf) {
                /* .rate = */           DRBD_RATE_DEF,
                /* .after = */          DRBD_AFTER_DEF,
                /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
                /* .verify_alg = */     {}, 0,
                /* .cpu_mask = */       {}, 0,
                /* .csums_alg = */      {}, 0,
                /* .use_rle = */        0,
                /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
                /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
                /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
                /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
                /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
                /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
        };

        /* Have to use that way, because the layout differs between
           big endian and little endian */
        mdev->state = (union drbd_state) {
                { .role = R_SECONDARY,
                  .peer = R_UNKNOWN,
                  .conn = C_STANDALONE,
                  .disk = D_DISKLESS,
                  .pdsk = D_UNKNOWN,
                  .susp = 0,
                  .susp_nod = 0,
                  .susp_fen = 0
                } };
}
2766
2767 void drbd_init_set_defaults(struct drbd_conf *mdev)
2768 {
2769         /* the memset(,0,) did most of this.
2770          * note: only assignments, no allocation in here */
2771
2772         drbd_set_defaults(mdev);
2773
2774         /* for now, we do NOT yet support it,
2775          * even though we start some framework
2776          * to eventually support barriers */
2777         set_bit(NO_BARRIER_SUPP, &mdev->flags);
2778
2779         atomic_set(&mdev->ap_bio_cnt, 0);
2780         atomic_set(&mdev->ap_pending_cnt, 0);
2781         atomic_set(&mdev->rs_pending_cnt, 0);
2782         atomic_set(&mdev->unacked_cnt, 0);
2783         atomic_set(&mdev->local_cnt, 0);
2784         atomic_set(&mdev->net_cnt, 0);
2785         atomic_set(&mdev->packet_seq, 0);
2786         atomic_set(&mdev->pp_in_use, 0);
2787         atomic_set(&mdev->pp_in_use_by_net, 0);
2788         atomic_set(&mdev->rs_sect_in, 0);
2789         atomic_set(&mdev->rs_sect_ev, 0);
2790
2791         mutex_init(&mdev->md_io_mutex);
2792         mutex_init(&mdev->data.mutex);
2793         mutex_init(&mdev->meta.mutex);
2794         sema_init(&mdev->data.work.s, 0);
2795         sema_init(&mdev->meta.work.s, 0);
2796         mutex_init(&mdev->state_mutex);
2797
2798         spin_lock_init(&mdev->data.work.q_lock);
2799         spin_lock_init(&mdev->meta.work.q_lock);
2800
2801         spin_lock_init(&mdev->al_lock);
2802         spin_lock_init(&mdev->req_lock);
2803         spin_lock_init(&mdev->peer_seq_lock);
2804         spin_lock_init(&mdev->epoch_lock);
2805
2806         INIT_LIST_HEAD(&mdev->active_ee);
2807         INIT_LIST_HEAD(&mdev->sync_ee);
2808         INIT_LIST_HEAD(&mdev->done_ee);
2809         INIT_LIST_HEAD(&mdev->read_ee);
2810         INIT_LIST_HEAD(&mdev->net_ee);
2811         INIT_LIST_HEAD(&mdev->resync_reads);
2812         INIT_LIST_HEAD(&mdev->data.work.q);
2813         INIT_LIST_HEAD(&mdev->meta.work.q);
2814         INIT_LIST_HEAD(&mdev->resync_work.list);
2815         INIT_LIST_HEAD(&mdev->unplug_work.list);
2816         INIT_LIST_HEAD(&mdev->go_diskless.list);
2817         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2818         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2819
2820         mdev->resync_work.cb  = w_resync_inactive;
2821         mdev->unplug_work.cb  = w_send_write_hint;
2822         mdev->go_diskless.cb  = w_go_diskless;
2823         mdev->md_sync_work.cb = w_md_sync;
2824         mdev->bm_io_work.w.cb = w_bitmap_io;
2825         init_timer(&mdev->resync_timer);
2826         init_timer(&mdev->md_sync_timer);
2827         mdev->resync_timer.function = resync_timer_fn;
2828         mdev->resync_timer.data = (unsigned long) mdev;
2829         mdev->md_sync_timer.function = md_sync_timer_fn;
2830         mdev->md_sync_timer.data = (unsigned long) mdev;
2831
2832         init_waitqueue_head(&mdev->misc_wait);
2833         init_waitqueue_head(&mdev->state_wait);
2834         init_waitqueue_head(&mdev->net_cnt_wait);
2835         init_waitqueue_head(&mdev->ee_wait);
2836         init_waitqueue_head(&mdev->al_wait);
2837         init_waitqueue_head(&mdev->seq_wait);
2838
2839         drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2840         drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2841         drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2842
2843         mdev->agreed_pro_version = PRO_VERSION_MAX;
2844         mdev->write_ordering = WO_bio_barrier;
2845         mdev->resync_wenr = LC_FREE;
2846 }
2847
2848 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2849 {
2850         int i;
2851         if (mdev->receiver.t_state != None)
2852                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2853                                 mdev->receiver.t_state);
2854
2855         /* no need to lock it, I'm the only thread alive */
2856         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2857                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2858         mdev->al_writ_cnt  =
2859         mdev->bm_writ_cnt  =
2860         mdev->read_cnt     =
2861         mdev->recv_cnt     =
2862         mdev->send_cnt     =
2863         mdev->writ_cnt     =
2864         mdev->p_size       =
2865         mdev->rs_start     =
2866         mdev->rs_total     =
2867         mdev->rs_failed    = 0;
2868         mdev->rs_last_events = 0;
2869         mdev->rs_last_sect_ev = 0;
2870         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2871                 mdev->rs_mark_left[i] = 0;
2872                 mdev->rs_mark_time[i] = 0;
2873         }
2874         D_ASSERT(mdev->net_conf == NULL);
2875
2876         drbd_set_my_capacity(mdev, 0);
2877         if (mdev->bitmap) {
2878                 /* maybe never allocated. */
2879                 drbd_bm_resize(mdev, 0, 1);
2880                 drbd_bm_cleanup(mdev);
2881         }
2882
2883         drbd_free_resources(mdev);
2884         clear_bit(AL_SUSPENDED, &mdev->flags);
2885
2886         /*
2887          * currently we drbd_init_ee only on module load, so
2888          * we may do drbd_release_ee only on module unload!
2889          */
2890         D_ASSERT(list_empty(&mdev->active_ee));
2891         D_ASSERT(list_empty(&mdev->sync_ee));
2892         D_ASSERT(list_empty(&mdev->done_ee));
2893         D_ASSERT(list_empty(&mdev->read_ee));
2894         D_ASSERT(list_empty(&mdev->net_ee));
2895         D_ASSERT(list_empty(&mdev->resync_reads));
2896         D_ASSERT(list_empty(&mdev->data.work.q));
2897         D_ASSERT(list_empty(&mdev->meta.work.q));
2898         D_ASSERT(list_empty(&mdev->resync_work.list));
2899         D_ASSERT(list_empty(&mdev->unplug_work.list));
2900         D_ASSERT(list_empty(&mdev->go_diskless.list));
2901
2902 }
2903
2904
2905 static void drbd_destroy_mempools(void)
2906 {
2907         struct page *page;
2908
2909         while (drbd_pp_pool) {
2910                 page = drbd_pp_pool;
2911                 drbd_pp_pool = (struct page *)page_private(page);
2912                 __free_page(page);
2913                 drbd_pp_vacant--;
2914         }
2915
2916         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2917
2918         if (drbd_ee_mempool)
2919                 mempool_destroy(drbd_ee_mempool);
2920         if (drbd_request_mempool)
2921                 mempool_destroy(drbd_request_mempool);
2922         if (drbd_ee_cache)
2923                 kmem_cache_destroy(drbd_ee_cache);
2924         if (drbd_request_cache)
2925                 kmem_cache_destroy(drbd_request_cache);
2926         if (drbd_bm_ext_cache)
2927                 kmem_cache_destroy(drbd_bm_ext_cache);
2928         if (drbd_al_ext_cache)
2929                 kmem_cache_destroy(drbd_al_ext_cache);
2930
2931         drbd_ee_mempool      = NULL;
2932         drbd_request_mempool = NULL;
2933         drbd_ee_cache        = NULL;
2934         drbd_request_cache   = NULL;
2935         drbd_bm_ext_cache    = NULL;
2936         drbd_al_ext_cache    = NULL;
2937
2938         return;
2939 }
2940
2941 static int drbd_create_mempools(void)
2942 {
2943         struct page *page;
2944         const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2945         int i;
2946
2947         /* prepare our caches and mempools */
2948         drbd_request_mempool = NULL;
2949         drbd_ee_cache        = NULL;
2950         drbd_request_cache   = NULL;
2951         drbd_bm_ext_cache    = NULL;
2952         drbd_al_ext_cache    = NULL;
2953         drbd_pp_pool         = NULL;
2954
2955         /* caches */
2956         drbd_request_cache = kmem_cache_create(
2957                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2958         if (drbd_request_cache == NULL)
2959                 goto Enomem;
2960
2961         drbd_ee_cache = kmem_cache_create(
2962                 "drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2963         if (drbd_ee_cache == NULL)
2964                 goto Enomem;
2965
2966         drbd_bm_ext_cache = kmem_cache_create(
2967                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2968         if (drbd_bm_ext_cache == NULL)
2969                 goto Enomem;
2970
2971         drbd_al_ext_cache = kmem_cache_create(
2972                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2973         if (drbd_al_ext_cache == NULL)
2974                 goto Enomem;
2975
2976         /* mempools */
2977         drbd_request_mempool = mempool_create(number,
2978                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2979         if (drbd_request_mempool == NULL)
2980                 goto Enomem;
2981
2982         drbd_ee_mempool = mempool_create(number,
2983                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2984         if (drbd_request_mempool == NULL)
2985                 goto Enomem;
2986
2987         /* drbd's page pool */
2988         spin_lock_init(&drbd_pp_lock);
2989
2990         for (i = 0; i < number; i++) {
2991                 page = alloc_page(GFP_HIGHUSER);
2992                 if (!page)
2993                         goto Enomem;
2994                 set_page_private(page, (unsigned long)drbd_pp_pool);
2995                 drbd_pp_pool = page;
2996         }
2997         drbd_pp_vacant = number;
2998
2999         return 0;
3000
3001 Enomem:
3002         drbd_destroy_mempools(); /* in case we allocated some */
3003         return -ENOMEM;
3004 }
3005
/*
 * Notifier callback installed through drbd_notifier below.
 * Currently a placeholder: it ignores all of its arguments and always
 * returns NOTIFY_DONE.
 */
static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
        void *unused)
{
        /* just so we have it.  you never know what interesting things we
         * might want to do here some day...
         */

        return NOTIFY_DONE;
}

/* notifier block wrapping drbd_notify_sys();
 * drbd_cleanup() unregisters it as a reboot notifier */
static struct notifier_block drbd_notifier = {
        .notifier_call = drbd_notify_sys,
};
3019
3020 static void drbd_release_ee_lists(struct drbd_conf *mdev)
3021 {
3022         int rr;
3023
3024         rr = drbd_release_ee(mdev, &mdev->active_ee);
3025         if (rr)
3026                 dev_err(DEV, "%d EEs in active list found!\n", rr);
3027
3028         rr = drbd_release_ee(mdev, &mdev->sync_ee);
3029         if (rr)
3030                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
3031
3032         rr = drbd_release_ee(mdev, &mdev->read_ee);
3033         if (rr)
3034                 dev_err(DEV, "%d EEs in read list found!\n", rr);
3035
3036         rr = drbd_release_ee(mdev, &mdev->done_ee);
3037         if (rr)
3038                 dev_err(DEV, "%d EEs in done list found!\n", rr);
3039
3040         rr = drbd_release_ee(mdev, &mdev->net_ee);
3041         if (rr)
3042                 dev_err(DEV, "%d EEs in net list found!\n", rr);
3043 }
3044
3045 /* caution. no locking.
3046  * currently only used from module cleanup code. */
3047 static void drbd_delete_device(unsigned int minor)
3048 {
3049         struct drbd_conf *mdev = minor_to_mdev(minor);
3050
3051         if (!mdev)
3052                 return;
3053
3054         /* paranoia asserts */
3055         if (mdev->open_cnt != 0)
3056                 dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
3057                                 __FILE__ , __LINE__);
3058
3059         ERR_IF (!list_empty(&mdev->data.work.q)) {
3060                 struct list_head *lp;
3061                 list_for_each(lp, &mdev->data.work.q) {
3062                         dev_err(DEV, "lp = %p\n", lp);
3063                 }
3064         };
3065         /* end paranoia asserts */
3066
3067         del_gendisk(mdev->vdisk);
3068
3069         /* cleanup stuff that may have been allocated during
3070          * device (re-)configuration or state changes */
3071
3072         if (mdev->this_bdev)
3073                 bdput(mdev->this_bdev);
3074
3075         drbd_free_resources(mdev);
3076
3077         drbd_release_ee_lists(mdev);
3078
3079         /* should be free'd on disconnect? */
3080         kfree(mdev->ee_hash);
3081         /*
3082         mdev->ee_hash_s = 0;
3083         mdev->ee_hash = NULL;
3084         */
3085
3086         lc_destroy(mdev->act_log);
3087         lc_destroy(mdev->resync);
3088
3089         kfree(mdev->p_uuid);
3090         /* mdev->p_uuid = NULL; */
3091
3092         kfree(mdev->int_dig_out);
3093         kfree(mdev->int_dig_in);
3094         kfree(mdev->int_dig_vv);
3095
3096         /* cleanup the rest that has been
3097          * allocated from drbd_new_device
3098          * and actually free the mdev itself */
3099         drbd_free_mdev(mdev);
3100 }
3101
3102 static void drbd_cleanup(void)
3103 {
3104         unsigned int i;
3105
3106         unregister_reboot_notifier(&drbd_notifier);
3107
3108         drbd_nl_cleanup();
3109
3110         if (minor_table) {
3111                 if (drbd_proc)
3112                         remove_proc_entry("drbd", NULL);
3113                 i = minor_count;
3114                 while (i--)
3115                         drbd_delete_device(i);
3116                 drbd_destroy_mempools();
3117         }
3118
3119         kfree(minor_table);
3120
3121         unregister_blkdev(DRBD_MAJOR, "drbd");
3122
3123         printk(KERN_INFO "drbd: module cleanup done.\n");
3124 }
3125
3126 /**
3127  * drbd_congested() - Callback for pdflush
3128  * @congested_data:     User data
3129  * @bdi_bits:           Bits pdflush is currently interested in
3130  *
3131  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3132  */
3133 static int drbd_congested(void *congested_data, int bdi_bits)
3134 {
3135         struct drbd_conf *mdev = congested_data;
3136         struct request_queue *q;
3137         char reason = '-';
3138         int r = 0;
3139
3140         if (!__inc_ap_bio_cond(mdev)) {
3141                 /* DRBD has frozen IO */
3142                 r = bdi_bits;
3143                 reason = 'd';
3144                 goto out;
3145         }
3146
3147         if (get_ldev(mdev)) {
3148                 q = bdev_get_queue(mdev->ldev->backing_bdev);
3149                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
3150                 put_ldev(mdev);
3151                 if (r)
3152                         reason = 'b';
3153         }
3154
3155         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3156                 r |= (1 << BDI_async_congested);
3157                 reason = reason == 'b' ? 'a' : 'n';
3158         }
3159
3160 out:
3161         mdev->congestion_reason = reason;
3162         return r;
3163 }
3164
3165 struct drbd_conf *drbd_new_device(unsigned int minor)
3166 {
3167         struct drbd_conf *mdev;
3168         struct gendisk *disk;
3169         struct request_queue *q;
3170
3171         /* GFP_KERNEL, we are outside of all write-out paths */
3172         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3173         if (!mdev)
3174                 return NULL;
3175         if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3176                 goto out_no_cpumask;
3177
3178         mdev->minor = minor;
3179
3180         drbd_init_set_defaults(mdev);
3181
3182         q = blk_alloc_queue(GFP_KERNEL);
3183         if (!q)
3184                 goto out_no_q;
3185         mdev->rq_queue = q;
3186         q->queuedata   = mdev;
3187
3188         disk = alloc_disk(1);
3189         if (!disk)
3190                 goto out_no_disk;
3191         mdev->vdisk = disk;
3192
3193         set_disk_ro(disk, TRUE);
3194
3195         disk->queue = q;
3196         disk->major = DRBD_MAJOR;
3197         disk->first_minor = minor;
3198         disk->fops = &drbd_ops;
3199         sprintf(disk->disk_name, "drbd%d", minor);
3200         disk->private_data = mdev;
3201
3202         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3203         /* we have no partitions. we contain only ourselves. */
3204         mdev->this_bdev->bd_contains = mdev->this_bdev;
3205
3206         q->backing_dev_info.congested_fn = drbd_congested;
3207         q->backing_dev_info.congested_data = mdev;
3208
3209         blk_queue_make_request(q, drbd_make_request_26);
3210         blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3211         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3212         blk_queue_merge_bvec(q, drbd_merge_bvec);
3213         q->queue_lock = &mdev->req_lock; /* needed since we use */
3214                 /* plugging on a queue, that actually has no requests! */
3215         q->unplug_fn = drbd_unplug_fn;
3216
3217         mdev->md_io_page = alloc_page(GFP_KERNEL);
3218         if (!mdev->md_io_page)
3219                 goto out_no_io_page;
3220
3221         if (drbd_bm_init(mdev))
3222                 goto out_no_bitmap;
3223         /* no need to lock access, we are still initializing this minor device. */
3224         if (!tl_init(mdev))
3225                 goto out_no_tl;
3226
3227         mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3228         if (!mdev->app_reads_hash)
3229                 goto out_no_app_reads;
3230
3231         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3232         if (!mdev->current_epoch)
3233                 goto out_no_epoch;
3234
3235         INIT_LIST_HEAD(&mdev->current_epoch->list);
3236         mdev->epochs = 1;
3237
3238         return mdev;
3239
3240 /* out_whatever_else:
3241         kfree(mdev->current_epoch); */
3242 out_no_epoch:
3243         kfree(mdev->app_reads_hash);
3244 out_no_app_reads:
3245         tl_cleanup(mdev);
3246 out_no_tl:
3247         drbd_bm_cleanup(mdev);
3248 out_no_bitmap:
3249         __free_page(mdev->md_io_page);
3250 out_no_io_page:
3251         put_disk(disk);
3252 out_no_disk:
3253         blk_cleanup_queue(q);
3254 out_no_q:
3255         free_cpumask_var(mdev->cpu_mask);
3256 out_no_cpumask:
3257         kfree(mdev);
3258         return NULL;
3259 }
3260
3261 /* counterpart of drbd_new_device.
3262  * last part of drbd_delete_device. */
3263 void drbd_free_mdev(struct drbd_conf *mdev)
3264 {
3265         kfree(mdev->current_epoch);
3266         kfree(mdev->app_reads_hash);
3267         tl_cleanup(mdev);
3268         if (mdev->bitmap) /* should no longer be there. */
3269                 drbd_bm_cleanup(mdev);
3270         __free_page(mdev->md_io_page);
3271         put_disk(mdev->vdisk);
3272         blk_cleanup_queue(mdev->rq_queue);
3273         free_cpumask_var(mdev->cpu_mask);
3274         kfree(mdev);
3275 }
3276
3277
3278 int __init drbd_init(void)
3279 {
3280         int err;
3281
3282         if (sizeof(struct p_handshake) != 80) {
3283                 printk(KERN_ERR
3284                        "drbd: never change the size or layout "
3285                        "of the HandShake packet.\n");
3286                 return -EINVAL;
3287         }
3288
3289         if (1 > minor_count || minor_count > 255) {
3290                 printk(KERN_ERR
3291                         "drbd: invalid minor_count (%d)\n", minor_count);
3292 #ifdef MODULE
3293                 return -EINVAL;
3294 #else
3295                 minor_count = 8;
3296 #endif
3297         }
3298
3299         err = drbd_nl_init();
3300         if (err)
3301                 return err;
3302
3303         err = register_blkdev(DRBD_MAJOR, "drbd");
3304         if (err) {
3305                 printk(KERN_ERR
3306                        "drbd: unable to register block device major %d\n",
3307                        DRBD_MAJOR);
3308                 return err;
3309         }
3310
3311         register_reboot_notifier(&drbd_notifier);
3312
3313         /*
3314          * allocate all necessary structs
3315          */
3316         err = -ENOMEM;
3317
3318         init_waitqueue_head(&drbd_pp_wait);
3319
3320         drbd_proc = NULL; /* play safe for drbd_cleanup */
3321         minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3322                                 GFP_KERNEL);
3323         if (!minor_table)
3324                 goto Enomem;
3325
3326         err = drbd_create_mempools();
3327         if (err)
3328                 goto Enomem;
3329
3330         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3331         if (!drbd_proc) {
3332                 printk(KERN_ERR "drbd: unable to register proc file\n");
3333                 goto Enomem;
3334         }
3335
3336         rwlock_init(&global_state_lock);
3337
3338         printk(KERN_INFO "drbd: initialized. "
3339                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3340                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3341         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3342         printk(KERN_INFO "drbd: registered as block device major %d\n",
3343                 DRBD_MAJOR);
3344         printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3345
3346         return 0; /* Success! */
3347
3348 Enomem:
3349         drbd_cleanup();
3350         if (err == -ENOMEM)
3351                 /* currently always the case */
3352                 printk(KERN_ERR "drbd: ran out of memory\n");
3353         else
3354                 printk(KERN_ERR "drbd: initialization failure\n");
3355         return err;
3356 }
3357
3358 void drbd_free_bc(struct drbd_backing_dev *ldev)
3359 {
3360         if (ldev == NULL)
3361                 return;
3362
3363         bd_release(ldev->backing_bdev);
3364         bd_release(ldev->md_bdev);
3365
3366         fput(ldev->lo_file);
3367         fput(ldev->md_file);
3368
3369         kfree(ldev);
3370 }
3371
3372 void drbd_free_sock(struct drbd_conf *mdev)
3373 {
3374         if (mdev->data.socket) {
3375                 mutex_lock(&mdev->data.mutex);
3376                 kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3377                 sock_release(mdev->data.socket);
3378                 mdev->data.socket = NULL;
3379                 mutex_unlock(&mdev->data.mutex);
3380         }
3381         if (mdev->meta.socket) {
3382                 mutex_lock(&mdev->meta.mutex);
3383                 kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3384                 sock_release(mdev->meta.socket);
3385                 mdev->meta.socket = NULL;
3386                 mutex_unlock(&mdev->meta.mutex);
3387         }
3388 }
3389
3390
3391 void drbd_free_resources(struct drbd_conf *mdev)
3392 {
3393         crypto_free_hash(mdev->csums_tfm);
3394         mdev->csums_tfm = NULL;
3395         crypto_free_hash(mdev->verify_tfm);
3396         mdev->verify_tfm = NULL;
3397         crypto_free_hash(mdev->cram_hmac_tfm);
3398         mdev->cram_hmac_tfm = NULL;
3399         crypto_free_hash(mdev->integrity_w_tfm);
3400         mdev->integrity_w_tfm = NULL;
3401         crypto_free_hash(mdev->integrity_r_tfm);
3402         mdev->integrity_r_tfm = NULL;
3403
3404         drbd_free_sock(mdev);
3405
3406         __no_warn(local,
3407                   drbd_free_bc(mdev->ldev);
3408                   mdev->ldev = NULL;);
3409 }
3410
3411 /* meta data management */
3412
3413 struct meta_data_on_disk {
3414         u64 la_size;           /* last agreed size. */
3415         u64 uuid[UI_SIZE];   /* UUIDs. */
3416         u64 device_uuid;
3417         u64 reserved_u64_1;
3418         u32 flags;             /* MDF */
3419         u32 magic;
3420         u32 md_size_sect;
3421         u32 al_offset;         /* offset to this block */
3422         u32 al_nr_extents;     /* important for restoring the AL */
3423               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3424         u32 bm_offset;         /* offset to the bitmap, from here */
3425         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3426         u32 reserved_u32[4];
3427
3428 } __packed;
3429
3430 /**
3431  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3432  * @mdev:       DRBD device.
3433  */
3434 void drbd_md_sync(struct drbd_conf *mdev)
3435 {
3436         struct meta_data_on_disk *buffer;
3437         sector_t sector;
3438         int i;
3439
3440         del_timer(&mdev->md_sync_timer);
3441         /* timer may be rearmed by drbd_md_mark_dirty() now. */
3442         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3443                 return;
3444
3445         /* We use here D_FAILED and not D_ATTACHING because we try to write
3446          * metadata even if we detach due to a disk failure! */
3447         if (!get_ldev_if_state(mdev, D_FAILED))
3448                 return;
3449
3450         mutex_lock(&mdev->md_io_mutex);
3451         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3452         memset(buffer, 0, 512);
3453
3454         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3455         for (i = UI_CURRENT; i < UI_SIZE; i++)
3456                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3457         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3458         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3459
3460         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3461         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3462         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3463         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3464         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3465
3466         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3467
3468         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3469         sector = mdev->ldev->md.md_offset;
3470
3471         if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3472                 /* this was a try anyways ... */
3473                 dev_err(DEV, "meta data update failed!\n");
3474                 drbd_chk_io_error(mdev, 1, TRUE);
3475         }
3476
3477         /* Update mdev->ldev->md.la_size_sect,
3478          * since we updated it on metadata. */
3479         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3480
3481         mutex_unlock(&mdev->md_io_mutex);
3482         put_ldev(mdev);
3483 }
3484
3485 /**
3486  * drbd_md_read() - Reads in the meta data super block
3487  * @mdev:       DRBD device.
3488  * @bdev:       Device from which the meta data should be read in.
3489  *
3490  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3491  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3492  */
3493 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3494 {
3495         struct meta_data_on_disk *buffer;
3496         int i, rv = NO_ERROR;
3497
3498         if (!get_ldev_if_state(mdev, D_ATTACHING))
3499                 return ERR_IO_MD_DISK;
3500
3501         mutex_lock(&mdev->md_io_mutex);
3502         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3503
3504         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3505                 /* NOTE: cant do normal error processing here as this is
3506                    called BEFORE disk is attached */
3507                 dev_err(DEV, "Error while reading metadata.\n");
3508                 rv = ERR_IO_MD_DISK;
3509                 goto err;
3510         }
3511
3512         if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3513                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
3514                 rv = ERR_MD_INVALID;
3515                 goto err;
3516         }
3517         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3518                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3519                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3520                 rv = ERR_MD_INVALID;
3521                 goto err;
3522         }
3523         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3524                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3525                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3526                 rv = ERR_MD_INVALID;
3527                 goto err;
3528         }
3529         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3530                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3531                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3532                 rv = ERR_MD_INVALID;
3533                 goto err;
3534         }
3535
3536         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3537                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3538                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3539                 rv = ERR_MD_INVALID;
3540                 goto err;
3541         }
3542
3543         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3544         for (i = UI_CURRENT; i < UI_SIZE; i++)
3545                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3546         bdev->md.flags = be32_to_cpu(buffer->flags);
3547         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3548         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3549
3550         if (mdev->sync_conf.al_extents < 7)
3551                 mdev->sync_conf.al_extents = 127;
3552
3553  err:
3554         mutex_unlock(&mdev->md_io_mutex);
3555         put_ldev(mdev);
3556
3557         return rv;
3558 }
3559
3560 /**
3561  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3562  * @mdev:       DRBD device.
3563  *
3564  * Call this function if you change anything that should be written to
3565  * the meta-data super block. This function sets MD_DIRTY, and starts a
3566  * timer that ensures that within five seconds you have to call drbd_md_sync().
3567  */
3568 #ifdef DRBD_DEBUG_MD_SYNC
3569 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3570 {
3571         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3572                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3573                 mdev->last_md_mark_dirty.line = line;
3574                 mdev->last_md_mark_dirty.func = func;
3575         }
3576 }
3577 #else
3578 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3579 {
3580         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3581                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3582 }
3583 #endif
3584
3585 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3586 {
3587         int i;
3588
3589         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3590                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3591 }
3592
3593 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3594 {
3595         if (idx == UI_CURRENT) {
3596                 if (mdev->state.role == R_PRIMARY)
3597                         val |= 1;
3598                 else
3599                         val &= ~((u64)1);
3600
3601                 drbd_set_ed_uuid(mdev, val);
3602         }
3603
3604         mdev->ldev->md.uuid[idx] = val;
3605         drbd_md_mark_dirty(mdev);
3606 }
3607
3608
3609 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3610 {
3611         if (mdev->ldev->md.uuid[idx]) {
3612                 drbd_uuid_move_history(mdev);
3613                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3614         }
3615         _drbd_uuid_set(mdev, idx, val);
3616 }
3617
3618 /**
3619  * drbd_uuid_new_current() - Creates a new current UUID
3620  * @mdev:       DRBD device.
3621  *
3622  * Creates a new current UUID, and rotates the old current UUID into
3623  * the bitmap slot. Causes an incremental resync upon next connect.
3624  */
3625 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3626 {
3627         u64 val;
3628
3629         dev_info(DEV, "Creating new current UUID\n");
3630         D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3631         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3632
3633         get_random_bytes(&val, sizeof(u64));
3634         _drbd_uuid_set(mdev, UI_CURRENT, val);
3635 }
3636
3637 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3638 {
3639         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3640                 return;
3641
3642         if (val == 0) {
3643                 drbd_uuid_move_history(mdev);
3644                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3645                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3646         } else {
3647                 if (mdev->ldev->md.uuid[UI_BITMAP])
3648                         dev_warn(DEV, "bm UUID already set");
3649
3650                 mdev->ldev->md.uuid[UI_BITMAP] = val;
3651                 mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3652
3653         }
3654         drbd_md_mark_dirty(mdev);
3655 }
3656
3657 /**
3658  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3659  * @mdev:       DRBD device.
3660  *
3661  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3662  */
3663 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3664 {
3665         int rv = -EIO;
3666
3667         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3668                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3669                 drbd_md_sync(mdev);
3670                 drbd_bm_set_all(mdev);
3671
3672                 rv = drbd_bm_write(mdev);
3673
3674                 if (!rv) {
3675                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3676                         drbd_md_sync(mdev);
3677                 }
3678
3679                 put_ldev(mdev);
3680         }
3681
3682         return rv;
3683 }
3684
3685 /**
3686  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3687  * @mdev:       DRBD device.
3688  *
3689  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3690  */
3691 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3692 {
3693         int rv = -EIO;
3694
3695         drbd_resume_al(mdev);
3696         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3697                 drbd_bm_clear_all(mdev);
3698                 rv = drbd_bm_write(mdev);
3699                 put_ldev(mdev);
3700         }
3701
3702         return rv;
3703 }
3704
3705 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3706 {
3707         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3708         int rv;
3709
3710         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3711
3712         drbd_bm_lock(mdev, work->why);
3713         rv = work->io_fn(mdev);
3714         drbd_bm_unlock(mdev);
3715
3716         clear_bit(BITMAP_IO, &mdev->flags);
3717         wake_up(&mdev->misc_wait);
3718
3719         if (work->done)
3720                 work->done(mdev, rv);
3721
3722         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3723         work->why = NULL;
3724
3725         return 1;
3726 }
3727
3728 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3729 {
3730         D_ASSERT(mdev->state.disk == D_FAILED);
3731         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3732          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3733          * the protected members anymore, though, so in the after_state_ch work
3734          * it will be safe to free them. */
3735         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3736
3737         clear_bit(GO_DISKLESS, &mdev->flags);
3738         return 1;
3739 }
3740
3741 void drbd_go_diskless(struct drbd_conf *mdev)
3742 {
3743         D_ASSERT(mdev->state.disk == D_FAILED);
3744         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3745                 drbd_queue_work(&mdev->data.work, &mdev->go_diskless);
3746                 /* don't drbd_queue_work_front,
3747                  * we need to serialize with the after_state_ch work
3748                  * of the -> D_FAILED transition. */
3749 }
3750
3751 /**
3752  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3753  * @mdev:       DRBD device.
3754  * @io_fn:      IO callback to be called when bitmap IO is possible
3755  * @done:       callback to be called after the bitmap IO was performed
3756  * @why:        Descriptive text of the reason for doing the IO
3757  *
3758  * While IO on the bitmap happens we freeze application IO thus we ensure
3759  * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
3760  * called from worker context. It MUST NOT be used while a previous such
3761  * work is still pending!
3762  */
3763 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3764                           int (*io_fn)(struct drbd_conf *),
3765                           void (*done)(struct drbd_conf *, int),
3766                           char *why)
3767 {
3768         D_ASSERT(current == mdev->worker.task);
3769
3770         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3771         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3772         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3773         if (mdev->bm_io_work.why)
3774                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3775                         why, mdev->bm_io_work.why);
3776
3777         mdev->bm_io_work.io_fn = io_fn;
3778         mdev->bm_io_work.done = done;
3779         mdev->bm_io_work.why = why;
3780
3781         set_bit(BITMAP_IO, &mdev->flags);
3782         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3783                 if (list_empty(&mdev->bm_io_work.w.list)) {
3784                         set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3785                         drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3786                 } else
3787                         dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3788         }
3789 }
3790
3791 /**
3792  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3793  * @mdev:       DRBD device.
3794  * @io_fn:      IO callback to be called when bitmap IO is possible
3795  * @why:        Descriptive text of the reason for doing the IO
3796  *
3797  * freezes application IO while that the actual IO operations runs. This
3798  * functions MAY NOT be called from worker context.
3799  */
3800 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3801 {
3802         int rv;
3803
3804         D_ASSERT(current != mdev->worker.task);
3805
3806         drbd_suspend_io(mdev);
3807
3808         drbd_bm_lock(mdev, why);
3809         rv = io_fn(mdev);
3810         drbd_bm_unlock(mdev);
3811
3812         drbd_resume_io(mdev);
3813
3814         return rv;
3815 }
3816
3817 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3818 {
3819         if ((mdev->ldev->md.flags & flag) != flag) {
3820                 drbd_md_mark_dirty(mdev);
3821                 mdev->ldev->md.flags |= flag;
3822         }
3823 }
3824
3825 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3826 {
3827         if ((mdev->ldev->md.flags & flag) != 0) {
3828                 drbd_md_mark_dirty(mdev);
3829                 mdev->ldev->md.flags &= ~flag;
3830         }
3831 }
3832 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3833 {
3834         return (bdev->md.flags & flag) != 0;
3835 }
3836
3837 static void md_sync_timer_fn(unsigned long data)
3838 {
3839         struct drbd_conf *mdev = (struct drbd_conf *) data;
3840
3841         drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3842 }
3843
3844 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3845 {
3846         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3847 #ifdef DEBUG
3848         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3849                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3850 #endif
3851         drbd_md_sync(mdev);
3852         return 1;
3853 }
3854
3855 #ifdef CONFIG_DRBD_FAULT_INJECTION
/* Fault insertion support. The random number generator was shamelessly
 * stolen from kernel/rcutorture.c */
struct fault_random_state {
	unsigned long state;	/* current value of the generator */
	unsigned long count;	/* calls left before fresh random bytes are mixed in */
};

/* multiplier and increment of the generator, and its refresh interval */
#define FAULT_RANDOM_MULT 39916801  /* prime */
#define FAULT_RANDOM_ADD	479001701 /* prime */
#define FAULT_RANDOM_REFRESH 10000
3866
3867 /*
3868  * Crude but fast random-number generator.  Uses a linear congruential
3869  * generator, with occasional help from get_random_bytes().
3870  */
3871 static unsigned long
3872 _drbd_fault_random(struct fault_random_state *rsp)
3873 {
3874         long refresh;
3875
3876         if (!rsp->count--) {
3877                 get_random_bytes(&refresh, sizeof(refresh));
3878                 rsp->state += refresh;
3879                 rsp->count = FAULT_RANDOM_REFRESH;
3880         }
3881         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3882         return swahw32(rsp->state);
3883 }
3884
3885 static char *
3886 _drbd_fault_str(unsigned int type) {
3887         static char *_faults[] = {
3888                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3889                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3890                 [DRBD_FAULT_RS_WR] = "Resync write",
3891                 [DRBD_FAULT_RS_RD] = "Resync read",
3892                 [DRBD_FAULT_DT_WR] = "Data write",
3893                 [DRBD_FAULT_DT_RD] = "Data read",
3894                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3895                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3896                 [DRBD_FAULT_AL_EE] = "EE allocation",
3897                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3898         };
3899
3900         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3901 }
3902
3903 unsigned int
3904 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3905 {
3906         static struct fault_random_state rrs = {0, 0};
3907
3908         unsigned int ret = (
3909                 (fault_devs == 0 ||
3910                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3911                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3912
3913         if (ret) {
3914                 fault_count++;
3915
3916                 if (__ratelimit(&drbd_ratelimit_state))
3917                         dev_warn(DEV, "***Simulating %s failure\n",
3918                                 _drbd_fault_str(type));
3919         }
3920
3921         return ret;
3922 }
3923 #endif
3924
/*
 * Returns a string identifying this build. The text is produced on the
 * first call and kept in a static buffer: "srcversion: ..." when
 * CONFIG_MODULES is set and THIS_MODULE is not NULL, "built-in" otherwise.
 */
const char *drbd_buildtag(void)
{
	/* DRBD built from external sources has here a reference to the
	   git hash of the source code. */

	/* The leading NUL marks the tag as not yet initialized; writing
	 * 'b' over it turns the rest into "built-in". */
	static char buildtag[38] = "\0uilt-in";

	if (buildtag[0] == 0) {
#ifdef CONFIG_MODULES
		if (THIS_MODULE != NULL)
			/* bounded, so an over-long srcversion can not
			 * overrun the static buffer */
			snprintf(buildtag, sizeof(buildtag),
				 "srcversion: %-24s", THIS_MODULE->srcversion);
		else
#endif
			buildtag[0] = 'b';
	}

	return buildtag;
}
3943
3944 module_init(drbd_init)
3945 module_exit(drbd_cleanup)
3946
3947 EXPORT_SYMBOL(drbd_conn_str);
3948 EXPORT_SYMBOL(drbd_role_str);
3949 EXPORT_SYMBOL(drbd_disk_str);
3950 EXPORT_SYMBOL(drbd_set_st_err_str);