/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
        FE_RECYCLED,
};

static int drbd_do_handshake(struct drbd_conf *mdev);
static int drbd_do_auth(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_conf *, struct drbd_work *, int);

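/* GFP_TRY: a "best effort" allocation mask: __GFP_HIGHMEM allows highmem
 * pages, __GFP_NOWARN suppresses allocation-failure warnings.  Note the
 * absence of __GFP_WAIT: such allocations must not sleep or trigger
 * write-out (see the comment in drbd_pp_first_pages_or_try_alloc()). */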
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
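
/* Illustration: a chain of three pages linked via page->private looks like
 *
 *   head --> page A --> page B --> page C --> 0
 *
 * where each link is stored with set_page_private(page, (unsigned long)next)
 * and followed with page_chain_next(page); a 0 terminates the chain. */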

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
        struct page *page;
        struct page *tmp;

        BUG_ON(!n);
        BUG_ON(!head);

        page = *head;

        if (!page)
                return NULL;

        while (page) {
                tmp = page_chain_next(page);
                if (--n == 0)
                        break; /* found sufficient pages */
                if (tmp == NULL)
                        /* insufficient pages, don't use any of them. */
                        return NULL;
                page = tmp;
        }

        /* add end of list marker for the returned list */
        set_page_private(page, 0);
        /* actual return value, and adjustment of head */
        page = *head;
        *head = tmp;
        return page;
}
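
/* After a successful page_chain_del(), *head points at the (n+1)th page of
 * the old chain (possibly NULL), and the returned n-page sublist is properly
 * 0-terminated.  Callers such as drbd_pp_first_pages_or_try_alloc() serialize
 * access to the global pool with drbd_pp_lock. */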

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
        struct page *tmp;
        int i = 1;
        while ((tmp = page_chain_next(page)))
                ++i, page = tmp;
        if (len)
                *len = i;
        return page;
}

static int page_chain_free(struct page *page)
{
        struct page *tmp;
        int i = 0;
        page_chain_for_each_safe(page, tmp) {
                put_page(page);
                ++i;
        }
        return i;
}

static void page_chain_add(struct page **head,
                struct page *chain_first, struct page *chain_last)
{
#if 1
        struct page *tmp;
        tmp = page_chain_tail(chain_first, NULL);
        BUG_ON(tmp != chain_last);
#endif

        /* add chain to head */
        set_page_private(chain_last, (unsigned long)*head);
        *head = chain_first;
}
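
/* Typical sequence: find the tail lockless with page_chain_tail(), then
 * splice the whole chain back onto the global pool under drbd_pp_lock, as
 * done in drbd_pp_free() below. */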

static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
{
        struct page *page = NULL;
        struct page *tmp = NULL;
        int i = 0;

        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
        if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
                page = page_chain_del(&drbd_pp_pool, number);
                if (page)
                        drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
                if (page)
                        return page;
        }

        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        for (i = 0; i < number; i++) {
                tmp = alloc_page(GFP_TRY);
                if (!tmp)
                        break;
                set_page_private(tmp, (unsigned long)page);
                page = tmp;
        }

        if (i == number)
                return page;

        /* Not enough pages immediately available this time.
         * No need to jump around here, drbd_pp_alloc will retry this
         * function "soon". */
        if (page) {
                tmp = page_chain_tail(page, NULL);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        return NULL;
}

static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
{
        struct drbd_epoch_entry *e;
        struct list_head *le, *tle;

        /* The EEs are always appended to the end of the list. Since
           they are sent in order over the wire, they have to finish
           in order. As soon as we see the first unfinished one, we
           can stop examining the list... */

        list_for_each_safe(le, tle, &mdev->net_ee) {
                e = list_entry(le, struct drbd_epoch_entry, w.list);
                if (drbd_ee_has_active_page(e))
                        break;
                list_move(le, to_be_freed);
        }
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
        LIST_HEAD(reclaimed);
        struct drbd_epoch_entry *e, *t;

        spin_lock_irq(&mdev->req_lock);
        reclaim_net_ee(mdev, &reclaimed);
        spin_unlock_irq(&mdev->req_lock);

        list_for_each_entry_safe(e, t, &reclaimed, w.list)
                drbd_free_net_ee(mdev, e);
}

/**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 * @mdev:       DRBD device.
 * @number:     number of pages requested
 * @retry:      whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
{
        struct page *page = NULL;
        DEFINE_WAIT(wait);

        /* Yes, we may run up to @number over max_buffers. If we
         * follow it strictly, the admin will get it wrong anyways. */
        if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
                page = drbd_pp_first_pages_or_try_alloc(mdev, number);

        while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

                drbd_kick_lo_and_reclaim_net(mdev);

                if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
                        page = drbd_pp_first_pages_or_try_alloc(mdev, number);
                        if (page)
                                break;
                }

                if (!retry)
                        break;

                if (signal_pending(current)) {
                        dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
                        break;
                }

                schedule();
        }
        finish_wait(&drbd_pp_wait, &wait);

        if (page)
                atomic_add(number, &mdev->pp_in_use);
        return page;
}
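
/* Every chain handed out here is accounted in pp_in_use and must eventually
 * be returned through drbd_pp_free() below, which also wakes up waiters
 * blocked in the retry loop above. */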

/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * Is also used from inside another spin_lock_irq(&mdev->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
{
        atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
        int i;

        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
                i = page_chain_free(page);
        else {
                struct page *tmp;
                tmp = page_chain_tail(page, &i);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        i = atomic_sub_return(i, a);
        if (i < 0)
                dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_ee()
 drbd_alloc_ee()
 drbd_init_ee()
 drbd_release_ee()
 drbd_ee_fix_bhs()
 drbd_process_done_ee()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/
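
/* The locked/unlocked pairing follows the usual pattern; e.g.
 * drbd_wait_ee_list_empty() below is just
 *
 *   spin_lock_irq(&mdev->req_lock);
 *   _drbd_wait_ee_list_empty(mdev, head);
 *   spin_unlock_irq(&mdev->req_lock);
 *
 * so callers already holding req_lock use the underscore variant. */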

struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
                                     u64 id,
                                     sector_t sector,
                                     unsigned int data_size,
                                     gfp_t gfp_mask) __must_hold(local)
{
        struct drbd_epoch_entry *e;
        struct page *page;
        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

        if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
                return NULL;

        e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
        if (!e) {
                if (!(gfp_mask & __GFP_NOWARN))
                        dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
                return NULL;
        }

        page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
        if (!page)
                goto fail;

        INIT_HLIST_NODE(&e->collision);
        e->epoch = NULL;
        e->mdev = mdev;
        e->pages = page;
        atomic_set(&e->pending_bios, 0);
        e->size = data_size;
        e->flags = 0;
        e->sector = sector;
        e->block_id = id;

        return e;

 fail:
        mempool_free(e, drbd_ee_mempool);
        return NULL;
}

void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
{
        if (e->flags & EE_HAS_DIGEST)
                kfree(e->digest);
        drbd_pp_free(mdev, e->pages, is_net);
        D_ASSERT(atomic_read(&e->pending_bios) == 0);
        D_ASSERT(hlist_unhashed(&e->collision));
        mempool_free(e, drbd_ee_mempool);
}

int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
{
        LIST_HEAD(work_list);
        struct drbd_epoch_entry *e, *t;
        int count = 0;
        int is_net = list == &mdev->net_ee;

        spin_lock_irq(&mdev->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&mdev->req_lock);

        list_for_each_entry_safe(e, t, &work_list, w.list) {
                drbd_free_some_ee(mdev, e, is_net);
                count++;
        }
        return count;
}


/*
 * This function is called from _asender only_
 * but see also comments in _req_mod(,barrier_acked)
 * and receive_Barrier.
 *
 * Move entries from net_ee to done_ee, if ready.
 * Grab done_ee, call all callbacks, free the entries.
 * The callbacks typically send out ACKs.
 */
static int drbd_process_done_ee(struct drbd_conf *mdev)
{
        LIST_HEAD(work_list);
        LIST_HEAD(reclaimed);
        struct drbd_epoch_entry *e, *t;
        int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);

        spin_lock_irq(&mdev->req_lock);
        reclaim_net_ee(mdev, &reclaimed);
        list_splice_init(&mdev->done_ee, &work_list);
        spin_unlock_irq(&mdev->req_lock);

        list_for_each_entry_safe(e, t, &reclaimed, w.list)
                drbd_free_net_ee(mdev, e);

        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_discard_ack.
         * all ignore the last argument.
         */
        list_for_each_entry_safe(e, t, &work_list, w.list) {
                /* list_del not necessary, next/prev members not touched */
                ok = e->w.cb(mdev, &e->w, !ok) && ok;
                drbd_free_ee(mdev, e);
        }
        wake_up(&mdev->ee_wait);

        return ok;
}

void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
        DEFINE_WAIT(wait);

        /* avoids spin_lock/unlock
         * and calling prepare_to_wait in the fast path */
        while (!list_empty(head)) {
                prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
                spin_unlock_irq(&mdev->req_lock);
                io_schedule();
                finish_wait(&mdev->ee_wait, &wait);
                spin_lock_irq(&mdev->req_lock);
        }
}

void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
        spin_lock_irq(&mdev->req_lock);
        _drbd_wait_ee_list_empty(mdev, head);
        spin_unlock_irq(&mdev->req_lock);
}

/* See also kernel_accept(), which only exists since 2.6.18.
 * We also want to log exactly which part of it failed. */
static int drbd_accept(struct drbd_conf *mdev, const char **what,
                struct socket *sock, struct socket **newsock)
{
        struct sock *sk = sock->sk;
        int err = 0;

        *what = "listen";
        err = sock->ops->listen(sock, 5);
        if (err < 0)
                goto out;

        *what = "sock_create_lite";
        err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
                               newsock);
        if (err < 0)
                goto out;

        *what = "accept";
        err = sock->ops->accept(sock, *newsock, 0);
        if (err < 0) {
                sock_release(*newsock);
                *newsock = NULL;
                goto out;
        }
        (*newsock)->ops = sock->ops;

out:
        return err;
}

static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
                    void *buf, size_t size, int flags)
{
        mm_segment_t oldfs;
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_iovlen = 1,
                .msg_iov = (struct iovec *)&iov,
                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
        };
        int rv;

        oldfs = get_fs();
        set_fs(KERNEL_DS);
        rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
        set_fs(oldfs);

        return rv;
}

static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
{
        mm_segment_t oldfs;
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_iovlen = 1,
                .msg_iov = (struct iovec *)&iov,
                .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
        };
        int rv;

        oldfs = get_fs();
        set_fs(KERNEL_DS);

        for (;;) {
                rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
                if (rv == size)
                        break;

                /* Note:
                 * ECONNRESET   other side closed the connection
                 * ERESTARTSYS  (on  sock) we got a signal
                 */

                if (rv < 0) {
                        if (rv == -ECONNRESET)
                                dev_info(DEV, "sock was reset by peer\n");
                        else if (rv != -ERESTARTSYS)
                                dev_err(DEV, "sock_recvmsg returned %d\n", rv);
                        break;
                } else if (rv == 0) {
                        dev_info(DEV, "sock was shut down by peer\n");
                        break;
                } else {
                        /* signal came in, or peer/link went down,
                         * after we read a partial message
                         */
                        /* D_ASSERT(signal_pending(current)); */
                        break;
                }
        }

        set_fs(oldfs);

        if (rv != size)
                drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));

        return rv;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
                unsigned int rcv)
{
        /* open coded SO_SNDBUF, SO_RCVBUF */
        if (snd) {
                sock->sk->sk_sndbuf = snd;
                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
        }
        if (rcv) {
                sock->sk->sk_rcvbuf = rcv;
                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
        }
}
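
/* The userspace equivalent would be
 *
 *   setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &snd, sizeof(snd));
 *   setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcv, sizeof(rcv));
 *
 * Setting SOCK_SNDBUF_LOCK/SOCK_RCVBUF_LOCK keeps the kernel's TCP buffer
 * auto-tuning from later overriding these values. */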

static struct socket *drbd_try_connect(struct drbd_conf *mdev)
{
        const char *what;
        struct socket *sock;
        struct sockaddr_in6 src_in6;
        int err;
        int disconnect_on_error = 1;

        if (!get_net_conf(mdev))
                return NULL;

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
                SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_rcvtimeo =
        sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
        drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
                        mdev->net_conf->rcvbuf_size);

        /* explicitly bind to the configured IP as source IP
         * for the outgoing connections.
         * This is needed for multihomed hosts and to be
         * able to use lo: interfaces for drbd.
         * Make sure to use 0 as port number, so linux selects
         * a free one dynamically.
         */
        memcpy(&src_in6, mdev->net_conf->my_addr,
               min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
        if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        what = "bind before connect";
        err = sock->ops->bind(sock,
                              (struct sockaddr *) &src_in6,
                              mdev->net_conf->my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock,
                                 (struct sockaddr *)mdev->net_conf->peer_addr,
                                 mdev->net_conf->peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        disconnect_on_error = 0;
                        break;
                default:
                        dev_err(DEV, "%s failed, err = %d\n", what, err);
                }
                if (disconnect_on_error)
                        drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
        }
        put_net_conf(mdev);
        return sock;
}

static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
{
        int timeo, err;
        struct socket *s_estab = NULL, *s_listen;
        const char *what;

        if (!get_net_conf(mdev))
                return NULL;

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
                SOCK_STREAM, IPPROTO_TCP, &s_listen);
        if (err) {
                s_listen = NULL;
                goto out;
        }

        timeo = mdev->net_conf->try_connect_int * HZ;
        timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

        s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
        s_listen->sk->sk_rcvtimeo = timeo;
        s_listen->sk->sk_sndtimeo = timeo;
        drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
                        mdev->net_conf->rcvbuf_size);

        what = "bind before listen";
        err = s_listen->ops->bind(s_listen,
                              (struct sockaddr *) mdev->net_conf->my_addr,
                              mdev->net_conf->my_addr_len);
        if (err < 0)
                goto out;

        err = drbd_accept(mdev, &what, s_listen, &s_estab);

out:
        if (s_listen)
                sock_release(s_listen);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        dev_err(DEV, "%s failed, err = %d\n", what, err);
                        drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
                }
        }
        put_net_conf(mdev);

        return s_estab;
}

static int drbd_send_fp(struct drbd_conf *mdev,
        struct socket *sock, enum drbd_packets cmd)
{
        struct p_header80 *h = &mdev->data.sbuf.header.h80;

        return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
}

static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
{
        struct p_header80 *h = &mdev->data.rbuf.header.h80;
        int rr;

        rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);

        if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
                return be16_to_cpu(h->command);

        return 0xffff;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @mdev:       DRBD device.
 * @sock:       pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
{
        int rr;
        char tb[4];

        if (!*sock)
                return false;

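        /* Probe without consuming: MSG_PEEK leaves any pending data in the
         * receive queue, MSG_DONTWAIT makes this non-blocking.  rr > 0 means
         * the peer sent something; -EAGAIN means the connection is alive but
         * idle; anything else means it is broken. */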
        rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

        if (rr > 0 || rr == -EAGAIN) {
                return true;
        } else {
                sock_release(*sock);
                *sock = NULL;
                return false;
        }
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_conf *mdev)
{
        struct socket *s, *sock, *msock;
        int try, h, ok;

        D_ASSERT(!mdev->data.socket);

        if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
                return -2;

        clear_bit(DISCARD_CONCURRENT, &mdev->flags);

        sock  = NULL;
        msock = NULL;

        do {
                for (try = 0;;) {
                        /* 3 tries, this should take less than a second! */
                        s = drbd_try_connect(mdev);
                        if (s || ++try >= 3)
                                break;
                        /* give the other side time to call bind() & listen() */
                        schedule_timeout_interruptible(HZ / 10);
                }

                if (s) {
                        if (!sock) {
                                drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
                                sock = s;
                                s = NULL;
                        } else if (!msock) {
                                drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
                                msock = s;
                                s = NULL;
                        } else {
                                dev_err(DEV, "Logic error in drbd_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (sock && msock) {
                        schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
                        ok = drbd_socket_okay(mdev, &sock);
                        ok = drbd_socket_okay(mdev, &msock) && ok;
                        if (ok)
                                break;
                }

retry:
                s = drbd_wait_for_connect(mdev);
                if (s) {
                        try = drbd_recv_fp(mdev, s);
                        drbd_socket_okay(mdev, &sock);
                        drbd_socket_okay(mdev, &msock);
                        switch (try) {
                        case P_HAND_SHAKE_S:
                                if (sock) {
                                        dev_warn(DEV, "initial packet S crossed\n");
                                        sock_release(sock);
                                }
                                sock = s;
                                break;
                        case P_HAND_SHAKE_M:
                                if (msock) {
                                        dev_warn(DEV, "initial packet M crossed\n");
                                        sock_release(msock);
                                }
                                msock = s;
                                set_bit(DISCARD_CONCURRENT, &mdev->flags);
                                break;
                        default:
                                dev_warn(DEV, "Error receiving initial packet\n");
                                sock_release(s);
                                if (random32() & 1)
                                        goto retry;
                        }
                }

                if (mdev->state.conn <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&mdev->receiver) == Exiting)
                                goto out_release_sockets;
                }

                if (sock && msock) {
                        ok = drbd_socket_okay(mdev, &sock);
                        ok = drbd_socket_okay(mdev, &msock) && ok;
                        if (ok)
                                break;
                }
        } while (1);

        msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
        sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

        sock->sk->sk_allocation = GFP_NOIO;
        msock->sk->sk_allocation = GFP_NOIO;

        sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

        /* NOT YET ...
         * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
         * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_HAND_SHAKE timeout,
         * which we set to 4x the configured ping_timeout. */
        sock->sk->sk_sndtimeo =
        sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;

        msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
        msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;

        /* we don't want delays.
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock);
        drbd_tcp_nodelay(msock);

        mdev->data.socket = sock;
        mdev->meta.socket = msock;
        mdev->last_received = jiffies;

        D_ASSERT(mdev->asender.task == NULL);

        h = drbd_do_handshake(mdev);
        if (h <= 0)
                return h;

        if (mdev->cram_hmac_tfm) {
                /* drbd_request_state(mdev, NS(conn, WFAuth)); */
                switch (drbd_do_auth(mdev)) {
                case -1:
                        dev_err(DEV, "Authentication of peer failed\n");
                        return -1;
                case 0:
                        dev_err(DEV, "Authentication of peer failed, trying again.\n");
                        return 0;
                }
        }

        if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
                return 0;

        sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
        sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        atomic_set(&mdev->packet_seq, 0);
        mdev->peer_seq = 0;

        drbd_thread_start(&mdev->asender);

        if (drbd_send_protocol(mdev) == -1)
                return -1;
        drbd_send_sync_param(mdev, &mdev->sync_conf);
        drbd_send_sizes(mdev, 0, 0);
        drbd_send_uuids(mdev);
        drbd_send_state(mdev);
        clear_bit(USE_DEGR_WFC_T, &mdev->flags);
        clear_bit(RESIZE_PENDING, &mdev->flags);
        mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */

        return 1;

out_release_sockets:
        if (sock)
                sock_release(sock);
        if (msock)
                sock_release(msock);
        return -1;
}

static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
{
        union p_header *h = &mdev->data.rbuf.header;
        int r;

        r = drbd_recv(mdev, h, sizeof(*h));
        if (unlikely(r != sizeof(*h))) {
                if (!signal_pending(current))
                        dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
                return false;
        }

        if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
                *cmd = be16_to_cpu(h->h80.command);
                *packet_size = be16_to_cpu(h->h80.length);
        } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
                *cmd = be16_to_cpu(h->h95.command);
                *packet_size = be32_to_cpu(h->h95.length);
        } else {
                dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
                    be32_to_cpu(h->h80.magic),
                    be16_to_cpu(h->h80.command),
                    be16_to_cpu(h->h80.length));
                return false;
        }
        mdev->last_received = jiffies;

        return true;
}

static void drbd_flush(struct drbd_conf *mdev)
{
        int rv;

        if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
                rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
                                        NULL);
                if (rv) {
                        dev_err(DEV, "local disk flush failed with status %d\n", rv);
                        /* would rather check on EOPNOTSUPP, but that is not reliable.
                         * don't try again for ANY return value != 0
                         * if (rv == -EOPNOTSUPP) */
                        drbd_bump_write_ordering(mdev, WO_drain_io);
                }
                put_ldev(mdev);
        }
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:       DRBD device.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&mdev->epoch_lock);
        do {
                next_epoch = NULL;

                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do */
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&mdev->epoch_lock);
                                drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
                                spin_lock(&mdev->epoch_lock);
                        }
                        dec_unacked(mdev);

                        if (mdev->current_epoch != epoch) {
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                mdev->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                                wake_up(&mdev->ee_wait);
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&mdev->epoch_lock);

        return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:       DRBD device.
 * @wo:         Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
        enum write_ordering_e pwo;
        static char *write_ordering_str[] = {
                [WO_none] = "none",
                [WO_drain_io] = "drain",
                [WO_bdev_flush] = "flush",
        };

        pwo = mdev->write_ordering;
        wo = min(pwo, wo);
        if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
                wo = WO_drain_io;
        if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
                wo = WO_none;
        mdev->write_ordering = wo;
        if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
                dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}
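
/* Example: starting at "flush", a failed blkdev_issue_flush() in drbd_flush()
 * calls drbd_bump_write_ordering(mdev, WO_drain_io) and we degrade to
 * "drain"; with no_disk_drain set we would end up at "none".  Because of
 * wo = min(pwo, wo), the method can only ever degrade, never upgrade. */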

/**
 * drbd_submit_ee()
 * @mdev:       DRBD device.
 * @e:          epoch entry
 * @rw:         flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
                const unsigned rw, const int fault_type)
{
        struct bio *bios = NULL;
        struct bio *bio;
        struct page *page = e->pages;
        sector_t sector = e->sector;
        unsigned ds = e->size;
        unsigned n_bios = 0;
        unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
        int err = -ENOMEM;

        /* In most cases, we will only need one bio.  But in case the lower
         * level restrictions happen to be different at this offset on this
         * side than those of the sending peer, we may need to submit the
         * request in more than one bio. */
next_bio:
        bio = bio_alloc(GFP_NOIO, nr_pages);
        if (!bio) {
                dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
                goto fail;
        }
        /* > e->sector, unless this is the first bio */
        bio->bi_sector = sector;
        bio->bi_bdev = mdev->ldev->backing_bdev;
        bio->bi_rw = rw;
        bio->bi_private = e;
        bio->bi_end_io = drbd_endio_sec;

        bio->bi_next = bios;
        bios = bio;
        ++n_bios;

        page_chain_for_each(page) {
                unsigned len = min_t(unsigned, ds, PAGE_SIZE);
                if (!bio_add_page(bio, page, len, 0)) {
                        /* A single page must always be possible!
                         * But in case it fails anyways,
                         * we deal with it, and complain (below). */
                        if (bio->bi_vcnt == 0) {
                                dev_err(DEV,
                                        "bio_add_page failed for len=%u, "
                                        "bi_vcnt=0 (bi_sector=%llu)\n",
                                        len, (unsigned long long)bio->bi_sector);
                                err = -ENOSPC;
                                goto fail;
                        }
                        goto next_bio;
                }
                ds -= len;
                sector += len >> 9;
                --nr_pages;
        }
        D_ASSERT(page == NULL);
        D_ASSERT(ds == 0);

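        /* All pages are attached; submit the whole (singly linked) bio list.
         * e->pending_bios is decremented in the completion path
         * (bi_end_io == drbd_endio_sec) as each bio finishes. */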
        atomic_set(&e->pending_bios, n_bios);
        do {
                bio = bios;
                bios = bios->bi_next;
                bio->bi_next = NULL;

                drbd_generic_make_request(mdev, fault_type, bio);
        } while (bios);
        return 0;

fail:
        while (bios) {
                bio = bios;
                bios = bios->bi_next;
                bio_put(bio);
        }
        return err;
}

static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
{
        int rv;
        struct p_barrier *p = &mdev->data.rbuf.barrier;
        struct drbd_epoch *epoch;

        inc_unacked(mdev);

        mdev->current_epoch->barrier_nr = p->barrier;
        rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
         * the activity log, which means it would not be resynced in case the
         * R_PRIMARY crashes now.
         * Therefore we must send the barrier_ack after the barrier request was
         * completed. */
        switch (mdev->write_ordering) {
        case WO_none:
                if (rv == FE_RECYCLED)
                        return true;

                /* receiver context, in the writeout path of the other node.
                 * avoid potential distributed deadlock */
                epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                if (epoch)
                        break;
                else
                        dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
                        /* Fall through */

        case WO_bdev_flush:
        case WO_drain_io:
                drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
                drbd_flush(mdev);

                if (atomic_read(&mdev->current_epoch->epoch_size)) {
                        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                        if (epoch)
                                break;
                }

                epoch = mdev->current_epoch;
                wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

                D_ASSERT(atomic_read(&epoch->active) == 0);
                D_ASSERT(epoch->flags == 0);

                return true;
        default:
                dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
                return false;
        }

        epoch->flags = 0;
        atomic_set(&epoch->epoch_size, 0);
        atomic_set(&epoch->active, 0);

        spin_lock(&mdev->epoch_lock);
        if (atomic_read(&mdev->current_epoch->epoch_size)) {
                list_add(&epoch->list, &mdev->current_epoch->list);
                mdev->current_epoch = epoch;
                mdev->epochs++;
        } else {
                /* The current_epoch got recycled while we allocated this one... */
                kfree(epoch);
        }
        spin_unlock(&mdev->epoch_lock);

        return true;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_epoch_entry *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
{
        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
        struct drbd_epoch_entry *e;
        struct page *page;
        int dgs, ds, rr;
        void *dig_in = mdev->int_dig_in;
        void *dig_vv = mdev->int_dig_vv;
        unsigned long *data;

        dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
                crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;

        if (dgs) {
                rr = drbd_recv(mdev, dig_in, dgs);
                if (rr != dgs) {
                        if (!signal_pending(current))
                                dev_warn(DEV,
                                        "short read receiving data digest: read %d expected %d\n",
                                        rr, dgs);
                        return NULL;
                }
        }

        data_size -= dgs;

        ERR_IF(data_size == 0) return NULL;
        ERR_IF(data_size &  0x1ff) return NULL;
        ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;

        /* even though we trust our peer,
         * we sometimes have to double check. */
        if (sector + (data_size>>9) > capacity) {
                dev_err(DEV, "request from peer beyond end of local disk: "
                        "capacity: %llus < sector: %llus + size: %u\n",
                        (unsigned long long)capacity,
                        (unsigned long long)sector, data_size);
                return NULL;
        }

        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
        if (!e)
                return NULL;

        ds = data_size;
        page = e->pages;
        page_chain_for_each(page) {
                unsigned len = min_t(int, ds, PAGE_SIZE);
                data = kmap(page);
                rr = drbd_recv(mdev, data, len);
                if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
                        dev_err(DEV, "Fault injection: Corrupting data on receive\n");
                        data[0] = data[0] ^ (unsigned long)-1;
                }
                kunmap(page);
                if (rr != len) {
                        drbd_free_ee(mdev, e);
                        if (!signal_pending(current))
                                dev_warn(DEV, "short read receiving data: read %d expected %d\n",
                                rr, len);
                        return NULL;
                }
                ds -= rr;
        }

        if (dgs) {
                drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
                if (memcmp(dig_in, dig_vv, dgs)) {
                        dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
                                (unsigned long long)sector, data_size);
                        drbd_bcast_ee(mdev, "digest failed",
                                        dgs, dig_in, dig_vv, e);
                        drbd_free_ee(mdev, e);
                        return NULL;
                }
        }
        mdev->recv_cnt += data_size>>9;
        return e;
}

/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
{
        struct page *page;
        int rr, rv = 1;
        void *data;

        if (!data_size)
                return true;

        page = drbd_pp_alloc(mdev, 1, 1);

        data = kmap(page);
        while (data_size) {
                rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
                if (rr != min_t(int, data_size, PAGE_SIZE)) {
                        rv = 0;
                        if (!signal_pending(current))
                                dev_warn(DEV,
                                        "short read receiving data: read %d expected %d\n",
                                        rr, min_t(int, data_size, PAGE_SIZE));
                        break;
                }
                data_size -= rr;
        }
        kunmap(page);
        drbd_pp_free(mdev, page, 0);
        return rv;
}

static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
                           sector_t sector, int data_size)
{
        struct bio_vec *bvec;
        struct bio *bio;
        int dgs, rr, i, expect;
        void *dig_in = mdev->int_dig_in;
        void *dig_vv = mdev->int_dig_vv;

        dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
                crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;

        if (dgs) {
                rr = drbd_recv(mdev, dig_in, dgs);
                if (rr != dgs) {
                        if (!signal_pending(current))
                                dev_warn(DEV,
                                        "short read receiving data reply digest: read %d expected %d\n",
                                        rr, dgs);
                        return 0;
                }
        }

        data_size -= dgs;

        /* optimistically update recv_cnt.  if receiving fails below,
         * we disconnect anyways, and counters will be reset. */
        mdev->recv_cnt += data_size>>9;

        bio = req->master_bio;
        D_ASSERT(sector == bio->bi_sector);

        bio_for_each_segment(bvec, bio, i) {
                expect = min_t(int, data_size, bvec->bv_len);
                rr = drbd_recv(mdev,
                             kmap(bvec->bv_page)+bvec->bv_offset,
                             expect);
                kunmap(bvec->bv_page);
                if (rr != expect) {
                        if (!signal_pending(current))
                                dev_warn(DEV, "short read receiving data reply: "
                                        "read %d expected %d\n",
                                        rr, expect);
                        return 0;
                }
                data_size -= rr;
        }

        if (dgs) {
                drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
                if (memcmp(dig_in, dig_vv, dgs)) {
                        dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
                        return 0;
                }
        }

        D_ASSERT(data_size == 0);
        return 1;
}
1407
1408 /* e_end_resync_block() is called via
1409  * drbd_process_done_ee() by asender only */
1410 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1411 {
1412         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1413         sector_t sector = e->sector;
1414         int ok;
1415
1416         D_ASSERT(hlist_unhashed(&e->collision));
1417
1418         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1419                 drbd_set_in_sync(mdev, sector, e->size);
1420                 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1421         } else {
1422                 /* Record failure to sync */
1423                 drbd_rs_failed_io(mdev, sector, e->size);
1424
1425                 ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1426         }
1427         dec_unacked(mdev);
1428
1429         return ok;
1430 }
1431
1432 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1433 {
1434         struct drbd_epoch_entry *e;
1435
1436         e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1437         if (!e)
1438                 goto fail;
1439
1440         dec_rs_pending(mdev);
1441
1442         inc_unacked(mdev);
1443         /* corresponding dec_unacked() in e_end_resync_block()
1444          * respective _drbd_clear_done_ee */
1445
1446         e->w.cb = e_end_resync_block;
1447
1448         spin_lock_irq(&mdev->req_lock);
1449         list_add(&e->w.list, &mdev->sync_ee);
1450         spin_unlock_irq(&mdev->req_lock);
1451
1452         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1453         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1454                 return true;
1455
1456         /* don't care for the reason here */
1457         dev_err(DEV, "submit failed, triggering re-connect\n");
1458         spin_lock_irq(&mdev->req_lock);
1459         list_del(&e->w.list);
1460         spin_unlock_irq(&mdev->req_lock);
1461
1462         drbd_free_ee(mdev, e);
1463 fail:
1464         put_ldev(mdev);
1465         return false;
1466 }
1467
1468 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1469 {
1470         struct drbd_request *req;
1471         sector_t sector;
1472         int ok;
1473         struct p_data *p = &mdev->data.rbuf.data;
1474
1475         sector = be64_to_cpu(p->sector);
1476
1477         spin_lock_irq(&mdev->req_lock);
1478         req = _ar_id_to_req(mdev, p->block_id, sector);
1479         spin_unlock_irq(&mdev->req_lock);
1480         if (unlikely(!req)) {
1481                 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1482                 return false;
1483         }
1484
1485         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1486          * special casing it there for the various failure cases.
1487          * still no race with drbd_fail_pending_reads */
1488         ok = recv_dless_read(mdev, req, sector, data_size);
1489
1490         if (ok)
1491                 req_mod(req, data_received);
1492         /* else: nothing. handled from drbd_disconnect...
1493          * I don't think we may complete this just yet
1494          * in case we are "on-disconnect: freeze" */
1495
1496         return ok;
1497 }
1498
1499 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1500 {
1501         sector_t sector;
1502         int ok;
1503         struct p_data *p = &mdev->data.rbuf.data;
1504
1505         sector = be64_to_cpu(p->sector);
1506         D_ASSERT(p->block_id == ID_SYNCER);
1507
1508         if (get_ldev(mdev)) {
1509                 /* data is submitted to disk within recv_resync_read.
1510                  * corresponding put_ldev done below on error,
1511                  * or in drbd_endio_write_sec. */
1512                 ok = recv_resync_read(mdev, sector, data_size);
1513         } else {
1514                 if (__ratelimit(&drbd_ratelimit_state))
1515                         dev_err(DEV, "Cannot write resync data to local disk.\n");
1516
1517                 ok = drbd_drain_block(mdev, data_size);
1518
1519                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1520         }
1521
1522         atomic_add(data_size >> 9, &mdev->rs_sect_in);
1523
1524         return ok;
1525 }
1526
1527 /* e_end_block() is called via drbd_process_done_ee().
1528  * this means this function only runs in the asender thread
1529  */
1530 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1531 {
1532         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1533         sector_t sector = e->sector;
1534         int ok = 1, pcmd;
1535
1536         if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1537                 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1538                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1539                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1540                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1541                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1542                         ok &= drbd_send_ack(mdev, pcmd, e);
1543                         if (pcmd == P_RS_WRITE_ACK)
1544                                 drbd_set_in_sync(mdev, sector, e->size);
1545                 } else {
1546                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1547                         /* we expect it to be marked out of sync anyway...
1548                          * maybe assert this?  */
1549                 }
1550                 dec_unacked(mdev);
1551         }
1552         /* we delete from the conflict detection hash _after_ we sent out the
1553          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1554         if (mdev->net_conf->two_primaries) {
1555                 spin_lock_irq(&mdev->req_lock);
1556                 D_ASSERT(!hlist_unhashed(&e->collision));
1557                 hlist_del_init(&e->collision);
1558                 spin_unlock_irq(&mdev->req_lock);
1559         } else {
1560                 D_ASSERT(hlist_unhashed(&e->collision));
1561         }
1562
1563         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1564
1565         return ok;
1566 }
1567
1568 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1569 {
1570         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1571         int ok = 1;
1572
1573         D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1574         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1575
1576         spin_lock_irq(&mdev->req_lock);
1577         D_ASSERT(!hlist_unhashed(&e->collision));
1578         hlist_del_init(&e->collision);
1579         spin_unlock_irq(&mdev->req_lock);
1580
1581         dec_unacked(mdev);
1582
1583         return ok;
1584 }
1585
1586 /* Called from receive_Data.
1587  * Synchronize packets on sock with packets on msock.
1588  *
1589  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1590  * packet traveling on msock, they are still processed in the order they have
1591  * been sent.
1592  *
1593  * Note: we don't care for Ack packets overtaking P_DATA packets.
1594  *
1595  * In case packet_seq is larger than mdev->peer_seq, there are
1596  * outstanding packets on the msock. We wait for them to arrive.
1597  * In case this is logically the next packet, we update mdev->peer_seq
1598  * ourselves. Correctly handles 32bit wrap around.
1599  *
1600  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1601  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1602  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1603  * 1<<11 == 2048 seconds (2^32 / 2^21), aka ages, for the 32bit wrap around...
1604  *
1605  * returns 0 if we may process the packet,
1606  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1607 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1608 {
1609         DEFINE_WAIT(wait);
1610         unsigned int p_seq;
1611         long timeout;
1612         int ret = 0;
1613         spin_lock(&mdev->peer_seq_lock);
1614         for (;;) {
1615                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1616                 if (seq_le(packet_seq, mdev->peer_seq+1))
1617                         break;
1618                 if (signal_pending(current)) {
1619                         ret = -ERESTARTSYS;
1620                         break;
1621                 }
1622                 p_seq = mdev->peer_seq;
1623                 spin_unlock(&mdev->peer_seq_lock);
1624                 timeout = schedule_timeout(30*HZ);
1625                 spin_lock(&mdev->peer_seq_lock);
1626                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1627                         ret = -ETIMEDOUT;
1628                         dev_err(DEV, "ASSERT FAILED: waited 30 seconds for sequence update, forcing reconnect\n");
1629                         break;
1630                 }
1631         }
1632         finish_wait(&mdev->seq_wait, &wait);
1633         if (mdev->peer_seq+1 == packet_seq)
1634                 mdev->peer_seq++;
1635         spin_unlock(&mdev->peer_seq_lock);
1636         return ret;
1637 }
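
/* Illustrative only: seq_le() used above is defined elsewhere in the driver,
 * not in this file.  A minimal wrap-safe comparison consistent with the
 * comment block above could look like this sketch (the name is made up): */
#if 0
static inline int seq_le_sketch(u32 a, u32 b)
{
	/* interpret the 32bit difference as signed; correct across the
	 * 32bit wrap while the two sequence numbers are < 1<<31 apart */
	return (s32)(a - b) <= 0;
}
#endif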
1638
1639 /* see also bio_flags_to_wire().
1640  * We semantically map the DP_* packet flags to bio request flags
1641  * (DRBD_REQ_*) and back, since we may replicate to other kernel versions. */
1642 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1643 {
1644         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1645                 (dpf & DP_FUA ? REQ_FUA : 0) |
1646                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1647                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1648 }
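
/* For reference: the inverse mapping, bio_flags_to_wire(), lives in
 * drbd_main.c.  A simplified sketch of what it amounts to (the real
 * function also cares about the agreed protocol version): */
#if 0
static u32 bio_flags_to_wire_sketch(unsigned long bi_rw)
{
	return	(bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
		(bi_rw & REQ_FUA ? DP_FUA : 0) |
		(bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
		(bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
}
#endif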
1649
1650 /* mirrored write */
1651 static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1652 {
1653         sector_t sector;
1654         struct drbd_epoch_entry *e;
1655         struct p_data *p = &mdev->data.rbuf.data;
1656         int rw = WRITE;
1657         u32 dp_flags;
1658
1659         if (!get_ldev(mdev)) {
1660                 spin_lock(&mdev->peer_seq_lock);
1661                 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1662                         mdev->peer_seq++;
1663                 spin_unlock(&mdev->peer_seq_lock);
1664
1665                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1666                 atomic_inc(&mdev->current_epoch->epoch_size);
1667                 return drbd_drain_block(mdev, data_size);
1668         }
1669
1670         /* get_ldev(mdev) successful.
1671          * Corresponding put_ldev done either below (on various errors),
1672          * or in drbd_endio_write_sec, if we successfully submit the data at
1673          * the end of this function. */
1674
1675         sector = be64_to_cpu(p->sector);
1676         e = read_in_block(mdev, p->block_id, sector, data_size);
1677         if (!e) {
1678                 put_ldev(mdev);
1679                 return false;
1680         }
1681
1682         e->w.cb = e_end_block;
1683
1684         dp_flags = be32_to_cpu(p->dp_flags);
1685         rw |= wire_flags_to_bio(mdev, dp_flags);
1686
1687         if (dp_flags & DP_MAY_SET_IN_SYNC)
1688                 e->flags |= EE_MAY_SET_IN_SYNC;
1689
1690         spin_lock(&mdev->epoch_lock);
1691         e->epoch = mdev->current_epoch;
1692         atomic_inc(&e->epoch->epoch_size);
1693         atomic_inc(&e->epoch->active);
1694         spin_unlock(&mdev->epoch_lock);
1695
1696         /* I'm the receiver, I do hold a net_cnt reference. */
1697         if (!mdev->net_conf->two_primaries) {
1698                 spin_lock_irq(&mdev->req_lock);
1699         } else {
1700                 /* don't get the req_lock yet,
1701                  * we may sleep in drbd_wait_peer_seq */
1702                 const int size = e->size;
1703                 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1704                 DEFINE_WAIT(wait);
1705                 struct drbd_request *i;
1706                 struct hlist_node *n;
1707                 struct hlist_head *slot;
1708                 int first;
1709
1710                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1711                 BUG_ON(mdev->ee_hash == NULL);
1712                 BUG_ON(mdev->tl_hash == NULL);
1713
1714                 /* conflict detection and handling:
1715                  * 1. wait on the sequence number,
1716                  *    in case this data packet overtook ACK packets.
1717                  * 2. check our hash tables for conflicting requests.
1718                  *    we only need to walk the tl_hash, since an ee cannot
1719                  *    have a conflict with another ee: on the submitting
1720                  *    node, the corresponding req had already been conflicting,
1721                  *    and a conflicting req is never sent.
1722                  *
1723                  * Note: for two_primaries, we are protocol C,
1724                  * so there cannot be any request that is DONE
1725                  * but still on the transfer log.
1726                  *
1727                  * unconditionally add to the ee_hash.
1728                  *
1729                  * if no conflicting request is found:
1730                  *    submit.
1731                  *
1732                  * if any conflicting request is found
1733                  * that has not yet been acked,
1734                  * AND I have the "discard concurrent writes" flag:
1735                  *       queue (via done_ee) the P_DISCARD_ACK; OUT.
1736                  *
1737                  * if any conflicting request is found:
1738                  *       block the receiver, waiting on misc_wait
1739                  *       until no more conflicting requests are there,
1740                  *       or we get interrupted (disconnect).
1741                  *
1742                  *       we do not just write after local io completion of those
1743                  *       requests, but only after req is done completely, i.e.
1744                  *       we wait for the P_DISCARD_ACK to arrive!
1745                  *
1746                  *       then proceed normally, i.e. submit.
1747                  */
1748                 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1749                         goto out_interrupted;
1750
1751                 spin_lock_irq(&mdev->req_lock);
1752
1753                 hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));
1754
1755 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1756                 slot = tl_hash_slot(mdev, sector);
1757                 first = 1;
1758                 for (;;) {
1759                         int have_unacked = 0;
1760                         int have_conflict = 0;
1761                         prepare_to_wait(&mdev->misc_wait, &wait,
1762                                 TASK_INTERRUPTIBLE);
1763                         hlist_for_each_entry(i, n, slot, collision) {
1764                                 if (OVERLAPS) {
1765                                         /* only ALERT on first iteration,
1766                                          * we may be woken up early... */
1767                                         if (first)
1768                                                 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1769                                                       " new: %llus +%u; pending: %llus +%u\n",
1770                                                       current->comm, current->pid,
1771                                                       (unsigned long long)sector, size,
1772                                                       (unsigned long long)i->sector, i->size);
1773                                         if (i->rq_state & RQ_NET_PENDING)
1774                                                 ++have_unacked;
1775                                         ++have_conflict;
1776                                 }
1777                         }
1778 #undef OVERLAPS
1779                         if (!have_conflict)
1780                                 break;
1781
1782                         /* Discard Ack only for the _first_ iteration */
1783                         if (first && discard && have_unacked) {
1784                                 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1785                                      (unsigned long long)sector);
1786                                 inc_unacked(mdev);
1787                                 e->w.cb = e_send_discard_ack;
1788                                 list_add_tail(&e->w.list, &mdev->done_ee);
1789
1790                                 spin_unlock_irq(&mdev->req_lock);
1791
1792                                 /* we could probably send that P_DISCARD_ACK ourselves,
1793                                  * but I don't like the receiver using the msock */
1794
1795                                 put_ldev(mdev);
1796                                 wake_asender(mdev);
1797                                 finish_wait(&mdev->misc_wait, &wait);
1798                                 return true;
1799                         }
1800
1801                         if (signal_pending(current)) {
1802                                 hlist_del_init(&e->collision);
1803
1804                                 spin_unlock_irq(&mdev->req_lock);
1805
1806                                 finish_wait(&mdev->misc_wait, &wait);
1807                                 goto out_interrupted;
1808                         }
1809
1810                         spin_unlock_irq(&mdev->req_lock);
1811                         if (first) {
1812                                 first = 0;
1813                                 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1814                                      "sec=%llus\n", (unsigned long long)sector);
1815                         } else if (discard) {
1816                                 /* we had none on the first iteration.
1817                                  * there must be none now. */
1818                                 D_ASSERT(have_unacked == 0);
1819                         }
1820                         schedule();
1821                         spin_lock_irq(&mdev->req_lock);
1822                 }
1823                 finish_wait(&mdev->misc_wait, &wait);
1824         }
1825
1826         list_add(&e->w.list, &mdev->active_ee);
1827         spin_unlock_irq(&mdev->req_lock);
1828
1829         switch (mdev->net_conf->wire_protocol) {
1830         case DRBD_PROT_C:
1831                 inc_unacked(mdev);
1832                 /* corresponding dec_unacked() in e_end_block(),
1833                  * or in _drbd_clear_done_ee, respectively */
1834                 break;
1835         case DRBD_PROT_B:
1836                 /* I really don't like it that the receiver thread
1837                  * sends on the msock, but anyway */
1838                 drbd_send_ack(mdev, P_RECV_ACK, e);
1839                 break;
1840         case DRBD_PROT_A:
1841                 /* nothing to do */
1842                 break;
1843         }
1844
1845         if (mdev->state.pdsk < D_INCONSISTENT) {
1846                 /* The peer has no usable disk: we have the only disk of the cluster. */
1847                 drbd_set_out_of_sync(mdev, e->sector, e->size);
1848                 e->flags |= EE_CALL_AL_COMPLETE_IO;
1849                 e->flags &= ~EE_MAY_SET_IN_SYNC;
1850                 drbd_al_begin_io(mdev, e->sector);
1851         }
1852
1853         if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1854                 return true;
1855
1856         /* don't care for the reason here */
1857         dev_err(DEV, "submit failed, triggering re-connect\n");
1858         spin_lock_irq(&mdev->req_lock);
1859         list_del(&e->w.list);
1860         hlist_del_init(&e->collision);
1861         spin_unlock_irq(&mdev->req_lock);
1862         if (e->flags & EE_CALL_AL_COMPLETE_IO)
1863                 drbd_al_complete_io(mdev, e->sector);
1864
1865 out_interrupted:
1866         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
1867         put_ldev(mdev);
1868         drbd_free_ee(mdev, e);
1869         return false;
1870 }
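
/* The OVERLAPS macro above expands to overlaps() from drbd_int.h: the
 * usual interval-overlap test on sector ranges, with the byte sizes
 * shifted down to sectors.  An equivalent sketch: */
#if 0
static inline int overlaps_sketch(sector_t s1, int l1, sector_t s2, int l2)
{
	return s1 + (l1 >> 9) > s2 && s1 < s2 + (l2 >> 9);
}
#endif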
1871
1872 /* We may throttle resync, if the lower device seems to be busy,
1873  * and current sync rate is above c_min_rate.
1874  *
1875  * To decide whether or not the lower device is busy, we use a scheme similar
1876  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
1877  * activity (more than 64 sectors) that we cannot account for with our own
1878  * resync activity, the device obviously is "busy".
1879  *
1880  * The current sync rate used here is based only on the most recent two
1881  * step marks, giving a short-time average so we can react faster.
1882  */
1883 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1884 {
1885         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1886         unsigned long db, dt, dbdt;
1887         struct lc_element *tmp;
1888         int curr_events;
1889         int throttle = 0;
1890
1891         /* feature disabled? */
1892         if (mdev->sync_conf.c_min_rate == 0)
1893                 return 0;
1894
1895         spin_lock_irq(&mdev->al_lock);
1896         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1897         if (tmp) {
1898                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1899                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1900                         spin_unlock_irq(&mdev->al_lock);
1901                         return 0;
1902                 }
1903                 /* Do not slow down if app IO is already waiting for this extent */
1904         }
1905         spin_unlock_irq(&mdev->al_lock);
1906
1907         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1908                       (int)part_stat_read(&disk->part0, sectors[1]) -
1909                         atomic_read(&mdev->rs_sect_ev);
1910
1911         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1912                 unsigned long rs_left;
1913                 int i;
1914
1915                 mdev->rs_last_events = curr_events;
1916
1917                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1918                  * approx. */
1919                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1920
1921                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1922                         rs_left = mdev->ov_left;
1923                 else
1924                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1925
1926                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1927                 if (!dt)
1928                         dt++;
1929                 db = mdev->rs_mark_left[i] - rs_left;
1930                 dbdt = Bit2KB(db/dt);
1931
1932                 if (dbdt > mdev->sync_conf.c_min_rate)
1933                         throttle = 1;
1934         }
1935         return throttle;
1936 }
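
/* Worked example with made-up numbers: if the two most recent sync marks
 * are dt = 6 seconds apart and db = 12288 bitmap bits were cleared in
 * between, then with 4KiB per bitmap bit dbdt = Bit2KB(12288 / 6) =
 * 2048 << 2 = 8192 KiB/s; a c_min_rate of 4000 KiB/s would then yield
 * throttle = 1 (provided the backing device also looked busy). */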
1937
1938
1939 static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1940 {
1941         sector_t sector;
1942         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1943         struct drbd_epoch_entry *e;
1944         struct digest_info *di = NULL;
1945         int size, verb;
1946         unsigned int fault_type;
1947         struct p_block_req *p = &mdev->data.rbuf.block_req;
1948
1949         sector = be64_to_cpu(p->sector);
1950         size   = be32_to_cpu(p->blksize);
1951
1952         if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1953                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1954                                 (unsigned long long)sector, size);
1955                 return false;
1956         }
1957         if (sector + (size>>9) > capacity) {
1958                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1959                                 (unsigned long long)sector, size);
1960                 return false;
1961         }
1962
1963         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1964                 verb = 1;
1965                 switch (cmd) {
1966                 case P_DATA_REQUEST:
1967                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
1968                         break;
1969                 case P_RS_DATA_REQUEST:
1970                 case P_CSUM_RS_REQUEST:
1971                 case P_OV_REQUEST:
1972                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
1973                         break;
1974                 case P_OV_REPLY:
1975                         verb = 0;
1976                         dec_rs_pending(mdev);
1977                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
1978                         break;
1979                 default:
1980                         dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
1981                                 cmdname(cmd));
1982                 }
1983                 if (verb && __ratelimit(&drbd_ratelimit_state))
1984                         dev_err(DEV, "Cannot satisfy peer's read request, "
1985                             "no local data.\n");
1986
1987                 /* drain possible payload */
1988                 return drbd_drain_block(mdev, digest_size);
1989         }
1990
1991         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1992          * "criss-cross" setup, that might cause write-out on some other DRBD,
1993          * which in turn might block on the other node at this very place.  */
1994         e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1995         if (!e) {
1996                 put_ldev(mdev);
1997                 return false;
1998         }
1999
2000         switch (cmd) {
2001         case P_DATA_REQUEST:
2002                 e->w.cb = w_e_end_data_req;
2003                 fault_type = DRBD_FAULT_DT_RD;
2004                 /* application IO, don't drbd_rs_begin_io */
2005                 goto submit;
2006
2007         case P_RS_DATA_REQUEST:
2008                 e->w.cb = w_e_end_rsdata_req;
2009                 fault_type = DRBD_FAULT_RS_RD;
2010                 /* used in the sector offset progress display */
2011                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2012                 break;
2013
2014         case P_OV_REPLY:
2015         case P_CSUM_RS_REQUEST:
2016                 fault_type = DRBD_FAULT_RS_RD;
2017                 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2018                 if (!di)
2019                         goto out_free_e;
2020
2021                 di->digest_size = digest_size;
2022                 di->digest = (((char *)di)+sizeof(struct digest_info));
2023
2024                 e->digest = di;
2025                 e->flags |= EE_HAS_DIGEST;
2026
2027                 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2028                         goto out_free_e;
2029
2030                 if (cmd == P_CSUM_RS_REQUEST) {
2031                         D_ASSERT(mdev->agreed_pro_version >= 89);
2032                         e->w.cb = w_e_end_csum_rs_req;
2033                         /* used in the sector offset progress display */
2034                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2035                 } else if (cmd == P_OV_REPLY) {
2036                         /* track progress, we may need to throttle */
2037                         atomic_add(size >> 9, &mdev->rs_sect_in);
2038                         e->w.cb = w_e_end_ov_reply;
2039                         dec_rs_pending(mdev);
2040                         /* drbd_rs_begin_io done when we sent this request,
2041                          * but accounting still needs to be done. */
2042                         goto submit_for_resync;
2043                 }
2044                 break;
2045
2046         case P_OV_REQUEST:
2047                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2048                     mdev->agreed_pro_version >= 90) {
2049                         unsigned long now = jiffies;
2050                         int i;
2051                         mdev->ov_start_sector = sector;
2052                         mdev->ov_position = sector;
2053                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2054                         mdev->rs_total = mdev->ov_left;
2055                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2056                                 mdev->rs_mark_left[i] = mdev->ov_left;
2057                                 mdev->rs_mark_time[i] = now;
2058                         }
2059                         dev_info(DEV, "Online Verify start sector: %llu\n",
2060                                         (unsigned long long)sector);
2061                 }
2062                 e->w.cb = w_e_end_ov_req;
2063                 fault_type = DRBD_FAULT_RS_RD;
2064                 break;
2065
2066         default:
2067                 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2068                     cmdname(cmd));
2069                 fault_type = DRBD_FAULT_MAX;
2070                 goto out_free_e;
2071         }
2072
2073         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2074          * wrt the receiver, but it is not as straightforward as it may seem.
2075          * Various places in the resync start and stop logic assume resync
2076          * requests are processed in order, requeuing this on the worker thread
2077          * introduces a bunch of new code for synchronization between threads.
2078          *
2079          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2080          * "forever", throttling after drbd_rs_begin_io will lock that extent
2081          * for application writes for the same time.  For now, just throttle
2082          * here, where the rest of the code expects the receiver to sleep for
2083          * a while, anyways.
2084          */
2085
2086         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2087          * this defers syncer requests for some time, before letting at least
2088  * one request through.  The resync controller on the receiving side
2089          * will adapt to the incoming rate accordingly.
2090          *
2091          * We cannot throttle here if remote is Primary/SyncTarget:
2092          * we would also throttle its application reads.
2093          * In that case, throttling is done on the SyncTarget only.
2094          */
2095         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2096                 schedule_timeout_uninterruptible(HZ/10);
2097         if (drbd_rs_begin_io(mdev, sector))
2098                 goto out_free_e;
2099
2100 submit_for_resync:
2101         atomic_add(size >> 9, &mdev->rs_sect_ev);
2102
2103 submit:
2104         inc_unacked(mdev);
2105         spin_lock_irq(&mdev->req_lock);
2106         list_add_tail(&e->w.list, &mdev->read_ee);
2107         spin_unlock_irq(&mdev->req_lock);
2108
2109         if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2110                 return true;
2111
2112         /* don't care for the reason here */
2113         dev_err(DEV, "submit failed, triggering re-connect\n");
2114         spin_lock_irq(&mdev->req_lock);
2115         list_del(&e->w.list);
2116         spin_unlock_irq(&mdev->req_lock);
2117         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2118
2119 out_free_e:
2120         put_ldev(mdev);
2121         drbd_free_ee(mdev, e);
2122         return false;
2123 }
2124
2125 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2126 {
2127         int self, peer, rv = -100;
2128         unsigned long ch_self, ch_peer;
2129
2130         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2131         peer = mdev->p_uuid[UI_BITMAP] & 1;
2132
2133         ch_peer = mdev->p_uuid[UI_SIZE];
2134         ch_self = mdev->comm_bm_set;
2135
2136         switch (mdev->net_conf->after_sb_0p) {
2137         case ASB_CONSENSUS:
2138         case ASB_DISCARD_SECONDARY:
2139         case ASB_CALL_HELPER:
2140                 dev_err(DEV, "Configuration error.\n");
2141                 break;
2142         case ASB_DISCONNECT:
2143                 break;
2144         case ASB_DISCARD_YOUNGER_PRI:
2145                 if (self == 0 && peer == 1) {
2146                         rv = -1;
2147                         break;
2148                 }
2149                 if (self == 1 && peer == 0) {
2150                         rv =  1;
2151                         break;
2152                 }
2153                 /* Else fall through to one of the other strategies... */
2154         case ASB_DISCARD_OLDER_PRI:
2155                 if (self == 0 && peer == 1) {
2156                         rv = 1;
2157                         break;
2158                 }
2159                 if (self == 1 && peer == 0) {
2160                         rv = -1;
2161                         break;
2162                 }
2163                 /* Else fall through to one of the other strategies... */
2164                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2165                      "Using discard-least-changes instead\n");
2166         case ASB_DISCARD_ZERO_CHG:
2167                 if (ch_peer == 0 && ch_self == 0) {
2168                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2169                                 ? -1 : 1;
2170                         break;
2171                 } else {
2172                         if (ch_peer == 0) { rv =  1; break; }
2173                         if (ch_self == 0) { rv = -1; break; }
2174                 }
2175                 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2176                         break;
2177         case ASB_DISCARD_LEAST_CHG:
2178                 if      (ch_self < ch_peer)
2179                         rv = -1;
2180                 else if (ch_self > ch_peer)
2181                         rv =  1;
2182                 else /* ( ch_self == ch_peer ) */
2183                      /* Well, then use something else. */
2184                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2185                                 ? -1 : 1;
2186                 break;
2187         case ASB_DISCARD_LOCAL:
2188                 rv = -1;
2189                 break;
2190         case ASB_DISCARD_REMOTE:
2191                 rv =  1;
2192         }
2193
2194         return rv;
2195 }
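
/* Return convention shared by the drbd_asb_recover_*p() helpers:
 *    1  discard the peer's data (this node becomes sync source),
 *   -1  discard our data (this node becomes sync target),
 * -100  no automatic decision could be reached. */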
2196
2197 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2198 {
2199         int hg, rv = -100;
2200
2201         switch (mdev->net_conf->after_sb_1p) {
2202         case ASB_DISCARD_YOUNGER_PRI:
2203         case ASB_DISCARD_OLDER_PRI:
2204         case ASB_DISCARD_LEAST_CHG:
2205         case ASB_DISCARD_LOCAL:
2206         case ASB_DISCARD_REMOTE:
2207                 dev_err(DEV, "Configuration error.\n");
2208                 break;
2209         case ASB_DISCONNECT:
2210                 break;
2211         case ASB_CONSENSUS:
2212                 hg = drbd_asb_recover_0p(mdev);
2213                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2214                         rv = hg;
2215                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2216                         rv = hg;
2217                 break;
2218         case ASB_VIOLENTLY:
2219                 rv = drbd_asb_recover_0p(mdev);
2220                 break;
2221         case ASB_DISCARD_SECONDARY:
2222                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2223         case ASB_CALL_HELPER:
2224                 hg = drbd_asb_recover_0p(mdev);
2225                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2226                         enum drbd_state_rv rv2;
2227
2228                         drbd_set_role(mdev, R_SECONDARY, 0);
2229                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2230                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2231                           * we do not need to wait for the after state change work either. */
2232                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2233                         if (rv2 != SS_SUCCESS) {
2234                                 drbd_khelper(mdev, "pri-lost-after-sb");
2235                         } else {
2236                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2237                                 rv = hg;
2238                         }
2239                 } else
2240                         rv = hg;
2241         }
2242
2243         return rv;
2244 }
2245
2246 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2247 {
2248         int hg, rv = -100;
2249
2250         switch (mdev->net_conf->after_sb_2p) {
2251         case ASB_DISCARD_YOUNGER_PRI:
2252         case ASB_DISCARD_OLDER_PRI:
2253         case ASB_DISCARD_LEAST_CHG:
2254         case ASB_DISCARD_LOCAL:
2255         case ASB_DISCARD_REMOTE:
2256         case ASB_CONSENSUS:
2257         case ASB_DISCARD_SECONDARY:
2258                 dev_err(DEV, "Configuration error.\n");
2259                 break;
2260         case ASB_VIOLENTLY:
2261                 rv = drbd_asb_recover_0p(mdev);
2262                 break;
2263         case ASB_DISCONNECT:
2264                 break;
2265         case ASB_CALL_HELPER:
2266                 hg = drbd_asb_recover_0p(mdev);
2267                 if (hg == -1) {
2268                         enum drbd_state_rv rv2;
2269
2270                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2271                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2272                           * we do not need to wait for the after state change work either. */
2273                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2274                         if (rv2 != SS_SUCCESS) {
2275                                 drbd_khelper(mdev, "pri-lost-after-sb");
2276                         } else {
2277                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2278                                 rv = hg;
2279                         }
2280                 } else
2281                         rv = hg;
2282         }
2283
2284         return rv;
2285 }
2286
2287 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2288                            u64 bits, u64 flags)
2289 {
2290         if (!uuid) {
2291                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2292                 return;
2293         }
2294         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2295              text,
2296              (unsigned long long)uuid[UI_CURRENT],
2297              (unsigned long long)uuid[UI_BITMAP],
2298              (unsigned long long)uuid[UI_HISTORY_START],
2299              (unsigned long long)uuid[UI_HISTORY_END],
2300              (unsigned long long)bits,
2301              (unsigned long long)flags);
2302 }
2303
2304 /*
2305   100   after split brain, try auto recover
2306     2   C_SYNC_SOURCE set BitMap
2307     1   C_SYNC_SOURCE use BitMap
2308     0   no Sync
2309    -1   C_SYNC_TARGET use BitMap
2310    -2   C_SYNC_TARGET set BitMap
2311  -100   after split brain, disconnect
2312 -1000   unrelated data
2313 -1091   requires proto 91
2314 -1096   requires proto 96
2315  */
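
/* Note: throughout drbd_uuid_compare() the lowest bit of each UUID is
 * masked off before comparing; it is used as a flag bit (see the "& 1"
 * tests in drbd_asb_recover_0p() above), not as part of the identity.
 * A minimal sketch of the comparison idiom: */
#if 0
static inline int uuid_equal_sketch(u64 self, u64 peer)
{
	return (self & ~((u64)1)) == (peer & ~((u64)1));
}
#endif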
2316 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2317 {
2318         u64 self, peer;
2319         int i, j;
2320
2321         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2322         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2323
2324         *rule_nr = 10;
2325         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2326                 return 0;
2327
2328         *rule_nr = 20;
2329         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2330              peer != UUID_JUST_CREATED)
2331                 return -2;
2332
2333         *rule_nr = 30;
2334         if (self != UUID_JUST_CREATED &&
2335             (peer == UUID_JUST_CREATED || peer == (u64)0))
2336                 return 2;
2337
2338         if (self == peer) {
2339                 int rct, dc; /* roles at crash time */
2340
2341                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2342
2343                         if (mdev->agreed_pro_version < 91)
2344                                 return -1091;
2345
2346                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2347                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2348                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2349                                 drbd_uuid_set_bm(mdev, 0UL);
2350
2351                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2352                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2353                                 *rule_nr = 34;
2354                         } else {
2355                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2356                                 *rule_nr = 36;
2357                         }
2358
2359                         return 1;
2360                 }
2361
2362                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2363
2364                         if (mdev->agreed_pro_version < 91)
2365                                 return -1091;
2366
2367                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2368                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2369                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2370
2371                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2372                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2373                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2374
2375                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2376                                 *rule_nr = 35;
2377                         } else {
2378                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2379                                 *rule_nr = 37;
2380                         }
2381
2382                         return -1;
2383                 }
2384
2385                 /* Common power [off|failure] */
2386                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2387                         (mdev->p_uuid[UI_FLAGS] & 2);
2388                 /* lowest bit is set when we were primary,
2389                  * next bit (weight 2) is set when peer was primary */
2390                 *rule_nr = 40;
2391
2392                 switch (rct) {
2393                 case 0: /* !self_pri && !peer_pri */ return 0;
2394                 case 1: /*  self_pri && !peer_pri */ return 1;
2395                 case 2: /* !self_pri &&  peer_pri */ return -1;
2396                 case 3: /*  self_pri &&  peer_pri */
2397                         dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2398                         return dc ? -1 : 1;
2399                 }
2400         }
2401
2402         *rule_nr = 50;
2403         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2404         if (self == peer)
2405                 return -1;
2406
2407         *rule_nr = 51;
2408         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2409         if (self == peer) {
2410                 if (mdev->agreed_pro_version < 96 ?
2411                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2412                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2413                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2414                         /* The last P_SYNC_UUID did not get through. Undo the
2415                            peer's UUID modifications from its last start of a resync as sync source. */
2416
2417                         if (mdev->agreed_pro_version < 91)
2418                                 return -1091;
2419
2420                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2421                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2422
2423                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2424                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2425
2426                         return -1;
2427                 }
2428         }
2429
2430         *rule_nr = 60;
2431         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2432         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2433                 peer = mdev->p_uuid[i] & ~((u64)1);
2434                 if (self == peer)
2435                         return -2;
2436         }
2437
2438         *rule_nr = 70;
2439         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2440         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2441         if (self == peer)
2442                 return 1;
2443
2444         *rule_nr = 71;
2445         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2446         if (self == peer) {
2447                 if (mdev->agreed_pro_version < 96 ?
2448                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2449                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2450                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2451                         /* The last P_SYNC_UUID did not get through. Undo the
2452                            modifications made to our UUIDs at the last start of a resync as sync source. */
2453
2454                         if (mdev->agreed_pro_version < 91)
2455                                 return -1091;
2456
2457                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2458                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2459
2460                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2461                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2462                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2463
2464                         return 1;
2465                 }
2466         }
2467
2468
2469         *rule_nr = 80;
2470         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2471         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2472                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2473                 if (self == peer)
2474                         return 2;
2475         }
2476
2477         *rule_nr = 90;
2478         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2479         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2480         if (self == peer && self != ((u64)0))
2481                 return 100;
2482
2483         *rule_nr = 100;
2484         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2485                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2486                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2487                         peer = mdev->p_uuid[j] & ~((u64)1);
2488                         if (self == peer)
2489                                 return -100;
2490                 }
2491         }
2492
2493         return -1000;
2494 }
2495
2496 /* drbd_sync_handshake() returns the new conn state on success, or
2497    CONN_MASK (-1) on failure.
2498  */
2499 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2500                                            enum drbd_disk_state peer_disk) __must_hold(local)
2501 {
2502         int hg, rule_nr;
2503         enum drbd_conns rv = C_MASK;
2504         enum drbd_disk_state mydisk;
2505
2506         mydisk = mdev->state.disk;
2507         if (mydisk == D_NEGOTIATING)
2508                 mydisk = mdev->new_state_tmp.disk;
2509
2510         dev_info(DEV, "drbd_sync_handshake:\n");
2511         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2512         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2513                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2514
2515         hg = drbd_uuid_compare(mdev, &rule_nr);
2516
2517         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2518
2519         if (hg == -1000) {
2520                 dev_alert(DEV, "Unrelated data, aborting!\n");
2521                 return C_MASK;
2522         }
2523         if (hg < -1000) {
2524                 dev_alert(DEV, "To resolve this, both sides have to support at least protocol %d\n", -hg - 1000);
2525                 return C_MASK;
2526         }
2527
2528         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2529             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2530                 int f = (hg == -100) || abs(hg) == 2;
2531                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2532                 if (f)
2533                         hg = hg*2;
2534                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2535                      hg > 0 ? "source" : "target");
2536         }
2537
2538         if (abs(hg) == 100)
2539                 drbd_khelper(mdev, "initial-split-brain");
2540
2541         if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2542                 int pcount = (mdev->state.role == R_PRIMARY)
2543                            + (peer_role == R_PRIMARY);
2544                 int forced = (hg == -100);
2545
2546                 switch (pcount) {
2547                 case 0:
2548                         hg = drbd_asb_recover_0p(mdev);
2549                         break;
2550                 case 1:
2551                         hg = drbd_asb_recover_1p(mdev);
2552                         break;
2553                 case 2:
2554                         hg = drbd_asb_recover_2p(mdev);
2555                         break;
2556                 }
2557                 if (abs(hg) < 100) {
2558                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2559                              "automatically solved. Sync from %s node\n",
2560                              pcount, (hg < 0) ? "peer" : "this");
2561                         if (forced) {
2562                                 dev_warn(DEV, "Doing a full sync, since"
2563                                      " UUIDs were ambiguous.\n");
2564                                 hg = hg*2;
2565                         }
2566                 }
2567         }
2568
2569         if (hg == -100) {
2570                 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2571                         hg = -1;
2572                 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2573                         hg = 1;
2574
2575                 if (abs(hg) < 100)
2576                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2577                              "Sync from %s node\n",
2578                              (hg < 0) ? "peer" : "this");
2579         }
2580
2581         if (hg == -100) {
2582                 /* FIXME this log message is not correct if we end up here
2583                  * after an attempted attach on a diskless node.
2584                  * We just refuse to attach -- well, we drop the "connection"
2585                  * to that disk, in a way... */
2586                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2587                 drbd_khelper(mdev, "split-brain");
2588                 return C_MASK;
2589         }
2590
2591         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2592                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2593                 return C_MASK;
2594         }
2595
2596         if (hg < 0 && /* by intention we do not use mydisk here. */
2597             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2598                 switch (mdev->net_conf->rr_conflict) {
2599                 case ASB_CALL_HELPER:
2600                         drbd_khelper(mdev, "pri-lost");
2601                         /* fall through */
2602                 case ASB_DISCONNECT:
2603                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2604                         return C_MASK;
2605                 case ASB_VIOLENTLY:
2606                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2607                              " assumption\n");
2608                 }
2609         }
2610
2611         if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2612                 if (hg == 0)
2613                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2614                 else
2615                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2616                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2617                                  abs(hg) >= 2 ? "full" : "bit-map based");
2618                 return C_MASK;
2619         }
2620
2621         if (abs(hg) >= 2) {
2622                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2623                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2624                                         BM_LOCKED_SET_ALLOWED))
2625                         return C_MASK;
2626         }
2627
2628         if (hg > 0) { /* become sync source. */
2629                 rv = C_WF_BITMAP_S;
2630         } else if (hg < 0) { /* become sync target */
2631                 rv = C_WF_BITMAP_T;
2632         } else {
2633                 rv = C_CONNECTED;
2634                 if (drbd_bm_total_weight(mdev)) {
2635                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2636                              drbd_bm_total_weight(mdev));
2637                 }
2638         }
2639
2640         return rv;
2641 }
2642
2643 /* returns 1 if invalid */
2644 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2645 {
2646         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2647         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2648             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2649                 return 0;
2650
2651         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2652         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2653             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2654                 return 1;
2655
2656         /* everything else is valid if they are equal on both sides. */
2657         if (peer == self)
2658                 return 0;
2659
2660         /* everything else is invalid. */
2661         return 1;
2662 }
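
/* Example: peer == ASB_DISCARD_REMOTE with self == ASB_DISCARD_LOCAL is
 * valid (returns 0), since both sides agree on whose data is discarded;
 * both sides set to ASB_DISCARD_REMOTE would be invalid (returns 1). */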
2663
2664 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2665 {
2666         struct p_protocol *p = &mdev->data.rbuf.protocol;
2667         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2668         int p_want_lose, p_two_primaries, cf;
2669         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2670
2671         p_proto         = be32_to_cpu(p->protocol);
2672         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2673         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2674         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2675         p_two_primaries = be32_to_cpu(p->two_primaries);
2676         cf              = be32_to_cpu(p->conn_flags);
2677         p_want_lose = cf & CF_WANT_LOSE;
2678
2679         clear_bit(CONN_DRY_RUN, &mdev->flags);
2680
2681         if (cf & CF_DRY_RUN)
2682                 set_bit(CONN_DRY_RUN, &mdev->flags);
2683
2684         if (p_proto != mdev->net_conf->wire_protocol) {
2685                 dev_err(DEV, "incompatible communication protocols\n");
2686                 goto disconnect;
2687         }
2688
2689         if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2690                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2691                 goto disconnect;
2692         }
2693
2694         if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2695                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2696                 goto disconnect;
2697         }
2698
2699         if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2700                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2701                 goto disconnect;
2702         }
2703
2704         if (p_want_lose && mdev->net_conf->want_lose) {
2705                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2706                 goto disconnect;
2707         }
2708
2709         if (p_two_primaries != mdev->net_conf->two_primaries) {
2710                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2711                 goto disconnect;
2712         }
2713
2714         if (mdev->agreed_pro_version >= 87) {
2715                 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2716
2717                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2718                         return false;
2719
2720                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2721                 if (strcmp(p_integrity_alg, my_alg)) {
2722                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2723                         goto disconnect;
2724                 }
2725                 dev_info(DEV, "data-integrity-alg: %s\n",
2726                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2727         }
2728
2729         return true;
2730
2731 disconnect:
2732         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2733         return false;
2734 }
2735
2736 /* helper function
2737  * input: alg name, feature name
2738  * return: NULL (alg name was "")
2739  *         ERR_PTR(error) if something goes wrong
2740  *         or the crypto hash ptr, if it worked out ok. */
2741 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2742                 const char *alg, const char *name)
2743 {
2744         struct crypto_hash *tfm;
2745
2746         if (!alg[0])
2747                 return NULL;
2748
2749         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2750         if (IS_ERR(tfm)) {
2751                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2752                         alg, name, PTR_ERR(tfm));
2753                 return tfm;
2754         }
2755         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2756                 crypto_free_hash(tfm);
2757                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2758                 return ERR_PTR(-EINVAL);
2759         }
2760         return tfm;
2761 }
2762
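     /* The SyncParam packet grew over time: up to protocol version 87 it is a
      * bare p_rs_param, version 88 appends the verify-alg name, versions 89-94
      * use p_rs_param_89 (verify-alg and csums-alg), and version 95 and later
      * use p_rs_param_95, which adds the dynamic resync controller settings. */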
2763 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2764 {
2765         int ok = true;
2766         struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2767         unsigned int header_size, data_size, exp_max_sz;
2768         struct crypto_hash *verify_tfm = NULL;
2769         struct crypto_hash *csums_tfm = NULL;
2770         const int apv = mdev->agreed_pro_version;
2771         int *rs_plan_s = NULL;
2772         int fifo_size = 0;
2773
2774         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2775                     : apv == 88 ? sizeof(struct p_rs_param)
2776                                         + SHARED_SECRET_MAX
2777                     : apv <= 94 ? sizeof(struct p_rs_param_89)
2778                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2779
2780         if (packet_size > exp_max_sz) {
2781                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2782                     packet_size, exp_max_sz);
2783                 return false;
2784         }
2785
2786         if (apv <= 88) {
2787                 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2788                 data_size   = packet_size  - header_size;
2789         } else if (apv <= 94) {
2790                 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2791                 data_size   = packet_size  - header_size;
2792                 D_ASSERT(data_size == 0);
2793         } else {
2794                 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2795                 data_size   = packet_size  - header_size;
2796                 D_ASSERT(data_size == 0);
2797         }
2798
2799         /* initialize verify_alg and csums_alg */
2800         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2801
2802         if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2803                 return false;
2804
2805         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2806
2807         if (apv >= 88) {
2808                 if (apv == 88) {
2809                         if (data_size > SHARED_SECRET_MAX) {
2810                                 dev_err(DEV, "verify-alg too long, "
2811                                     "peer wants %u, accepting only %u byte\n",
2812                                                 data_size, SHARED_SECRET_MAX);
2813                                 return false;
2814                         }
2815
2816                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2817                                 return false;
2818
2819                         /* we expect NUL terminated string */
2820                         /* but just in case someone tries to be evil */
2821                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2822                         p->verify_alg[data_size-1] = 0;
2823
2824                 } else /* apv >= 89 */ {
2825                         /* we still expect NUL terminated strings */
2826                         /* but just in case someone tries to be evil */
2827                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2828                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2829                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2830                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2831                 }
2832
2833                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2834                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2835                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2836                                     mdev->sync_conf.verify_alg, p->verify_alg);
2837                                 goto disconnect;
2838                         }
2839                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2840                                         p->verify_alg, "verify-alg");
2841                         if (IS_ERR(verify_tfm)) {
2842                                 verify_tfm = NULL;
2843                                 goto disconnect;
2844                         }
2845                 }
2846
2847                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2848                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2849                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2850                                     mdev->sync_conf.csums_alg, p->csums_alg);
2851                                 goto disconnect;
2852                         }
2853                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2854                                         p->csums_alg, "csums-alg");
2855                         if (IS_ERR(csums_tfm)) {
2856                                 csums_tfm = NULL;
2857                                 goto disconnect;
2858                         }
2859                 }
2860
2861                 if (apv > 94) {
2862                         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2863                         mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2864                         mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2865                         mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2866                         mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2867
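                             /* c_plan_ahead is (presumably) in 0.1s units; this
                              * sizes the rate-planning fifo to one slot per
                              * SLEEP_TIME tick over the whole plan-ahead window */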
2868                         fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2869                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2870                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2871                                 if (!rs_plan_s) {
2872                                         dev_err(DEV, "kmalloc of fifo_buffer failed");
2873                                         goto disconnect;
2874                                 }
2875                         }
2876                 }
2877
2878                 spin_lock(&mdev->peer_seq_lock);
2879                 /* lock against drbd_nl_syncer_conf() */
2880                 if (verify_tfm) {
2881                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2882                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2883                         crypto_free_hash(mdev->verify_tfm);
2884                         mdev->verify_tfm = verify_tfm;
2885                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2886                 }
2887                 if (csums_tfm) {
2888                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2889                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2890                         crypto_free_hash(mdev->csums_tfm);
2891                         mdev->csums_tfm = csums_tfm;
2892                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2893                 }
2894                 if (fifo_size != mdev->rs_plan_s.size) {
2895                         kfree(mdev->rs_plan_s.values);
2896                         mdev->rs_plan_s.values = rs_plan_s;
2897                         mdev->rs_plan_s.size   = fifo_size;
2898                         mdev->rs_planed = 0;
2899                 }
2900                 spin_unlock(&mdev->peer_seq_lock);
2901         }
2902
2903         return ok;
2904 disconnect:
2905         /* just for completeness: actually not needed,
2906          * as this is not reached if csums_tfm was ok. */
2907         crypto_free_hash(csums_tfm);
2908         /* but free the verify_tfm again, if csums_tfm did not work out */
2909         crypto_free_hash(verify_tfm);
2910         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2911         return false;
2912 }
2913
2914 /* warn if the arguments differ by more than 12.5% */
2915 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2916         const char *s, sector_t a, sector_t b)
2917 {
2918         sector_t d;
2919         if (a == 0 || b == 0)
2920                 return;
2921         d = (a > b) ? (a - b) : (b - a);
2922         if (d > (a>>3) || d > (b>>3))
2923                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2924                      (unsigned long long)a, (unsigned long long)b);
2925 }
2926
2927 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2928 {
2929         struct p_sizes *p = &mdev->data.rbuf.sizes;
2930         enum determine_dev_size dd = unchanged;
2931         sector_t p_size, p_usize, my_usize;
2932         int ldsc = 0; /* local disk size changed */
2933         enum dds_flags ddsf;
2934
2935         p_size = be64_to_cpu(p->d_size);
2936         p_usize = be64_to_cpu(p->u_size);
2937
2938         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2939                 dev_err(DEV, "some backing storage is needed\n");
2940                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2941                 return false;
2942         }
2943
2944         /* just store the peer's disk size for now.
2945          * we still need to figure out whether we accept that. */
2946         mdev->p_size = p_size;
2947
2948         if (get_ldev(mdev)) {
2949                 warn_if_differ_considerably(mdev, "lower level device sizes",
2950                            p_size, drbd_get_max_capacity(mdev->ldev));
2951                 warn_if_differ_considerably(mdev, "user requested size",
2952                                             p_usize, mdev->ldev->dc.disk_size);
2953
2954                 /* if this is the first connect, or an otherwise expected
2955                  * param exchange, choose the minimum */
2956                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2957                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2958                                              p_usize);
2959
2960                 my_usize = mdev->ldev->dc.disk_size;
2961
2962                 if (mdev->ldev->dc.disk_size != p_usize) {
2963                         mdev->ldev->dc.disk_size = p_usize;
2964                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2965                              (unsigned long)mdev->ldev->dc.disk_size);
2966                 }
2967
2968                 /* Never shrink a device with usable data during connect.
2969                    But allow online shrinking if we are connected. */
2970                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2971                    drbd_get_capacity(mdev->this_bdev) &&
2972                    mdev->state.disk >= D_OUTDATED &&
2973                    mdev->state.conn < C_CONNECTED) {
2974                         dev_err(DEV, "The peer's disk size is too small!\n");
2975                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2976                         mdev->ldev->dc.disk_size = my_usize;
2977                         put_ldev(mdev);
2978                         return false;
2979                 }
2980                 put_ldev(mdev);
2981         }
2982
2983         ddsf = be16_to_cpu(p->dds_flags);
2984         if (get_ldev(mdev)) {
2985                 dd = drbd_determine_dev_size(mdev, ddsf);
2986                 put_ldev(mdev);
2987                 if (dd == dev_size_error)
2988                         return false;
2989                 drbd_md_sync(mdev);
2990         } else {
2991                 /* I am diskless, need to accept the peer's size. */
2992                 drbd_set_my_capacity(mdev, p_size);
2993         }
2994
2995         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
2996         drbd_reconsider_max_bio_size(mdev);
2997
2998         if (get_ldev(mdev)) {
2999                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3000                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3001                         ldsc = 1;
3002                 }
3003
3004                 put_ldev(mdev);
3005         }
3006
3007         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3008                 if (be64_to_cpu(p->c_size) !=
3009                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3010                         /* we have different sizes, probably peer
3011                          * needs to know my new size... */
3012                         drbd_send_sizes(mdev, 0, ddsf);
3013                 }
3014                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3015                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3016                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3017                             mdev->state.disk >= D_INCONSISTENT) {
3018                                 if (ddsf & DDSF_NO_RESYNC)
3019                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3020                                 else
3021                                         resync_after_online_grow(mdev);
3022                         } else
3023                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3024                 }
3025         }
3026
3027         return true;
3028 }
3029
3030 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3031 {
3032         struct p_uuids *p = &mdev->data.rbuf.uuids;
3033         u64 *p_uuid;
3034         int i, updated_uuids = 0;
3035
3036         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return false;
             }
3037
3038         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3039                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3040
3041         kfree(mdev->p_uuid);
3042         mdev->p_uuid = p_uuid;
3043
3044         if (mdev->state.conn < C_CONNECTED &&
3045             mdev->state.disk < D_INCONSISTENT &&
3046             mdev->state.role == R_PRIMARY &&
3047             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3048                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3049                     (unsigned long long)mdev->ed_uuid);
3050                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3051                 return false;
3052         }
3053
3054         if (get_ldev(mdev)) {
3055                 int skip_initial_sync =
3056                         mdev->state.conn == C_CONNECTED &&
3057                         mdev->agreed_pro_version >= 90 &&
3058                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3059                         (p_uuid[UI_FLAGS] & 8);
3060                 if (skip_initial_sync) {
3061                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3062                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3063                                         "clear_n_write from receive_uuids",
3064                                         BM_LOCKED_TEST_ALLOWED);
3065                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3066                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3067                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3068                                         CS_VERBOSE, NULL);
3069                         drbd_md_sync(mdev);
3070                         updated_uuids = 1;
3071                 }
3072                 put_ldev(mdev);
3073         } else if (mdev->state.disk < D_INCONSISTENT &&
3074                    mdev->state.role == R_PRIMARY) {
3075                 /* I am a diskless primary, the peer just created a new current UUID
3076                    for me. */
3077                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3078         }
3079
3080         /* Before we test the disk state, we should wait until a possibly
3081            ongoing cluster wide state change has finished. That is important if
3082            we are primary and are detaching from our disk. We need to see the
3083            new disk state... */
3084         wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3085         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3086                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3087
3088         if (updated_uuids)
3089                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3090
3091         return true;
3092 }
3093
3094 /**
3095  * convert_state() - Converts the peer's view of the cluster state to our point of view
3096  * @ps:         The state as seen by the peer.
3097  */
3098 static union drbd_state convert_state(union drbd_state ps)
3099 {
3100         union drbd_state ms;
3101
3102         static enum drbd_conns c_tab[] = {
3103                 [C_CONNECTED] = C_CONNECTED,
3104
3105                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3106                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3107                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3108                 [C_VERIFY_S]       = C_VERIFY_T,
3109                 [C_MASK]   = C_MASK,
3110         };
3111
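             /* Swap the two points of view: the peer's role becomes our "peer",
              * its disk becomes our "pdsk", and vice versa. */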
3112         ms.i = ps.i;
3113
3114         ms.conn = c_tab[ps.conn];
3115         ms.peer = ps.role;
3116         ms.role = ps.peer;
3117         ms.pdsk = ps.disk;
3118         ms.disk = ps.pdsk;
3119         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3120
3121         return ms;
3122 }
3123
3124 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3125 {
3126         struct p_req_state *p = &mdev->data.rbuf.req_state;
3127         union drbd_state mask, val;
3128         enum drbd_state_rv rv;
3129
3130         mask.i = be32_to_cpu(p->mask);
3131         val.i = be32_to_cpu(p->val);
3132
3133         if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3134             test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3135                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3136                 return true;
3137         }
3138
3139         mask = convert_state(mask);
3140         val = convert_state(val);
3141
3142         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3143
3144         drbd_send_sr_reply(mdev, rv);
3145         drbd_md_sync(mdev);
3146
3147         return true;
3148 }
3149
3150 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3151 {
3152         struct p_state *p = &mdev->data.rbuf.state;
3153         union drbd_state os, ns, peer_state;
3154         enum drbd_disk_state real_peer_disk;
3155         enum chg_state_flags cs_flags;
3156         int rv;
3157
3158         peer_state.i = be32_to_cpu(p->state);
3159
3160         real_peer_disk = peer_state.disk;
3161         if (peer_state.disk == D_NEGOTIATING) {
3162                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3163                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3164         }
3165
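             /* Sample the current state; if it has changed by the time we
              * re-take the lock further down, we start over at "retry". */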
3166         spin_lock_irq(&mdev->req_lock);
3167  retry:
3168         os = ns = mdev->state;
3169         spin_unlock_irq(&mdev->req_lock);
3170
3171         /* peer says his disk is uptodate, while we think it is inconsistent,
3172          * and this happens while we think we have a sync going on. */
3173         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3174             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3175                 /* If we are (becoming) SyncSource, but peer is still in sync
3176                  * preparation, ignore its uptodate-ness to avoid flapping, it
3177                  * will change to inconsistent once the peer reaches active
3178                  * syncing states.
3179                  * It may have changed syncer-paused flags, however, so we
3180                  * cannot ignore this completely. */
3181                 if (peer_state.conn > C_CONNECTED &&
3182                     peer_state.conn < C_SYNC_SOURCE)
3183                         real_peer_disk = D_INCONSISTENT;
3184
3185                 /* if peer_state changes to connected at the same time,
3186                  * it explicitly notifies us that it finished resync.
3187                  * Maybe we should finish it up, too? */
3188                 else if (os.conn >= C_SYNC_SOURCE &&
3189                          peer_state.conn == C_CONNECTED) {
3190                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3191                                 drbd_resync_finished(mdev);
3192                         return true;
3193                 }
3194         }
3195
3196         /* peer says his disk is inconsistent, while we think it is uptodate,
3197          * and this happens while the peer still thinks we have a sync going on,
3198          * but we think we are already done with the sync.
3199          * We ignore this to avoid flapping pdsk.
3200          * This should not happen, if the peer is a recent version of drbd. */
3201         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3202             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3203                 real_peer_disk = D_UP_TO_DATE;
3204
3205         if (ns.conn == C_WF_REPORT_PARAMS)
3206                 ns.conn = C_CONNECTED;
3207
3208         if (peer_state.conn == C_AHEAD)
3209                 ns.conn = C_BEHIND;
3210
3211         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3212             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3213                 int cr; /* consider resync */
3214
3215                 /* if we established a new connection */
3216                 cr  = (os.conn < C_CONNECTED);
3217                 /* if we had an established connection
3218                  * and one of the nodes newly attaches a disk */
3219                 cr |= (os.conn == C_CONNECTED &&
3220                        (peer_state.disk == D_NEGOTIATING ||
3221                         os.disk == D_NEGOTIATING));
3222                 /* if we have both been inconsistent, and the peer has been
3223                  * forced to be UpToDate with --overwrite-data */
3224                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3225                 /* if we had been plain connected, and the admin requested to
3226                  * start a sync by "invalidate" or "invalidate-remote" */
3227                 cr |= (os.conn == C_CONNECTED &&
3228                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3229                                  peer_state.conn <= C_WF_BITMAP_T));
3230
3231                 if (cr)
3232                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3233
3234                 put_ldev(mdev);
3235                 if (ns.conn == C_MASK) {
3236                         ns.conn = C_CONNECTED;
3237                         if (mdev->state.disk == D_NEGOTIATING) {
3238                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3239                         } else if (peer_state.disk == D_NEGOTIATING) {
3240                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3241                                 peer_state.disk = D_DISKLESS;
3242                                 real_peer_disk = D_DISKLESS;
3243                         } else {
3244                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3245                                         return false;
3246                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3247                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3248                                 return false;
3249                         }
3250                 }
3251         }
3252
3253         spin_lock_irq(&mdev->req_lock);
3254         if (mdev->state.i != os.i)
3255                 goto retry;
3256         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3257         ns.peer = peer_state.role;
3258         ns.pdsk = real_peer_disk;
3259         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3260         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3261                 ns.disk = mdev->new_state_tmp.disk;
3262         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3263         if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3264             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3265                 /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3266                    for temporary network outages! */
3267                 spin_unlock_irq(&mdev->req_lock);
3268                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3269                 tl_clear(mdev);
3270                 drbd_uuid_new_current(mdev);
3271                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3272                 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3273                 return false;
3274         }
3275         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3276         ns = mdev->state;
3277         spin_unlock_irq(&mdev->req_lock);
3278
3279         if (rv < SS_SUCCESS) {
3280                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3281                 return false;
3282         }
3283
3284         if (os.conn > C_WF_REPORT_PARAMS) {
3285                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3286                     peer_state.disk != D_NEGOTIATING) {
3287                         /* we want resync, peer has not yet decided to sync... */
3288                         /* Nowadays only used when forcing a node into primary role and
3289                            setting its disk to UpToDate with that */
3290                         drbd_send_uuids(mdev);
3291                         drbd_send_state(mdev);
3292                 }
3293         }
3294
3295         mdev->net_conf->want_lose = 0;
3296
3297         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3298
3299         return true;
3300 }
3301
3302 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3303 {
3304         struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3305
3306         wait_event(mdev->misc_wait,
3307                    mdev->state.conn == C_WF_SYNC_UUID ||
3308                    mdev->state.conn == C_BEHIND ||
3309                    mdev->state.conn < C_CONNECTED ||
3310                    mdev->state.disk < D_NEGOTIATING);
3311
3312         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3313
3314         /* Here the _drbd_uuid_ functions are right, current should
3315            _not_ be rotated into the history */
3316         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3317                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3318                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3319
3320                 drbd_print_uuids(mdev, "updated sync uuid");
3321                 drbd_start_resync(mdev, C_SYNC_TARGET);
3322
3323                 put_ldev(mdev);
3324         } else
3325                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3326
3327         return true;
3328 }
3329
3330 /**
3331  * receive_bitmap_plain
3332  *
3333  * Return 0 when done, 1 when another iteration is needed, and a negative error
3334  * code upon failure.
3335  */
3336 static int
3337 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3338                      unsigned long *buffer, struct bm_xfer_ctx *c)
3339 {
3340         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3341         unsigned want = num_words * sizeof(long);
3342         int err;
3343
3344         if (want != data_size) {
3345                 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3346                 return -EIO;
3347         }
3348         if (want == 0)
3349                 return 0;
3350         err = drbd_recv(mdev, buffer, want);
3351         if (err != want) {
3352                 if (err >= 0)
3353                         err = -EIO;
3354                 return err;
3355         }
3356
3357         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3358
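             /* Advance the transfer context. The last word may reach beyond
              * the last valid bit, so clamp bit_offset to bm_bits. */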
3359         c->word_offset += num_words;
3360         c->bit_offset = c->word_offset * BITS_PER_LONG;
3361         if (c->bit_offset > c->bm_bits)
3362                 c->bit_offset = c->bm_bits;
3363
3364         return 1;
3365 }
3366
3367 /**
3368  * recv_bm_rle_bits
3369  *
3370  * Return 0 when done, 1 when another iteration is needed, and a negative error
3371  * code upon failure.
3372  */
3373 static int
3374 recv_bm_rle_bits(struct drbd_conf *mdev,
3375                 struct p_compressed_bm *p,
3376                 struct bm_xfer_ctx *c)
3377 {
3378         struct bitstream bs;
3379         u64 look_ahead;
3380         u64 rl;
3381         u64 tmp;
3382         unsigned long s = c->bit_offset;
3383         unsigned long e;
3384         int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3385         int toggle = DCBP_get_start(p);
3386         int have;
3387         int bits;
3388
3389         bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3390
3391         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3392         if (bits < 0)
3393                 return -EIO;
3394
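             /* The bitmap is encoded as alternating runs of clear and set
              * bits; "toggle" says which kind the current run is, and only
              * set runs actually modify the bitmap. */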
3395         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3396                 bits = vli_decode_bits(&rl, look_ahead);
3397                 if (bits <= 0)
3398                         return -EIO;
3399
3400                 if (toggle) {
3401                         e = s + rl -1;
3402                         if (e >= c->bm_bits) {
3403                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3404                                 return -EIO;
3405                         }
3406                         _drbd_bm_set_bits(mdev, s, e);
3407                 }
3408
3409                 if (have < bits) {
3410                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3411                                 have, bits, look_ahead,
3412                                 (unsigned int)(bs.cur.b - p->code),
3413                                 (unsigned int)bs.buf_len);
3414                         return -EIO;
3415                 }
3416                 look_ahead >>= bits;
3417                 have -= bits;
3418
3419                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3420                 if (bits < 0)
3421                         return -EIO;
3422                 look_ahead |= tmp << have;
3423                 have += bits;
3424         }
3425
3426         c->bit_offset = s;
3427         bm_xfer_ctx_bit_to_word_offset(c);
3428
3429         return (s != c->bm_bits);
3430 }
3431
3432 /**
3433  * decode_bitmap_c
3434  *
3435  * Return 0 when done, 1 when another iteration is needed, and a negative error
3436  * code upon failure.
3437  */
3438 static int
3439 decode_bitmap_c(struct drbd_conf *mdev,
3440                 struct p_compressed_bm *p,
3441                 struct bm_xfer_ctx *c)
3442 {
3443         if (DCBP_get_code(p) == RLE_VLI_Bits)
3444                 return recv_bm_rle_bits(mdev, p, c);
3445
3446         /* other variants had been implemented for evaluation,
3447          * but have been dropped as this one turned out to be "best"
3448          * during all our tests. */
3449
3450         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3451         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3452         return -EIO;
3453 }
3454
3455 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3456                 const char *direction, struct bm_xfer_ctx *c)
3457 {
3458         /* what would it take to transfer it "plaintext" */
3459         unsigned plain = sizeof(struct p_header80) *
3460                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3461                 + c->bm_words * sizeof(long);
3462         unsigned total = c->bytes[0] + c->bytes[1];
3463         unsigned r;
3464
3465         /* total cannot be zero, but just in case: */
3466         if (total == 0)
3467                 return;
3468
3469         /* don't report if not compressed */
3470         if (total >= plain)
3471                 return;
3472
3473         /* total < plain. check for overflow, still */
3474         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3475                                     : (1000 * total / plain);
3476
3477         if (r > 1000)
3478                 r = 1000;
3479
3480         r = 1000 - r;
3481         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3482              "total %u; compression: %u.%u%%\n",
3483                         direction,
3484                         c->bytes[1], c->packets[1],
3485                         c->bytes[0], c->packets[0],
3486                         total, r/10, r % 10);
3487 }
3488
3489 /* Since we are processing the bitfield from lower addresses to higher,
3490    it does not matter whether we process it in 32 bit or 64 bit
3491    chunks, as long as it is little endian. (Understand it as a byte stream,
3492    beginning with the lowest byte...) If we used big endian,
3493    we would need to process it from the highest address to the lowest
3494    in order to be agnostic to the 32 vs 64 bit issue.
3495
3496    returns 0 on failure, 1 if we successfully received it. */
3497 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3498 {
3499         struct bm_xfer_ctx c;
3500         void *buffer;
3501         int err;
3502         int ok = false;
3503         struct p_header80 *h = &mdev->data.rbuf.header.h80;
3504
3505         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3506         /* you are supposed to send additional out-of-sync information
3507          * if you actually set bits during this phase */
3508
3509         /* maybe we should use some per thread scratch page,
3510          * and allocate that during initial device creation? */
3511         buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3512         if (!buffer) {
3513                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3514                 goto out;
3515         }
3516
3517         c = (struct bm_xfer_ctx) {
3518                 .bm_bits = drbd_bm_bits(mdev),
3519                 .bm_words = drbd_bm_words(mdev),
3520         };
3521
3522         for (;;) {
3523                 if (cmd == P_BITMAP) {
3524                         err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3525                 } else if (cmd == P_COMPRESSED_BITMAP) {
3526                         /* MAYBE: sanity check that we speak proto >= 90,
3527                          * and the feature is enabled! */
3528                         struct p_compressed_bm *p;
3529
3530                         if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3531                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3532                                 goto out;
3533                         }
3534                         /* use the page buff */
3535                         p = buffer;
3536                         memcpy(p, h, sizeof(*h));
3537                         if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3538                                 goto out;
3539                         if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3540                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3541                                 goto out;
3542                         }
3543                         err = decode_bitmap_c(mdev, p, &c);
3544                 } else {
3545                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3546                         goto out;
3547                 }
3548
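                     /* index 1 accounts plain bitmap packets, index 0 compressed ones */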
3549                 c.packets[cmd == P_BITMAP]++;
3550                 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3551
3552                 if (err <= 0) {
3553                         if (err < 0)
3554                                 goto out;
3555                         break;
3556                 }
3557                 if (!drbd_recv_header(mdev, &cmd, &data_size))
3558                         goto out;
3559         }
3560
3561         INFO_bm_xfer_stats(mdev, "receive", &c);
3562
3563         if (mdev->state.conn == C_WF_BITMAP_T) {
3564                 enum drbd_state_rv rv;
3565
3566                 ok = !drbd_send_bitmap(mdev);
3567                 if (!ok)
3568                         goto out;
3569                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3570                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3571                 D_ASSERT(rv == SS_SUCCESS);
3572         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3573                 /* admin may have requested C_DISCONNECTING,
3574                  * other threads may have noticed network errors */
3575                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3576                     drbd_conn_str(mdev->state.conn));
3577         }
3578
3579         ok = true;
3580  out:
3581         drbd_bm_unlock(mdev);
3582         if (ok && mdev->state.conn == C_WF_BITMAP_S)
3583                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3584         free_page((unsigned long) buffer);
3585         return ok;
3586 }
3587
3588 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3589 {
3590         /* TODO zero copy sink :) */
3591         static char sink[128];
3592         int size, want, r;
3593
3594         dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3595                  cmd, data_size);
3596
3597         size = data_size;
3598         while (size > 0) {
3599                 want = min_t(int, size, sizeof(sink));
3600                 r = drbd_recv(mdev, sink, want);
3601                 ERR_IF(r <= 0) break;
3602                 size -= r;
3603         }
3604         return size == 0;
3605 }
3606
3607 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3608 {
3609         /* Make sure we've acked all the TCP data associated
3610          * with the data requests being unplugged */
3611         drbd_tcp_quickack(mdev->data.socket);
3612
3613         return true;
3614 }
3615
3616 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3617 {
3618         struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3619
3620         switch (mdev->state.conn) {
3621         case C_WF_SYNC_UUID:
3622         case C_WF_BITMAP_T:
3623         case C_BEHIND:
3624                         break;
3625         default:
3626                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3627                                 drbd_conn_str(mdev->state.conn));
3628         }
3629
3630         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3631
3632         return true;
3633 }
3634
3635 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3636
3637 struct data_cmd {
3638         int expect_payload;
3639         size_t pkt_size;
3640         drbd_cmd_handler_f function;
3641 };
3642
3643 static struct data_cmd drbd_cmd_handler[] = {
3644         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3645         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3646         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
3647         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
3648         [P_BITMAP]          = { 1, sizeof(struct p_header80), receive_bitmap },
3649         [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap },
3650         [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3651         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3652         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3653         [P_SYNC_PARAM]      = { 1, sizeof(struct p_header80), receive_SyncParam },
3654         [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3655         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3656         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
3657         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
3658         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
3659         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3660         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3661         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3662         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3663         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3664         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3665         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3666         /* anything missing from this table is in
3667          * the asender_tbl, see get_asender_cmd */
3668         [P_MAX_CMD]         = { 0, 0, NULL },
3669 };
3670
3671 /* All handler functions that expect a sub-header get that sub-header in
3672    mdev->data.rbuf.header.head.payload.
3673
3674    Usually the callback can also find the usual p_header in
3675    mdev->data.rbuf.header.head, but it may not rely on that, since there
3676    is also p_header95. */
3677
3678 static void drbdd(struct drbd_conf *mdev)
3679 {
3680         union p_header *header = &mdev->data.rbuf.header;
3681         unsigned int packet_size;
3682         enum drbd_packets cmd;
3683         size_t shs; /* sub header size */
3684         int rv;
3685
3686         while (get_t_state(&mdev->receiver) == Running) {
3687                 drbd_thread_current_set_cpu(mdev);
3688                 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3689                         goto err_out;
3690
3691                 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3692                         dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3693                         goto err_out;
3694                 }
3695
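                 /* length of the command-specific sub-header that still
                  * needs to be read from the socket */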
3696                 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3697                 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3698                         dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3699                         goto err_out;
3700                 }
3701
3702                 if (shs) {
3703                         rv = drbd_recv(mdev, &header->h80.payload, shs);
3704                         if (unlikely(rv != shs)) {
3705                                 if (!signal_pending(current))
3706                                         dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3707                                 goto err_out;
3708                         }
3709                 }
3710
3711                 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3712
3713                 if (unlikely(!rv)) {
3714                         dev_err(DEV, "error receiving %s, l: %d!\n",
3715                             cmdname(cmd), packet_size);
3716                         goto err_out;
3717                 }
3718         }
3719
3720         if (0) {
3721         err_out:
3722                 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3723         }
3724         /* If we leave here, we probably want to update at least the
3725          * "Connected" indicator on stable storage. Do so explicitly here. */
3726         drbd_md_sync(mdev);
3727 }
3728
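     /* Queue a barrier work item and wait until the worker has completed it;
      * by then, everything queued before the barrier has been processed. */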
3729 void drbd_flush_workqueue(struct drbd_conf *mdev)
3730 {
3731         struct drbd_wq_barrier barr;
3732
3733         barr.w.cb = w_prev_work_done;
3734         init_completion(&barr.done);
3735         drbd_queue_work(&mdev->data.work, &barr.w);
3736         wait_for_completion(&barr.done);
3737 }
3738
3739 void drbd_free_tl_hash(struct drbd_conf *mdev)
3740 {
3741         struct hlist_head *h;
3742
3743         spin_lock_irq(&mdev->req_lock);
3744
3745         if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3746                 spin_unlock_irq(&mdev->req_lock);
3747                 return;
3748         }
3749         /* paranoia code */
3750         for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3751                 if (h->first)
3752                         dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3753                                 (int)(h - mdev->ee_hash), h->first);
3754         kfree(mdev->ee_hash);
3755         mdev->ee_hash = NULL;
3756         mdev->ee_hash_s = 0;
3757
3758         /* paranoia code */
3759         for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3760                 if (h->first)
3761                         dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3762                                 (int)(h - mdev->tl_hash), h->first);
3763         kfree(mdev->tl_hash);
3764         mdev->tl_hash = NULL;
3765         mdev->tl_hash_s = 0;
3766         spin_unlock_irq(&mdev->req_lock);
3767 }
3768
3769 static void drbd_disconnect(struct drbd_conf *mdev)
3770 {
3771         enum drbd_fencing_p fp;
3772         union drbd_state os, ns;
3773         int rv = SS_UNKNOWN_ERROR;
3774         unsigned int i;
3775
3776         if (mdev->state.conn == C_STANDALONE)
3777                 return;
3778
3779         /* asender does not clean up anything. it must not interfere, either */
3780         drbd_thread_stop(&mdev->asender);
3781         drbd_free_sock(mdev);
3782
3783         /* wait for current activity to cease. */
3784         spin_lock_irq(&mdev->req_lock);
3785         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3786         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3787         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3788         spin_unlock_irq(&mdev->req_lock);
3789
3790         /* We do not have data structures that would allow us to
3791          * get the rs_pending_cnt down to 0 again.
3792          *  * On C_SYNC_TARGET we do not have any data structures describing
3793          *    the pending RSDataRequest's we have sent.
3794          *  * On C_SYNC_SOURCE there is no data structure that tracks
3795          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3796          *  And no, it is not the sum of the reference counts in the
3797          *  resync_LRU. The resync_LRU tracks the whole operation including
3798          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3799          *  on the fly. */
3800         drbd_rs_cancel_all(mdev);
3801         mdev->rs_total = 0;
3802         mdev->rs_failed = 0;
3803         atomic_set(&mdev->rs_pending_cnt, 0);
3804         wake_up(&mdev->misc_wait);
3805
3806         del_timer(&mdev->request_timer);
3807
3808         /* make sure syncer is stopped and w_resume_next_sg queued */
3809         del_timer_sync(&mdev->resync_timer);
3810         resync_timer_fn((unsigned long)mdev);
3811
3812         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3813          * w_make_resync_request etc. which may still be on the worker queue
3814          * to be "canceled" */
3815         drbd_flush_workqueue(mdev);
3816
3817         /* This also does reclaim_net_ee().  If we do this too early, we might
3818          * miss some resync ee and pages. */
3819         drbd_process_done_ee(mdev);
3820
3821         kfree(mdev->p_uuid);
3822         mdev->p_uuid = NULL;
3823
3824         if (!is_susp(mdev->state))
3825                 tl_clear(mdev);
3826
3827         dev_info(DEV, "Connection closed\n");
3828
3829         drbd_md_sync(mdev);
3830
3831         fp = FP_DONT_CARE;
3832         if (get_ldev(mdev)) {
3833                 fp = mdev->ldev->dc.fencing;
3834                 put_ldev(mdev);
3835         }
3836
3837         if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3838                 drbd_try_outdate_peer_async(mdev);
3839
3840         spin_lock_irq(&mdev->req_lock);
3841         os = mdev->state;
3842         if (os.conn >= C_UNCONNECTED) {
3843                 /* Do not restart in case we are C_DISCONNECTING */
3844                 ns = os;
3845                 ns.conn = C_UNCONNECTED;
3846                 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3847         }
3848         spin_unlock_irq(&mdev->req_lock);
3849
3850         if (os.conn == C_DISCONNECTING) {
3851                 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3852
3853                 crypto_free_hash(mdev->cram_hmac_tfm);
3854                 mdev->cram_hmac_tfm = NULL;
3855
3856                 kfree(mdev->net_conf);
3857                 mdev->net_conf = NULL;
3858                 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3859         }
3860
3861         /* serialize with bitmap writeout triggered by the state change,
3862          * if any. */
3863         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3864
3865         /* tcp_close and release of sendpage pages can be deferred.  I don't
3866          * want to use SO_LINGER, because apparently it can be deferred for
3867          * more than 20 seconds (longest time I checked).
3868          *
3869          * Actually we don't care for exactly when the network stack does its
3870          * put_page(), but release our reference on these pages right here.
3871          */
3872         i = drbd_release_ee(mdev, &mdev->net_ee);
3873         if (i)
3874                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3875         i = atomic_read(&mdev->pp_in_use_by_net);
3876         if (i)
3877                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3878         i = atomic_read(&mdev->pp_in_use);
3879         if (i)
3880                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3881
3882         D_ASSERT(list_empty(&mdev->read_ee));
3883         D_ASSERT(list_empty(&mdev->active_ee));
3884         D_ASSERT(list_empty(&mdev->sync_ee));
3885         D_ASSERT(list_empty(&mdev->done_ee));
3886
3887         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3888         atomic_set(&mdev->current_epoch->epoch_size, 0);
3889         D_ASSERT(list_empty(&mdev->current_epoch->list));
3890 }
3891
3892 /*
3893  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3894  * we can agree on is stored in agreed_pro_version.
3895  *
3896  * feature flags and the reserved array should be enough room for future
3897  * enhancements of the handshake protocol, and possible plugins...
3898  *
3899  * for now, they are expected to be zero, but ignored.
3900  */
3901 static int drbd_send_handshake(struct drbd_conf *mdev)
3902 {
3903         /* ASSERT current == mdev->receiver ... */
3904         struct p_handshake *p = &mdev->data.sbuf.handshake;
3905         int ok;
3906
3907         if (mutex_lock_interruptible(&mdev->data.mutex)) {
3908                 dev_err(DEV, "interrupted during initial handshake\n");
3909                 return 0; /* interrupted. not ok. */
3910         }
3911
3912         if (mdev->data.socket == NULL) {
3913                 mutex_unlock(&mdev->data.mutex);
3914                 return 0;
3915         }
3916
3917         memset(p, 0, sizeof(*p));
3918         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3919         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3920         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3921                             (struct p_header80 *)p, sizeof(*p), 0);
3922         mutex_unlock(&mdev->data.mutex);
3923         return ok;
3924 }
3925
3926 /*
3927  * return values:
3928  *   1 yes, we have a valid connection
3929  *   0 oops, did not work out, please try again
3930  *  -1 peer talks different language,
3931  *     no point in trying again, please go standalone.
3932  */
3933 static int drbd_do_handshake(struct drbd_conf *mdev)
3934 {
3935         /* ASSERT current == mdev->receiver ... */
3936         struct p_handshake *p = &mdev->data.rbuf.handshake;
3937         const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3938         unsigned int length;
3939         enum drbd_packets cmd;
3940         int rv;
3941
3942         rv = drbd_send_handshake(mdev);
3943         if (!rv)
3944                 return 0;
3945
3946         rv = drbd_recv_header(mdev, &cmd, &length);
3947         if (!rv)
3948                 return 0;
3949
3950         if (cmd != P_HAND_SHAKE) {
3951                 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3952                      cmdname(cmd), cmd);
3953                 return -1;
3954         }
3955
3956         if (length != expect) {
3957                 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3958                      expect, length);
3959                 return -1;
3960         }
3961
3962         rv = drbd_recv(mdev, &p->head.payload, expect);
3963
3964         if (rv != expect) {
3965                 if (!signal_pending(current))
3966                         dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
3967                 return 0;
3968         }
3969
3970         p->protocol_min = be32_to_cpu(p->protocol_min);
3971         p->protocol_max = be32_to_cpu(p->protocol_max);
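             /* presumably a peer that predates the protocol_max field;
              * treat protocol_min as the only version it speaks */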
3972         if (p->protocol_max == 0)
3973                 p->protocol_max = p->protocol_min;
3974
3975         if (PRO_VERSION_MAX < p->protocol_min ||
3976             PRO_VERSION_MIN > p->protocol_max)
3977                 goto incompat;
3978
3979         mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3980
3981         dev_info(DEV, "Handshake successful: "
3982              "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3983
3984         return 1;
3985
3986  incompat:
3987         dev_err(DEV, "incompatible DRBD dialects: "
3988             "I support %d-%d, peer supports %d-%d\n",
3989             PRO_VERSION_MIN, PRO_VERSION_MAX,
3990             p->protocol_min, p->protocol_max);
3991         return -1;
3992 }
3993
3994 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3995 static int drbd_do_auth(struct drbd_conf *mdev)
3996 {
3997         dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
3998         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3999         return -1;
4000 }
4001 #else
4002 #define CHALLENGE_LEN 64
4003
4004 /* Return value:
4005         1 - auth succeeded,
4006         0 - failed, try again (network error),
4007         -1 - auth failed, don't try again.
4008 */
4009
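     /* Challenge-response authentication using the shared secret: each side
      * sends a random challenge, computes an HMAC over the challenge it
      * received, and sends that digest back as its response. */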
4010 static int drbd_do_auth(struct drbd_conf *mdev)
4011 {
4012         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4013         struct scatterlist sg;
4014         char *response = NULL;
4015         char *right_response = NULL;
4016         char *peers_ch = NULL;
4017         unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4018         unsigned int resp_size;
4019         struct hash_desc desc;
4020         enum drbd_packets cmd;
4021         unsigned int length;
4022         int rv;
4023
4024         desc.tfm = mdev->cram_hmac_tfm;
4025         desc.flags = 0;
4026
4027         rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4028                                 (u8 *)mdev->net_conf->shared_secret, key_len);
4029         if (rv) {
4030                 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4031                 rv = -1;
4032                 goto fail;
4033         }
4034
4035         get_random_bytes(my_challenge, CHALLENGE_LEN);
4036
4037         rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4038         if (!rv)
4039                 goto fail;
4040
4041         rv = drbd_recv_header(mdev, &cmd, &length);
4042         if (!rv)
4043                 goto fail;
4044
4045         if (cmd != P_AUTH_CHALLENGE) {
4046                 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4047                     cmdname(cmd), cmd);
4048                 rv = 0;
4049                 goto fail;
4050         }
4051
4052         if (length > CHALLENGE_LEN * 2) {
4053                 dev_err(DEV, "AuthChallenge payload too big (%u bytes)\n", length);
4054                 rv = -1;
4055                 goto fail;
4056         }
4057
4058         peers_ch = kmalloc(length, GFP_NOIO);
4059         if (peers_ch == NULL) {
4060                 dev_err(DEV, "kmalloc of peers_ch failed\n");
4061                 rv = -1;
4062                 goto fail;
4063         }
4064
4065         rv = drbd_recv(mdev, peers_ch, length);
4066
4067         if (rv != length) {
4068                 if (!signal_pending(current))
4069                         dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4070                 rv = 0;
4071                 goto fail;
4072         }
4073
4074         resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4075         response = kmalloc(resp_size, GFP_NOIO);
4076         if (response == NULL) {
4077                 dev_err(DEV, "kmalloc of response failed\n");
4078                 rv = -1;
4079                 goto fail;
4080         }
4081
4082         sg_init_table(&sg, 1);
4083         sg_set_buf(&sg, peers_ch, length);
4084
4085         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4086         if (rv) {
4087                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4088                 rv = -1;
4089                 goto fail;
4090         }
4091
4092         rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4093         if (!rv)
4094                 goto fail;
4095
4096         rv = drbd_recv_header(mdev, &cmd, &length);
4097         if (!rv)
4098                 goto fail;
4099
4100         if (cmd != P_AUTH_RESPONSE) {
4101                 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4102                         cmdname(cmd), cmd);
4103                 rv = 0;
4104                 goto fail;
4105         }
4106
4107         if (length != resp_size) {
4108                 dev_err(DEV, "AuthResponse payload: expected %u bytes, got %u\n", resp_size, length);
4109                 rv = 0;
4110                 goto fail;
4111         }
4112
4113         rv = drbd_recv(mdev, response, resp_size);
4114
4115         if (rv != resp_size) {
4116                 if (!signal_pending(current))
4117                         dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4118                 rv = 0;
4119                 goto fail;
4120         }
4121
4122         right_response = kmalloc(resp_size, GFP_NOIO);
4123         if (right_response == NULL) {
4124                 dev_err(DEV, "kmalloc of right_response failed\n");
4125                 rv = -1;
4126                 goto fail;
4127         }
4128
4129         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4130
4131         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4132         if (rv) {
4133                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4134                 rv = -1;
4135                 goto fail;
4136         }
4137
4138         rv = !memcmp(response, right_response, resp_size);
4139
4140         if (rv)
4141                 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4142                      resp_size, mdev->net_conf->cram_hmac_alg);
4143         else
4144                 rv = -1;
4145
4146  fail:
4147         kfree(peers_ch);
4148         kfree(response);
4149         kfree(right_response);
4150
4151         return rv;
4152 }
4153 #endif
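/*
 * For illustration, the same verification step in userspace terms: a sketch
 * assuming OpenSSL's HMAC() (the kernel code above uses the crypto_hash API
 * instead), with SHA-1 standing in for the configurable cram-hmac-alg.
 * Note that, like the memcmp() above, this comparison is not constant-time.
 */
#if 0	/* userspace sketch, not part of the kernel build */
#include <openssl/evp.h>
#include <openssl/hmac.h>
#include <string.h>

/* Return 1 if 'response' is HMAC(secret, challenge), 0 otherwise. */
static int verify_challenge_response(const void *secret, int secret_len,
				     const unsigned char *challenge, size_t ch_len,
				     const unsigned char *response, unsigned int resp_len)
{
	unsigned char expected[EVP_MAX_MD_SIZE];
	unsigned int expected_len = 0;

	if (!HMAC(EVP_sha1(), secret, secret_len,
		  challenge, ch_len, expected, &expected_len))
		return 0;
	return expected_len == resp_len &&
	       memcmp(expected, response, resp_len) == 0;
}
#endif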
4154
4155 int drbdd_init(struct drbd_thread *thi)
4156 {
4157         struct drbd_conf *mdev = thi->mdev;
4158         unsigned int minor = mdev_to_minor(mdev);
4159         int h;
4160
4161         sprintf(current->comm, "drbd%d_receiver", minor);
4162
4163         dev_info(DEV, "receiver (re)started\n");
4164
4165         do {
4166                 h = drbd_connect(mdev);
4167                 if (h == 0) {
4168                         drbd_disconnect(mdev);
4169                         schedule_timeout_interruptible(HZ);
4170                 }
4171                 if (h == -1) {
4172                         dev_warn(DEV, "Discarding network configuration.\n");
4173                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4174                 }
4175         } while (h == 0);
4176
4177         if (h > 0) {
4178                 if (get_net_conf(mdev)) {
4179                         drbdd(mdev);
4180                         put_net_conf(mdev);
4181                 }
4182         }
4183
4184         drbd_disconnect(mdev);
4185
4186         dev_info(DEV, "receiver terminated\n");
4187         return 0;
4188 }
4189
4190 /* ********* acknowledge sender ******** */
4191
4192 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4193 {
4194         struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4195
4196         int retcode = be32_to_cpu(p->retcode);
4197
4198         if (retcode >= SS_SUCCESS) {
4199                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4200         } else {
4201                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4202                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4203                     drbd_set_st_err_str(retcode), retcode);
4204         }
4205         wake_up(&mdev->state_wait);
4206
4207         return true;
4208 }
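/*
 * The requesting side of this rendezvous, as a sketch (the helper name is
 * hypothetical): after sending a state change request, the sender clears
 * both flags, then sleeps on state_wait until got_RqSReply() above sets
 * one of them.
 */
#if 0	/* illustrative only, not compiled */
static int wait_for_state_reply(struct drbd_conf *mdev)
{
	wait_event(mdev->state_wait,
		   test_bit(CL_ST_CHG_SUCCESS, &mdev->flags) ||
		   test_bit(CL_ST_CHG_FAIL, &mdev->flags));
	return test_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
}
#endif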
4209
4210 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4211 {
4212         return drbd_send_ping_ack(mdev);
4213
4214 }
4215
4216 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4217 {
4218         /* restore idle timeout */
4219         mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4220         if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4221                 wake_up(&mdev->misc_wait);
4222
4223         return true;
4224 }
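/* Note the pairing with the asender loop below: when a ping is sent,
 * sk_rcvtimeo is shortened to ping_timeo so a missing PingAck surfaces
 * as -EAGAIN on the receive; getting the PingAck here restores the long
 * ping_int idle timeout. */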
4225
4226 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4227 {
4228         struct p_block_ack *p = (struct p_block_ack *)h;
4229         sector_t sector = be64_to_cpu(p->sector);
4230         int blksize = be32_to_cpu(p->blksize);
4231
4232         D_ASSERT(mdev->agreed_pro_version >= 89);
4233
4234         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4235
4236         if (get_ldev(mdev)) {
4237                 drbd_rs_complete_io(mdev, sector);
4238                 drbd_set_in_sync(mdev, sector, blksize);
4239                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4240                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4241                 put_ldev(mdev);
4242         }
4243         dec_rs_pending(mdev);
4244         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4245
4246         return true;
4247 }
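/* Worked example of the unit conversions above (assuming the usual
 * BM_BLOCK_SHIFT of 12, i.e. 4 KiB bitmap granularity): an ACK with
 * blksize == 4096 adds 4096 >> 12 == 1 to rs_same_csum, and
 * 4096 >> 9 == 8 to rs_sect_in, which counts 512-byte sectors. */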
4248
4249 /* when we receive the ACK for a write request,
4250  * verify that we actually know about it */
4251 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4252         u64 id, sector_t sector)
4253 {
4254         struct hlist_head *slot = tl_hash_slot(mdev, sector);
4255         struct hlist_node *n;
4256         struct drbd_request *req;
4257
4258         hlist_for_each_entry(req, n, slot, collision) {
4259                 if ((unsigned long)req == (unsigned long)id) {
4260                         if (req->sector != sector) {
4261                                 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4262                                     "wrong sector (%llus versus %llus)\n", req,
4263                                     (unsigned long long)req->sector,
4264                                     (unsigned long long)sector);
4265                                 break;
4266                         }
4267                         return req;
4268                 }
4269         }
4270         return NULL;
4271 }
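/*
 * How the block_id round-trips (sketch): on the sending side the request
 * pointer itself is used as the wire block_id, roughly
 *
 *	p.block_id = (unsigned long)req;	(see drbd_send_dblock())
 *
 * so the lookup above can match on pointer value alone; the additional
 * sector comparison guards against a stale or reused pointer value.
 */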
4272
4273 typedef struct drbd_request *(req_validator_fn)
4274         (struct drbd_conf *mdev, u64 id, sector_t sector);
4275
4276 static int validate_req_change_req_state(struct drbd_conf *mdev,
4277         u64 id, sector_t sector, req_validator_fn validator,
4278         const char *func, enum drbd_req_event what)
4279 {
4280         struct drbd_request *req;
4281         struct bio_and_error m;
4282
4283         spin_lock_irq(&mdev->req_lock);
4284         req = validator(mdev, id, sector);
4285         if (unlikely(!req)) {
4286                 spin_unlock_irq(&mdev->req_lock);
4287
4288                 dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4289                         (void *)(unsigned long)id, (unsigned long long)sector);
4290                 return false;
4291         }
4292         __req_mod(req, what, &m);
4293         spin_unlock_irq(&mdev->req_lock);
4294
4295         if (m.bio)
4296                 complete_master_bio(mdev, &m);
4297         return true;
4298 }
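/* Design note: __req_mod() must run under req_lock, but it only records
 * a master bio to complete in 'm'; complete_master_bio() is called after
 * the lock is dropped, since completing a bio may do work that is not
 * safe under a spinlock. */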
4299
4300 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4301 {
4302         struct p_block_ack *p = (struct p_block_ack *)h;
4303         sector_t sector = be64_to_cpu(p->sector);
4304         int blksize = be32_to_cpu(p->blksize);
4305         enum drbd_req_event what;
4306
4307         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4308
4309         if (is_syncer_block_id(p->block_id)) {
4310                 drbd_set_in_sync(mdev, sector, blksize);
4311                 dec_rs_pending(mdev);
4312                 return true;
4313         }
4314         switch (be16_to_cpu(h->command)) {
4315         case P_RS_WRITE_ACK:
4316                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4317                 what = write_acked_by_peer_and_sis;
4318                 break;
4319         case P_WRITE_ACK:
4320                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4321                 what = write_acked_by_peer;
4322                 break;
4323         case P_RECV_ACK:
4324                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4325                 what = recv_acked_by_peer;
4326                 break;
4327         case P_DISCARD_ACK:
4328                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4329                 what = conflict_discarded_by_peer;
4330                 break;
4331         default:
4332                 D_ASSERT(0);
4333                 return false;
4334         }
4335
4336         return validate_req_change_req_state(mdev, p->block_id, sector,
4337                 _ack_id_to_req, __func__, what);
4338 }
4339
4340 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4341 {
4342         struct p_block_ack *p = (struct p_block_ack *)h;
4343         sector_t sector = be64_to_cpu(p->sector);
4344         int size = be32_to_cpu(p->blksize);
4345         struct drbd_request *req;
4346         struct bio_and_error m;
4347
4348         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4349
4350         if (is_syncer_block_id(p->block_id)) {
4351                 dec_rs_pending(mdev);
4352                 drbd_rs_failed_io(mdev, sector, size);
4353                 return true;
4354         }
4355
4356         spin_lock_irq(&mdev->req_lock);
4357         req = _ack_id_to_req(mdev, p->block_id, sector);
4358         if (!req) {
4359                 spin_unlock_irq(&mdev->req_lock);
4360                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4361                     mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4362                         /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4363                            The master bio might already be completed, therefore the
4364                            request is no longer in the collision hash.
4365                            => Do not try to validate block_id as request. */
4366                         /* In Protocol B we might already have got a P_RECV_ACK
4367                            but then get a P_NEG_ACK afterwards. */
4368                         drbd_set_out_of_sync(mdev, sector, size);
4369                         return true;
4370                 } else {
4371                         dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4372                                 (void *)(unsigned long)p->block_id, (unsigned long long)sector);
4373                         return false;
4374                 }
4375         }
4376         __req_mod(req, neg_acked, &m);
4377         spin_unlock_irq(&mdev->req_lock);
4378
4379         if (m.bio)
4380                 complete_master_bio(mdev, &m);
4381         return true;
4382 }
4383
4384 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4385 {
4386         struct p_block_ack *p = (struct p_block_ack *)h;
4387         sector_t sector = be64_to_cpu(p->sector);
4388
4389         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4390         dev_err(DEV, "Got NegDReply; sector %llus, len %u; failing the original request.\n",
4391             (unsigned long long)sector, be32_to_cpu(p->blksize));
4392
4393         return validate_req_change_req_state(mdev, p->block_id, sector,
4394                 _ar_id_to_req, __func__, neg_acked);
4395 }
4396
4397 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4398 {
4399         sector_t sector;
4400         int size;
4401         struct p_block_ack *p = (struct p_block_ack *)h;
4402
4403         sector = be64_to_cpu(p->sector);
4404         size = be32_to_cpu(p->blksize);
4405
4406         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4407
4408         dec_rs_pending(mdev);
4409
4410         if (get_ldev_if_state(mdev, D_FAILED)) {
4411                 drbd_rs_complete_io(mdev, sector);
4412                 switch (be16_to_cpu(h->command)) {
4413                 case P_NEG_RS_DREPLY:
4414                         drbd_rs_failed_io(mdev, sector, size);
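			/* fall through */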
4415                 case P_RS_CANCEL:
4416                         break;
4417                 default:
4418                         D_ASSERT(0);
4419                         put_ldev(mdev);
4420                         return false;
4421                 }
4422                 put_ldev(mdev);
4423         }
4424
4425         return true;
4426 }
4427
4428 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4429 {
4430         struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4431
4432         tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4433
4434         if (mdev->state.conn == C_AHEAD &&
4435             atomic_read(&mdev->ap_in_flight) == 0 &&
4436             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4437                 mdev->start_resync_timer.expires = jiffies + HZ;
4438                 add_timer(&mdev->start_resync_timer);
4439         }
4440
4441         return true;
4442 }
4443
4444 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4445 {
4446         struct p_block_ack *p = (struct p_block_ack *)h;
4447         struct drbd_work *w;
4448         sector_t sector;
4449         int size;
4450
4451         sector = be64_to_cpu(p->sector);
4452         size = be32_to_cpu(p->blksize);
4453
4454         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4455
4456         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4457                 drbd_ov_oos_found(mdev, sector, size);
4458         else
4459                 ov_oos_print(mdev);
4460
4461         if (!get_ldev(mdev))
4462                 return true;
4463
4464         drbd_rs_complete_io(mdev, sector);
4465         dec_rs_pending(mdev);
4466
4467         --mdev->ov_left;
4468
4469         /* let's advance progress step marks only for every other megabyte */
4470         if ((mdev->ov_left & 0x200) == 0x200)
4471                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4472
4473         if (mdev->ov_left == 0) {
4474                 w = kmalloc(sizeof(*w), GFP_NOIO);
4475                 if (w) {
4476                         w->cb = w_ov_finished;
4477                         drbd_queue_work_front(&mdev->data.work, w);
4478                 } else {
4479                         dev_err(DEV, "kmalloc(w) failed.\n");
4480                         ov_oos_print(mdev);
4481                         drbd_resync_finished(mdev);
4482                 }
4483         }
4484         put_ldev(mdev);
4485         return true;
4486 }
4487
4488 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4489 {
4490         return true;
4491 }
4492
4493 struct asender_cmd {
4494         size_t pkt_size;
4495         int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4496 };
4497
4498 static struct asender_cmd *get_asender_cmd(int cmd)
4499 {
4500         static struct asender_cmd asender_tbl[] = {
4501                 /* anything missing from this table is in
4502                  * the drbd_cmd_handler (drbd_default_handler) table,
4503                  * see the beginning of drbdd() */
4504         [P_PING]            = { sizeof(struct p_header80), got_Ping },
4505         [P_PING_ACK]        = { sizeof(struct p_header80), got_PingAck },
4506         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4507         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4508         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4509         [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4510         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4511         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4512         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4513         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4514         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4515         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4516         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4517         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4518         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
4519         [P_MAX_CMD]         = { 0, NULL },
4520         };
4521         if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4522                 return NULL;
4523         return &asender_tbl[cmd];
4524 }
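/* Note on the bounds check above: designated initializers leave every
 * unlisted slot zeroed, so unknown-but-in-range commands fail the
 * ".process == NULL" test, while the [P_MAX_CMD] sentinel sizes the
 * array so that indexing with cmd == P_MAX_CMD stays in bounds. */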
4525
4526 int drbd_asender(struct drbd_thread *thi)
4527 {
4528         struct drbd_conf *mdev = thi->mdev;
4529         struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4530         struct asender_cmd *cmd = NULL;
4531
4532         int rv, len;
4533         void *buf    = h;
4534         int received = 0;
4535         int expect   = sizeof(struct p_header80);
4536         int empty;
4537         int ping_timeout_active = 0;
4538
4539         sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4540
4541         /* make this a realtime task, more important than all other tasks */
4542         sched_setscheduler(current, SCHED_RR, &(struct sched_param){ .sched_priority = 2 });
4543
4544         while (get_t_state(thi) == Running) {
4545                 drbd_thread_current_set_cpu(mdev);
4546                 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4547                         ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4548                         mdev->meta.socket->sk->sk_rcvtimeo =
4549                                 mdev->net_conf->ping_timeo*HZ/10;
4550                         ping_timeout_active = 1;
4551                 }
4552
4553                 /* conditionally cork;
4554                  * it may hurt latency if we cork without much to send */
4555                 if (!mdev->net_conf->no_cork &&
4556                         3 < atomic_read(&mdev->unacked_cnt))
4557                         drbd_tcp_cork(mdev->meta.socket);
4558                 while (1) {
4559                         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4560                         flush_signals(current);
4561                         if (!drbd_process_done_ee(mdev))
4562                                 goto reconnect;
4563                         /* to avoid race with newly queued ACKs */
4564                         set_bit(SIGNAL_ASENDER, &mdev->flags);
4565                         spin_lock_irq(&mdev->req_lock);
4566                         empty = list_empty(&mdev->done_ee);
4567                         spin_unlock_irq(&mdev->req_lock);
4568                         /* new ack may have been queued right here,
4569                          * but then there is also a signal pending,
4570                          * and we start over... */
4571                         if (empty)
4572                                 break;
4573                 }
4574                 /* but unconditionally uncork unless disabled */
4575                 if (!mdev->net_conf->no_cork)
4576                         drbd_tcp_uncork(mdev->meta.socket);
4577
4578                 /* short circuit, recv_msg would return EINTR anyway. */
4579                 if (signal_pending(current))
4580                         continue;
4581
4582                 rv = drbd_recv_short(mdev, mdev->meta.socket,
4583                                      buf, expect-received, 0);
4584                 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4585
4586                 flush_signals(current);
4587
4588                 /* Note:
4589                  * -EINTR        (on meta) we got a signal
4590                  * -EAGAIN       (on meta) rcvtimeo expired
4591                  * -ECONNRESET   other side closed the connection
4592                  * -ERESTARTSYS  (on data) we got a signal
4593                  * rv <  0       other than above: unexpected error!
4594                  * rv == expected: full header or command
4595                  * rv <  expected: "woken" by signal during receive
4596                  * rv == 0       : "connection shut down by peer"
4597                  */
4598                 if (likely(rv > 0)) {
4599                         received += rv;
4600                         buf      += rv;
4601                 } else if (rv == 0) {
4602                         dev_err(DEV, "meta connection shut down by peer.\n");
4603                         goto reconnect;
4604                 } else if (rv == -EAGAIN) {
4605                         if (ping_timeout_active) {
4606                                 dev_err(DEV, "PingAck did not arrive in time.\n");
4607                                 goto reconnect;
4608                         }
4609                         set_bit(SEND_PING, &mdev->flags);
4610                         continue;
4611                 } else if (rv == -EINTR) {
4612                         continue;
4613                 } else {
4614                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4615                         goto reconnect;
4616                 }
4617
4618                 if (received == expect && cmd == NULL) {
4619                         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4620                                 dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4621                                     be32_to_cpu(h->magic),
4622                                     be16_to_cpu(h->command),
4623                                     be16_to_cpu(h->length));
4624                                 goto reconnect;
4625                         }
4626                         cmd = get_asender_cmd(be16_to_cpu(h->command));
4627                         len = be16_to_cpu(h->length);
4628                         if (unlikely(cmd == NULL)) {
4629                                 dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4630                                     be32_to_cpu(h->magic),
4631                                     be16_to_cpu(h->command),
4632                                     be16_to_cpu(h->length));
4633                                 goto disconnect;
4634                         }
4635                         expect = cmd->pkt_size;
4636                         ERR_IF(len != expect-sizeof(struct p_header80))
4637                                 goto reconnect;
4638                 }
4639                 if (received == expect) {
4640                         D_ASSERT(cmd != NULL);
4641                         if (!cmd->process(mdev, h))
4642                                 goto reconnect;
4643
4644                         /* the idle_timeout (ping-int)
4645                          * has been restored in got_PingAck() */
4646                         if (cmd == get_asender_cmd(P_PING_ACK))
4647                                 ping_timeout_active = 0;
4648
4649                         buf      = h;
4650                         received = 0;
4651                         expect   = sizeof(struct p_header80);
4652                         cmd      = NULL;
4653                 }
4654         }
4655
4656         if (0) {
4657 reconnect:
4658                 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4659                 drbd_md_sync(mdev);
4660         }
4661         if (0) {
4662 disconnect:
4663                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4664                 drbd_md_sync(mdev);
4665         }
4666         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4667
4668         D_ASSERT(mdev->state.conn < C_CONNECTED);
4669         dev_info(DEV, "asender terminated\n");
4670
4671         return 0;
4672 }