Merge branch 'rmobile-fixes-for-linus' of git://git.kernel.org/pub/scm/linux/kernel...
[pandora-kernel.git] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
/* Result type of drbd_may_finish_epoch().
 * NOTE(review): judging by the names only, the values say whether the epoch
 * is still in use, was freed, or was reset for reuse -- confirm against the
 * implementation of drbd_may_finish_epoch(). */
enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

/* forward declarations of functions that are used before they are defined */
static int drbd_do_handshake(struct drbd_conf *mdev);
static int drbd_do_auth(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_conf *, struct drbd_work *, int);


/* gfp flags passed to alloc_page() when the page pool needs to be refilled */
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
65
66 /*
67  * some helper functions to deal with single linked page lists,
68  * page->private being our "next" pointer.
69  */
70
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77         struct page *page;
78         struct page *tmp;
79
80         BUG_ON(!n);
81         BUG_ON(!head);
82
83         page = *head;
84
85         if (!page)
86                 return NULL;
87
88         while (page) {
89                 tmp = page_chain_next(page);
90                 if (--n == 0)
91                         break; /* found sufficient pages */
92                 if (tmp == NULL)
93                         /* insufficient pages, don't use any of them. */
94                         return NULL;
95                 page = tmp;
96         }
97
98         /* add end of list marker for the returned list */
99         set_page_private(page, 0);
100         /* actual return value, and adjustment of head */
101         page = *head;
102         *head = tmp;
103         return page;
104 }
105
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111         struct page *tmp;
112         int i = 1;
113         while ((tmp = page_chain_next(page)))
114                 ++i, page = tmp;
115         if (len)
116                 *len = i;
117         return page;
118 }
119
120 static int page_chain_free(struct page *page)
121 {
122         struct page *tmp;
123         int i = 0;
124         page_chain_for_each_safe(page, tmp) {
125                 put_page(page);
126                 ++i;
127         }
128         return i;
129 }
130
131 static void page_chain_add(struct page **head,
132                 struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135         struct page *tmp;
136         tmp = page_chain_tail(chain_first, NULL);
137         BUG_ON(tmp != chain_last);
138 #endif
139
140         /* add chain to head */
141         set_page_private(chain_last, (unsigned long)*head);
142         *head = chain_first;
143 }
144
145 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146 {
147         struct page *page = NULL;
148         struct page *tmp = NULL;
149         int i = 0;
150
151         /* Yes, testing drbd_pp_vacant outside the lock is racy.
152          * So what. It saves a spin_lock. */
153         if (drbd_pp_vacant >= number) {
154                 spin_lock(&drbd_pp_lock);
155                 page = page_chain_del(&drbd_pp_pool, number);
156                 if (page)
157                         drbd_pp_vacant -= number;
158                 spin_unlock(&drbd_pp_lock);
159                 if (page)
160                         return page;
161         }
162
163         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164          * "criss-cross" setup, that might cause write-out on some other DRBD,
165          * which in turn might block on the other node at this very place.  */
166         for (i = 0; i < number; i++) {
167                 tmp = alloc_page(GFP_TRY);
168                 if (!tmp)
169                         break;
170                 set_page_private(tmp, (unsigned long)page);
171                 page = tmp;
172         }
173
174         if (i == number)
175                 return page;
176
177         /* Not enough pages immediately available this time.
178          * No need to jump around here, drbd_pp_alloc will retry this
179          * function "soon". */
180         if (page) {
181                 tmp = page_chain_tail(page, NULL);
182                 spin_lock(&drbd_pp_lock);
183                 page_chain_add(&drbd_pp_pool, page, tmp);
184                 drbd_pp_vacant += i;
185                 spin_unlock(&drbd_pp_lock);
186         }
187         return NULL;
188 }
189
190 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191 {
192         struct drbd_epoch_entry *e;
193         struct list_head *le, *tle;
194
195         /* The EEs are always appended to the end of the list. Since
196            they are sent in order over the wire, they have to finish
197            in order. As soon as we see the first not finished we can
198            stop to examine the list... */
199
200         list_for_each_safe(le, tle, &mdev->net_ee) {
201                 e = list_entry(le, struct drbd_epoch_entry, w.list);
202                 if (drbd_ee_has_active_page(e))
203                         break;
204                 list_move(le, to_be_freed);
205         }
206 }
207
208 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209 {
210         LIST_HEAD(reclaimed);
211         struct drbd_epoch_entry *e, *t;
212
213         spin_lock_irq(&mdev->req_lock);
214         reclaim_net_ee(mdev, &reclaimed);
215         spin_unlock_irq(&mdev->req_lock);
216
217         list_for_each_entry_safe(e, t, &reclaimed, w.list)
218                 drbd_free_net_ee(mdev, e);
219 }
220
221 /**
222  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
223  * @mdev:       DRBD device.
224  * @number:     number of pages requested
225  * @retry:      whether to retry, if not enough pages are available right now
226  *
227  * Tries to allocate number pages, first from our own page pool, then from
228  * the kernel, unless this allocation would exceed the max_buffers setting.
229  * Possibly retry until DRBD frees sufficient pages somewhere else.
230  *
231  * Returns a page chain linked via page->private.
232  */
233 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234 {
235         struct page *page = NULL;
236         DEFINE_WAIT(wait);
237
238         /* Yes, we may run up to @number over max_buffers. If we
239          * follow it strictly, the admin will get it wrong anyways. */
240         if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
241                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242
243         while (page == NULL) {
244                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245
246                 drbd_kick_lo_and_reclaim_net(mdev);
247
248                 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
249                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250                         if (page)
251                                 break;
252                 }
253
254                 if (!retry)
255                         break;
256
257                 if (signal_pending(current)) {
258                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259                         break;
260                 }
261
262                 schedule();
263         }
264         finish_wait(&drbd_pp_wait, &wait);
265
266         if (page)
267                 atomic_add(number, &mdev->pp_in_use);
268         return page;
269 }
270
271 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
272  * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
273  * Either links the page chain back to the global pool,
274  * or returns all pages to the system. */
275 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276 {
277         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278         int i;
279
280         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
281                 i = page_chain_free(page);
282         else {
283                 struct page *tmp;
284                 tmp = page_chain_tail(page, &i);
285                 spin_lock(&drbd_pp_lock);
286                 page_chain_add(&drbd_pp_pool, page, tmp);
287                 drbd_pp_vacant += i;
288                 spin_unlock(&drbd_pp_lock);
289         }
290         i = atomic_sub_return(i, a);
291         if (i < 0)
292                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
293                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
294         wake_up(&drbd_pp_wait);
295 }
296
297 /*
298 You need to hold the req_lock:
299  _drbd_wait_ee_list_empty()
300
301 You must not have the req_lock:
302  drbd_free_ee()
303  drbd_alloc_ee()
304  drbd_init_ee()
305  drbd_release_ee()
306  drbd_ee_fix_bhs()
307  drbd_process_done_ee()
308  drbd_clear_done_ee()
309  drbd_wait_ee_list_empty()
310 */
311
312 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
313                                      u64 id,
314                                      sector_t sector,
315                                      unsigned int data_size,
316                                      gfp_t gfp_mask) __must_hold(local)
317 {
318         struct drbd_epoch_entry *e;
319         struct page *page;
320         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
321
322         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
323                 return NULL;
324
325         e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
326         if (!e) {
327                 if (!(gfp_mask & __GFP_NOWARN))
328                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
329                 return NULL;
330         }
331
332         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
333         if (!page)
334                 goto fail;
335
336         INIT_HLIST_NODE(&e->colision);
337         e->epoch = NULL;
338         e->mdev = mdev;
339         e->pages = page;
340         atomic_set(&e->pending_bios, 0);
341         e->size = data_size;
342         e->flags = 0;
343         e->sector = sector;
344         e->block_id = id;
345
346         return e;
347
348  fail:
349         mempool_free(e, drbd_ee_mempool);
350         return NULL;
351 }
352
353 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
354 {
355         if (e->flags & EE_HAS_DIGEST)
356                 kfree(e->digest);
357         drbd_pp_free(mdev, e->pages, is_net);
358         D_ASSERT(atomic_read(&e->pending_bios) == 0);
359         D_ASSERT(hlist_unhashed(&e->colision));
360         mempool_free(e, drbd_ee_mempool);
361 }
362
363 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
364 {
365         LIST_HEAD(work_list);
366         struct drbd_epoch_entry *e, *t;
367         int count = 0;
368         int is_net = list == &mdev->net_ee;
369
370         spin_lock_irq(&mdev->req_lock);
371         list_splice_init(list, &work_list);
372         spin_unlock_irq(&mdev->req_lock);
373
374         list_for_each_entry_safe(e, t, &work_list, w.list) {
375                 drbd_free_some_ee(mdev, e, is_net);
376                 count++;
377         }
378         return count;
379 }
380
381
382 /*
383  * This function is called from _asender only_
384  * but see also comments in _req_mod(,barrier_acked)
385  * and receive_Barrier.
386  *
387  * Move entries from net_ee to done_ee, if ready.
388  * Grab done_ee, call all callbacks, free the entries.
389  * The callbacks typically send out ACKs.
390  */
391 static int drbd_process_done_ee(struct drbd_conf *mdev)
392 {
393         LIST_HEAD(work_list);
394         LIST_HEAD(reclaimed);
395         struct drbd_epoch_entry *e, *t;
396         int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
397
398         spin_lock_irq(&mdev->req_lock);
399         reclaim_net_ee(mdev, &reclaimed);
400         list_splice_init(&mdev->done_ee, &work_list);
401         spin_unlock_irq(&mdev->req_lock);
402
403         list_for_each_entry_safe(e, t, &reclaimed, w.list)
404                 drbd_free_net_ee(mdev, e);
405
406         /* possible callbacks here:
407          * e_end_block, and e_end_resync_block, e_send_discard_ack.
408          * all ignore the last argument.
409          */
410         list_for_each_entry_safe(e, t, &work_list, w.list) {
411                 /* list_del not necessary, next/prev members not touched */
412                 ok = e->w.cb(mdev, &e->w, !ok) && ok;
413                 drbd_free_ee(mdev, e);
414         }
415         wake_up(&mdev->ee_wait);
416
417         return ok;
418 }
419
420 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
421 {
422         DEFINE_WAIT(wait);
423
424         /* avoids spin_lock/unlock
425          * and calling prepare_to_wait in the fast path */
426         while (!list_empty(head)) {
427                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
428                 spin_unlock_irq(&mdev->req_lock);
429                 io_schedule();
430                 finish_wait(&mdev->ee_wait, &wait);
431                 spin_lock_irq(&mdev->req_lock);
432         }
433 }
434
435 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
436 {
437         spin_lock_irq(&mdev->req_lock);
438         _drbd_wait_ee_list_empty(mdev, head);
439         spin_unlock_irq(&mdev->req_lock);
440 }
441
442 /* see also kernel_accept; which is only present since 2.6.18.
443  * also we want to log which part of it failed, exactly */
444 static int drbd_accept(struct drbd_conf *mdev, const char **what,
445                 struct socket *sock, struct socket **newsock)
446 {
447         struct sock *sk = sock->sk;
448         int err = 0;
449
450         *what = "listen";
451         err = sock->ops->listen(sock, 5);
452         if (err < 0)
453                 goto out;
454
455         *what = "sock_create_lite";
456         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
457                                newsock);
458         if (err < 0)
459                 goto out;
460
461         *what = "accept";
462         err = sock->ops->accept(sock, *newsock, 0);
463         if (err < 0) {
464                 sock_release(*newsock);
465                 *newsock = NULL;
466                 goto out;
467         }
468         (*newsock)->ops  = sock->ops;
469
470 out:
471         return err;
472 }
473
474 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
475                     void *buf, size_t size, int flags)
476 {
477         mm_segment_t oldfs;
478         struct kvec iov = {
479                 .iov_base = buf,
480                 .iov_len = size,
481         };
482         struct msghdr msg = {
483                 .msg_iovlen = 1,
484                 .msg_iov = (struct iovec *)&iov,
485                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
486         };
487         int rv;
488
489         oldfs = get_fs();
490         set_fs(KERNEL_DS);
491         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
492         set_fs(oldfs);
493
494         return rv;
495 }
496
497 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
498 {
499         mm_segment_t oldfs;
500         struct kvec iov = {
501                 .iov_base = buf,
502                 .iov_len = size,
503         };
504         struct msghdr msg = {
505                 .msg_iovlen = 1,
506                 .msg_iov = (struct iovec *)&iov,
507                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
508         };
509         int rv;
510
511         oldfs = get_fs();
512         set_fs(KERNEL_DS);
513
514         for (;;) {
515                 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
516                 if (rv == size)
517                         break;
518
519                 /* Note:
520                  * ECONNRESET   other side closed the connection
521                  * ERESTARTSYS  (on  sock) we got a signal
522                  */
523
524                 if (rv < 0) {
525                         if (rv == -ECONNRESET)
526                                 dev_info(DEV, "sock was reset by peer\n");
527                         else if (rv != -ERESTARTSYS)
528                                 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
529                         break;
530                 } else if (rv == 0) {
531                         dev_info(DEV, "sock was shut down by peer\n");
532                         break;
533                 } else  {
534                         /* signal came in, or peer/link went down,
535                          * after we read a partial message
536                          */
537                         /* D_ASSERT(signal_pending(current)); */
538                         break;
539                 }
540         };
541
542         set_fs(oldfs);
543
544         if (rv != size)
545                 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
546
547         return rv;
548 }
549
550 /* quoting tcp(7):
551  *   On individual connections, the socket buffer size must be set prior to the
552  *   listen(2) or connect(2) calls in order to have it take effect.
553  * This is our wrapper to do so.
554  */
555 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
556                 unsigned int rcv)
557 {
558         /* open coded SO_SNDBUF, SO_RCVBUF */
559         if (snd) {
560                 sock->sk->sk_sndbuf = snd;
561                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
562         }
563         if (rcv) {
564                 sock->sk->sk_rcvbuf = rcv;
565                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
566         }
567 }
568
/* Make one active connection attempt to the configured peer address.
 *
 * Creates a TCP socket, binds it to the configured local address (with
 * port 0) and connects to the peer.
 *
 * Returns the connected socket, or NULL.  On failure the socket is
 * released; errors outside the "try again" set listed below are logged,
 * and if such an error happened before the connect step the connection
 * state is forced to C_DISCONNECTING.
 */
static struct socket *drbd_try_connect(struct drbd_conf *mdev)
{
	struct sockaddr_in6 src_in6;
	struct socket *sock;
	const char *what;
	int disconnect_on_error = 1;
	int family;
	int err;

	if (!get_net_conf(mdev))
		return NULL;

	family = ((struct sockaddr *)mdev->net_conf->my_addr)->sa_family;

	what = "sock_create_kern";
	err = sock_create_kern(family, SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
	sock->sk->sk_rcvtimeo = sock->sk->sk_sndtimeo;
	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
			mdev->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 *  for the outgoing connections.
	 *  This is needed for multihomed hosts and to be
	 *  able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 *  a free one dynamically.
	 */
	memcpy(&src_in6, mdev->net_conf->my_addr,
	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
	if (family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      mdev->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock,
				 (struct sockaddr *)mdev->net_conf->peer_addr,
				 mdev->net_conf->peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock != NULL) {
			sock_release(sock);
			sock = NULL;
		}

		switch (-err) {
		/* timeout, busy, signal pending */
		case ETIMEDOUT:
		case EAGAIN:
		case EINPROGRESS:
		case EINTR:
		case ERESTARTSYS:
		/* peer not (yet) available, network problem */
		case ECONNREFUSED:
		case ENETUNREACH:
		case EHOSTDOWN:
		case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			dev_err(DEV, "%s failed, err = %d\n", what, err);
		}

		if (disconnect_on_error)
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
	}

	put_net_conf(mdev);
	return sock;
}
646
647 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
648 {
649         int timeo, err;
650         struct socket *s_estab = NULL, *s_listen;
651         const char *what;
652
653         if (!get_net_conf(mdev))
654                 return NULL;
655
656         what = "sock_create_kern";
657         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
658                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
659         if (err) {
660                 s_listen = NULL;
661                 goto out;
662         }
663
664         timeo = mdev->net_conf->try_connect_int * HZ;
665         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
666
667         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
668         s_listen->sk->sk_rcvtimeo = timeo;
669         s_listen->sk->sk_sndtimeo = timeo;
670         drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
671                         mdev->net_conf->rcvbuf_size);
672
673         what = "bind before listen";
674         err = s_listen->ops->bind(s_listen,
675                               (struct sockaddr *) mdev->net_conf->my_addr,
676                               mdev->net_conf->my_addr_len);
677         if (err < 0)
678                 goto out;
679
680         err = drbd_accept(mdev, &what, s_listen, &s_estab);
681
682 out:
683         if (s_listen)
684                 sock_release(s_listen);
685         if (err < 0) {
686                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
687                         dev_err(DEV, "%s failed, err = %d\n", what, err);
688                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
689                 }
690         }
691         put_net_conf(mdev);
692
693         return s_estab;
694 }
695
696 static int drbd_send_fp(struct drbd_conf *mdev,
697         struct socket *sock, enum drbd_packets cmd)
698 {
699         struct p_header80 *h = &mdev->data.sbuf.header.h80;
700
701         return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
702 }
703
704 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
705 {
706         struct p_header80 *h = &mdev->data.rbuf.header.h80;
707         int rr;
708
709         rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
710
711         if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
712                 return be16_to_cpu(h->command);
713
714         return 0xffff;
715 }
716
717 /**
718  * drbd_socket_okay() - Free the socket if its connection is not okay
719  * @mdev:       DRBD device.
720  * @sock:       pointer to the pointer to the socket.
721  */
722 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
723 {
724         int rr;
725         char tb[4];
726
727         if (!*sock)
728                 return false;
729
730         rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
731
732         if (rr > 0 || rr == -EAGAIN) {
733                 return true;
734         } else {
735                 sock_release(*sock);
736                 *sock = NULL;
737                 return false;
738         }
739 }
740
741 /*
742  * return values:
743  *   1 yes, we have a valid connection
744  *   0 oops, did not work out, please try again
745  *  -1 peer talks different language,
746  *     no point in trying again, please go standalone.
747  *  -2 We do not have a network config...
748  */
749 static int drbd_connect(struct drbd_conf *mdev)
750 {
751         struct socket *s, *sock, *msock;
752         int try, h, ok;
753
754         D_ASSERT(!mdev->data.socket);
755
756         if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
757                 return -2;
758
759         clear_bit(DISCARD_CONCURRENT, &mdev->flags);
760
761         sock  = NULL;
762         msock = NULL;
763
764         do {
765                 for (try = 0;;) {
766                         /* 3 tries, this should take less than a second! */
767                         s = drbd_try_connect(mdev);
768                         if (s || ++try >= 3)
769                                 break;
770                         /* give the other side time to call bind() & listen() */
771                         schedule_timeout_interruptible(HZ / 10);
772                 }
773
774                 if (s) {
775                         if (!sock) {
776                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
777                                 sock = s;
778                                 s = NULL;
779                         } else if (!msock) {
780                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
781                                 msock = s;
782                                 s = NULL;
783                         } else {
784                                 dev_err(DEV, "Logic error in drbd_connect()\n");
785                                 goto out_release_sockets;
786                         }
787                 }
788
789                 if (sock && msock) {
790                         schedule_timeout_interruptible(HZ / 10);
791                         ok = drbd_socket_okay(mdev, &sock);
792                         ok = drbd_socket_okay(mdev, &msock) && ok;
793                         if (ok)
794                                 break;
795                 }
796
797 retry:
798                 s = drbd_wait_for_connect(mdev);
799                 if (s) {
800                         try = drbd_recv_fp(mdev, s);
801                         drbd_socket_okay(mdev, &sock);
802                         drbd_socket_okay(mdev, &msock);
803                         switch (try) {
804                         case P_HAND_SHAKE_S:
805                                 if (sock) {
806                                         dev_warn(DEV, "initial packet S crossed\n");
807                                         sock_release(sock);
808                                 }
809                                 sock = s;
810                                 break;
811                         case P_HAND_SHAKE_M:
812                                 if (msock) {
813                                         dev_warn(DEV, "initial packet M crossed\n");
814                                         sock_release(msock);
815                                 }
816                                 msock = s;
817                                 set_bit(DISCARD_CONCURRENT, &mdev->flags);
818                                 break;
819                         default:
820                                 dev_warn(DEV, "Error receiving initial packet\n");
821                                 sock_release(s);
822                                 if (random32() & 1)
823                                         goto retry;
824                         }
825                 }
826
827                 if (mdev->state.conn <= C_DISCONNECTING)
828                         goto out_release_sockets;
829                 if (signal_pending(current)) {
830                         flush_signals(current);
831                         smp_rmb();
832                         if (get_t_state(&mdev->receiver) == Exiting)
833                                 goto out_release_sockets;
834                 }
835
836                 if (sock && msock) {
837                         ok = drbd_socket_okay(mdev, &sock);
838                         ok = drbd_socket_okay(mdev, &msock) && ok;
839                         if (ok)
840                                 break;
841                 }
842         } while (1);
843
844         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
845         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
846
847         sock->sk->sk_allocation = GFP_NOIO;
848         msock->sk->sk_allocation = GFP_NOIO;
849
850         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
851         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
852
853         /* NOT YET ...
854          * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
855          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
856          * first set it to the P_HAND_SHAKE timeout,
857          * which we set to 4x the configured ping_timeout. */
858         sock->sk->sk_sndtimeo =
859         sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
860
861         msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862         msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
863
864         /* we don't want delays.
865          * we use TCP_CORK where appropriate, though */
866         drbd_tcp_nodelay(sock);
867         drbd_tcp_nodelay(msock);
868
869         mdev->data.socket = sock;
870         mdev->meta.socket = msock;
871         mdev->last_received = jiffies;
872
873         D_ASSERT(mdev->asender.task == NULL);
874
875         h = drbd_do_handshake(mdev);
876         if (h <= 0)
877                 return h;
878
879         if (mdev->cram_hmac_tfm) {
880                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
881                 switch (drbd_do_auth(mdev)) {
882                 case -1:
883                         dev_err(DEV, "Authentication of peer failed\n");
884                         return -1;
885                 case 0:
886                         dev_err(DEV, "Authentication of peer failed, trying again.\n");
887                         return 0;
888                 }
889         }
890
891         if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
892                 return 0;
893
894         sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
895         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
896
897         atomic_set(&mdev->packet_seq, 0);
898         mdev->peer_seq = 0;
899
900         drbd_thread_start(&mdev->asender);
901
902         if (mdev->agreed_pro_version < 95 && get_ldev(mdev)) {
903                 drbd_setup_queue_param(mdev, DRBD_MAX_SIZE_H80_PACKET);
904                 put_ldev(mdev);
905         }
906
907         if (drbd_send_protocol(mdev) == -1)
908                 return -1;
909         drbd_send_sync_param(mdev, &mdev->sync_conf);
910         drbd_send_sizes(mdev, 0, 0);
911         drbd_send_uuids(mdev);
912         drbd_send_state(mdev);
913         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
914         clear_bit(RESIZE_PENDING, &mdev->flags);
915         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
916
917         return 1;
918
919 out_release_sockets:
920         if (sock)
921                 sock_release(sock);
922         if (msock)
923                 sock_release(msock);
924         return -1;
925 }
926
927 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
928 {
929         union p_header *h = &mdev->data.rbuf.header;
930         int r;
931
932         r = drbd_recv(mdev, h, sizeof(*h));
933         if (unlikely(r != sizeof(*h))) {
934                 if (!signal_pending(current))
935                         dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
936                 return false;
937         }
938
939         if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
940                 *cmd = be16_to_cpu(h->h80.command);
941                 *packet_size = be16_to_cpu(h->h80.length);
942         } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
943                 *cmd = be16_to_cpu(h->h95.command);
944                 *packet_size = be32_to_cpu(h->h95.length);
945         } else {
946                 dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
947                     be32_to_cpu(h->h80.magic),
948                     be16_to_cpu(h->h80.command),
949                     be16_to_cpu(h->h80.length));
950                 return false;
951         }
952         mdev->last_received = jiffies;
953
954         return true;
955 }
956
957 static void drbd_flush(struct drbd_conf *mdev)
958 {
959         int rv;
960
961         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
962                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
963                                         NULL);
964                 if (rv) {
965                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
966                         /* would rather check on EOPNOTSUPP, but that is not reliable.
967                          * don't try again for ANY return value != 0
968                          * if (rv == -EOPNOTSUPP) */
969                         drbd_bump_write_ordering(mdev, WO_drain_io);
970                 }
971                 put_ldev(mdev);
972         }
973 }
974
975 /**
976  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
977  * @mdev:       DRBD device.
978  * @epoch:      Epoch object.
979  * @ev:         Epoch event.
     *
     * @ev may carry the EV_CLEANUP flag in addition to the event proper; with
     * EV_CLEANUP set, no barrier ack is sent for a finished epoch.
     *
     * An epoch is finished once its epoch_size is non-zero, its active count
     * is zero and DE_HAVE_BARRIER_NUMBER is set.  A finished epoch that is not
     * mdev->current_epoch is unlinked and freed, and the next epoch on the list
     * is then examined with EV_BECAME_LAST.  A finished current_epoch is reset
     * for reuse instead, and waiters on mdev->ee_wait are woken.
     *
     * Takes mdev->epoch_lock itself; the lock is dropped temporarily while the
     * barrier ack is sent.
     *
     * Returns FE_STILL_LIVE if @epoch was not finished, FE_DESTROYED if @epoch
     * was freed, FE_RECYCLED if @epoch was reset for reuse.
980  */
981 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
982                                                struct drbd_epoch *epoch,
983                                                enum epoch_event ev)
984 {
985         int epoch_size;
986         struct drbd_epoch *next_epoch;
987         enum finish_epoch rv = FE_STILL_LIVE;
988
989         spin_lock(&mdev->epoch_lock);
990         do {
991                 next_epoch = NULL;
992
993                 epoch_size = atomic_read(&epoch->epoch_size);
994
                    /* apply the event itself; EV_CLEANUP is only a modifier flag */
995                 switch (ev & ~EV_CLEANUP) {
996                 case EV_PUT:
997                         atomic_dec(&epoch->active);
998                         break;
999                 case EV_GOT_BARRIER_NR:
1000                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1001                         break;
1002                 case EV_BECAME_LAST:
1003                         /* nothing to do*/
1004                         break;
1005                 }
1006
1007                 if (epoch_size != 0 &&
1008                     atomic_read(&epoch->active) == 0 &&
1009                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1010                         if (!(ev & EV_CLEANUP)) {
                                     /* epoch_lock is not held while the ack is sent */
1011                                 spin_unlock(&mdev->epoch_lock);
1012                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1013                                 spin_lock(&mdev->epoch_lock);
1014                         }
                             /* NOTE(review): presumably pairs with the inc_unacked()
                              * in receive_Barrier() - confirm */
1015                         dec_unacked(mdev);
1016
1017                         if (mdev->current_epoch != epoch) {
                                     /* not the newest epoch: unlink and free it, then
                                      * look at its list successor in the next iteration */
1018                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1019                                 list_del(&epoch->list);
                                     /* keep the cleanup modifier for the next epoch */
1020                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1021                                 mdev->epochs--;
1022                                 kfree(epoch);
1023
                                     /* rv only records the fate of the first epoch */
1024                                 if (rv == FE_STILL_LIVE)
1025                                         rv = FE_DESTROYED;
1026                         } else {
                                     /* the current epoch is reset in place, not freed */
1027                                 epoch->flags = 0;
1028                                 atomic_set(&epoch->epoch_size, 0);
1029                                 /* atomic_set(&epoch->active, 0); is already zero */
1030                                 if (rv == FE_STILL_LIVE)
1031                                         rv = FE_RECYCLED;
1032                                 wake_up(&mdev->ee_wait);
1033                         }
1034                 }
1035
                     /* next_epoch is only set when an epoch was freed above */
1036                 if (!next_epoch)
1037                         break;
1038
1039                 epoch = next_epoch;
1040         } while (1);
1041
1042         spin_unlock(&mdev->epoch_lock);
1043
1044         return rv;
1045 }
1046
1047 /**
1048  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1049  * @mdev:       DRBD device.
1050  * @wo:         Write ordering method to try.
1051  */
1052 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1053 {
1054         enum write_ordering_e pwo;
1055         static char *write_ordering_str[] = {
1056                 [WO_none] = "none",
1057                 [WO_drain_io] = "drain",
1058                 [WO_bdev_flush] = "flush",
1059         };
1060
1061         pwo = mdev->write_ordering;
1062         wo = min(pwo, wo);
1063         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1064                 wo = WO_drain_io;
1065         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1066                 wo = WO_none;
1067         mdev->write_ordering = wo;
1068         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1069                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1070 }
1071
1072 /**
1073  * drbd_submit_ee()
1074  * @mdev:       DRBD device.
1075  * @e:          epoch entry
1076  * @rw:         flag field, see bio->bi_rw
      * @fault_type: handed to drbd_generic_make_request() together with each bio
      *
      * Builds one or more bios covering e->size bytes from the page chain
      * e->pages, starting at e->sector on the backing device, and submits them.
      * Every bio gets @e as bi_private and drbd_endio_sec as completion handler.
1077  *
1078  * May spread the pages to multiple bios,
1079  * depending on bio_add_page restrictions.
1080  *
1081  * Returns 0 if all bios have been submitted,
1082  * -ENOMEM if we could not allocate enough bios,
1083  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1084  *  single page to an empty bio (which should never happen and likely indicates
1085  *  that the lower level IO stack is in some way broken). This has been observed
1086  *  on certain Xen deployments.
      *
      * On failure nothing has been submitted; all bios allocated so far are
      * released again.
1087  */
1088 /* TODO allocate from our own bio_set. */
1089 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1090                 const unsigned rw, const int fault_type)
1091 {
              /* bios: list (linked via bi_next) of all bios allocated so far */
1092         struct bio *bios = NULL;
1093         struct bio *bio;
1094         struct page *page = e->pages;
1095         sector_t sector = e->sector;
1096         unsigned ds = e->size;
1097         unsigned n_bios = 0;
              /* number of pages still to be added; decremented as pages are added,
               * so a follow-up bio is only sized for what is left */
1098         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1099         int err = -ENOMEM;
1100
1101         /* In most cases, we will only need one bio.  But in case the lower
1102          * level restrictions happen to be different at this offset on this
1103          * side than those of the sending peer, we may need to submit the
1104          * request in more than one bio. */
1105 next_bio:
1106         bio = bio_alloc(GFP_NOIO, nr_pages);
1107         if (!bio) {
1108                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1109                 goto fail;
1110         }
1111         /* > e->sector, unless this is the first bio */
1112         bio->bi_sector = sector;
1113         bio->bi_bdev = mdev->ldev->backing_bdev;
1114         bio->bi_rw = rw;
1115         bio->bi_private = e;
1116         bio->bi_end_io = drbd_endio_sec;
1117
              /* push the new bio onto the front of the list */
1118         bio->bi_next = bios;
1119         bios = bio;
1120         ++n_bios;
1121
              /* NOTE(review): page_chain_for_each() is defined elsewhere; presumably
               * it continues from the current value of `page`, so after a
               * "goto next_bio" the page that did not fit is retried first - confirm */
1122         page_chain_for_each(page) {
1123                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1124                 if (!bio_add_page(bio, page, len, 0)) {
1125                         /* A single page must always be possible!
1126                          * But in case it fails anyways,
1127                          * we deal with it, and complain (below). */
1128                         if (bio->bi_vcnt == 0) {
1129                                 dev_err(DEV,
1130                                         "bio_add_page failed for len=%u, "
1131                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1132                                         len, (unsigned long long)bio->bi_sector);
1133                                 err = -ENOSPC;
1134                                 goto fail;
1135                         }
                              /* this bio already holds pages: continue in a new one */
1136                         goto next_bio;
1137                 }
1138                 ds -= len;
                      /* len is in bytes, sector in units of 512 bytes */
1139                 sector += len >> 9;
1140                 --nr_pages;
1141         }
1142         D_ASSERT(page == NULL);
1143         D_ASSERT(ds == 0);
1144
              /* the number of bios is recorded in e->pending_bios before the
               * first one is submitted */
1145         atomic_set(&e->pending_bios, n_bios);
1146         do {
1147                 bio = bios;
1148                 bios = bios->bi_next;
1149                 bio->bi_next = NULL;
1150
1151                 drbd_generic_make_request(mdev, fault_type, bio);
1152         } while (bios);
1153         return 0;
1154
1155 fail:
              /* nothing was submitted yet: drop every bio allocated so far */
1156         while (bios) {
1157                 bio = bios;
1158                 bios = bios->bi_next;
1159                 bio_put(bio);
1160         }
1161         return err;
1162 }
1163
/* receive_Barrier() - handle a barrier packet from the peer
 *
 * Stores the barrier number found in mdev->data.rbuf.barrier in the current
 * epoch and applies EV_GOT_BARRIER_NR to it.  Depending on
 * mdev->write_ordering it then either installs a freshly allocated epoch as
 * mdev->current_epoch, or calls drbd_wait_ee_list_empty() on active_ee,
 * flushes, and - if no new epoch was needed or could be allocated - waits
 * until the size of the current epoch has dropped to zero so it can be reused.
 *
 * cmd and data_size are not used by this function.
 *
 * Returns true on success, false if mdev->write_ordering has an unexpected
 * value.
 */
1164 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1165 {
1166         int rv;
1167         struct p_barrier *p = &mdev->data.rbuf.barrier;
1168         struct drbd_epoch *epoch;
1169
              /* the matching dec_unacked() is done in drbd_may_finish_epoch()
               * once the epoch is finished */
1170         inc_unacked(mdev);
1171
1172         mdev->current_epoch->barrier_nr = p->barrier;
1173         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1174
1175         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1176          * the activity log, which means it would not be resynced in case the
1177          * R_PRIMARY crashes now.
1178          * Therefore we must send the barrier_ack after the barrier request was
1179          * completed. */
1180         switch (mdev->write_ordering) {
1181         case WO_none:
                      /* the current epoch was already finished and reset above;
                       * it is simply reused */
1182                 if (rv == FE_RECYCLED)
1183                         return true;
1184
1185                 /* receiver context, in the writeout path of the other node.
1186                  * avoid potential distributed deadlock */
1187                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1188                 if (epoch)
1189                         break;
1190                 else
1191                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
                              /* only reached when the allocation failed: behave like
                               * the draining/flushing methods below */
1192                         /* Fall through */
1193
1194         case WO_bdev_flush:
1195         case WO_drain_io:
1196                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1197                 drbd_flush(mdev);
1198
                      /* a new epoch is only needed if the current one is not empty */
1199                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1200                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1201                         if (epoch)
1202                                 break;
1203                 }
1204
                      /* no new epoch: wait until the current one has been reset
                       * (epoch_size back to zero) and keep using it */
1205                 epoch = mdev->current_epoch;
1206                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1207
1208                 D_ASSERT(atomic_read(&epoch->active) == 0);
1209                 D_ASSERT(epoch->flags == 0);
1210
1211                 return true;
1212         default:
1213                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1214                 return false;
1215         }
1216
              /* only reached via "break" above, i.e. with a freshly allocated epoch */
1217         epoch->flags = 0;
1218         atomic_set(&epoch->epoch_size, 0);
1219         atomic_set(&epoch->active, 0);
1220
1221         spin_lock(&mdev->epoch_lock);
              /* re-check under the lock whether the new epoch is still needed */
1222         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1223                 list_add(&epoch->list, &mdev->current_epoch->list);
1224                 mdev->current_epoch = epoch;
1225                 mdev->epochs++;
1226         } else {
1227                 /* The current_epoch got recycled while we allocated this one... */
1228                 kfree(epoch);
1229         }
1230         spin_unlock(&mdev->epoch_lock);
1231
1232         return true;
1233 }
1234
1235 /* used from receive_RSDataReply (recv_resync_read)
1236  * and from receive_Data */
1237 static struct drbd_epoch_entry *
1238 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1239 {
1240         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1241         struct drbd_epoch_entry *e;
1242         struct page *page;
1243         int dgs, ds, rr;
1244         void *dig_in = mdev->int_dig_in;
1245         void *dig_vv = mdev->int_dig_vv;
1246         unsigned long *data;
1247
1248         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1249                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1250
1251         if (dgs) {
1252                 rr = drbd_recv(mdev, dig_in, dgs);
1253                 if (rr != dgs) {
1254                         if (!signal_pending(current))
1255                                 dev_warn(DEV,
1256                                         "short read receiving data digest: read %d expected %d\n",
1257                                         rr, dgs);
1258                         return NULL;
1259                 }
1260         }
1261
1262         data_size -= dgs;
1263
1264         ERR_IF(data_size == 0) return NULL;
1265         ERR_IF(data_size &  0x1ff) return NULL;
1266         ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;
1267
1268         /* even though we trust out peer,
1269          * we sometimes have to double check. */
1270         if (sector + (data_size>>9) > capacity) {
1271                 dev_err(DEV, "request from peer beyond end of local disk: "
1272                         "capacity: %llus < sector: %llus + size: %u\n",
1273                         (unsigned long long)capacity,
1274                         (unsigned long long)sector, data_size);
1275                 return NULL;
1276         }
1277
1278         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1279          * "criss-cross" setup, that might cause write-out on some other DRBD,
1280          * which in turn might block on the other node at this very place.  */
1281         e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1282         if (!e)
1283                 return NULL;
1284
1285         ds = data_size;
1286         page = e->pages;
1287         page_chain_for_each(page) {
1288                 unsigned len = min_t(int, ds, PAGE_SIZE);
1289                 data = kmap(page);
1290                 rr = drbd_recv(mdev, data, len);
1291                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1292                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1293                         data[0] = data[0] ^ (unsigned long)-1;
1294                 }
1295                 kunmap(page);
1296                 if (rr != len) {
1297                         drbd_free_ee(mdev, e);
1298                         if (!signal_pending(current))
1299                                 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1300                                 rr, len);
1301                         return NULL;
1302                 }
1303                 ds -= rr;
1304         }
1305
1306         if (dgs) {
1307                 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1308                 if (memcmp(dig_in, dig_vv, dgs)) {
1309                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1310                                 (unsigned long long)sector, data_size);
1311                         drbd_bcast_ee(mdev, "digest failed",
1312                                         dgs, dig_in, dig_vv, e);
1313                         drbd_free_ee(mdev, e);
1314                         return NULL;
1315                 }
1316         }
1317         mdev->recv_cnt += data_size>>9;
1318         return e;
1319 }
1320
1321 /* drbd_drain_block() just takes a data block
1322  * out of the socket input buffer, and discards it.
1323  */
1324 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1325 {
1326         struct page *page;
1327         int rr, rv = 1;
1328         void *data;
1329
1330         if (!data_size)
1331                 return true;
1332
1333         page = drbd_pp_alloc(mdev, 1, 1);
1334
1335         data = kmap(page);
1336         while (data_size) {
1337                 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1338                 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1339                         rv = 0;
1340                         if (!signal_pending(current))
1341                                 dev_warn(DEV,
1342                                         "short read receiving data: read %d expected %d\n",
1343                                         rr, min_t(int, data_size, PAGE_SIZE));
1344                         break;
1345                 }
1346                 data_size -= rr;
1347         }
1348         kunmap(page);
1349         drbd_pp_free(mdev, page, 0);
1350         return rv;
1351 }
1352
1353 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1354                            sector_t sector, int data_size)
1355 {
1356         struct bio_vec *bvec;
1357         struct bio *bio;
1358         int dgs, rr, i, expect;
1359         void *dig_in = mdev->int_dig_in;
1360         void *dig_vv = mdev->int_dig_vv;
1361
1362         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1363                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1364
1365         if (dgs) {
1366                 rr = drbd_recv(mdev, dig_in, dgs);
1367                 if (rr != dgs) {
1368                         if (!signal_pending(current))
1369                                 dev_warn(DEV,
1370                                         "short read receiving data reply digest: read %d expected %d\n",
1371                                         rr, dgs);
1372                         return 0;
1373                 }
1374         }
1375
1376         data_size -= dgs;
1377
1378         /* optimistically update recv_cnt.  if receiving fails below,
1379          * we disconnect anyways, and counters will be reset. */
1380         mdev->recv_cnt += data_size>>9;
1381
1382         bio = req->master_bio;
1383         D_ASSERT(sector == bio->bi_sector);
1384
1385         bio_for_each_segment(bvec, bio, i) {
1386                 expect = min_t(int, data_size, bvec->bv_len);
1387                 rr = drbd_recv(mdev,
1388                              kmap(bvec->bv_page)+bvec->bv_offset,
1389                              expect);
1390                 kunmap(bvec->bv_page);
1391                 if (rr != expect) {
1392                         if (!signal_pending(current))
1393                                 dev_warn(DEV, "short read receiving data reply: "
1394                                         "read %d expected %d\n",
1395                                         rr, expect);
1396                         return 0;
1397                 }
1398                 data_size -= rr;
1399         }
1400
1401         if (dgs) {
1402                 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1403                 if (memcmp(dig_in, dig_vv, dgs)) {
1404                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1405                         return 0;
1406                 }
1407         }
1408
1409         D_ASSERT(data_size == 0);
1410         return 1;
1411 }
1412
1413 /* e_end_resync_block() is called via
1414  * drbd_process_done_ee() by asender only */
1415 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1416 {
1417         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1418         sector_t sector = e->sector;
1419         int ok;
1420
1421         D_ASSERT(hlist_unhashed(&e->colision));
1422
1423         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1424                 drbd_set_in_sync(mdev, sector, e->size);
1425                 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1426         } else {
1427                 /* Record failure to sync */
1428                 drbd_rs_failed_io(mdev, sector, e->size);
1429
1430                 ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1431         }
1432         dec_unacked(mdev);
1433
1434         return ok;
1435 }
1436
1437 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1438 {
1439         struct drbd_epoch_entry *e;
1440
1441         e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1442         if (!e)
1443                 goto fail;
1444
1445         dec_rs_pending(mdev);
1446
1447         inc_unacked(mdev);
1448         /* corresponding dec_unacked() in e_end_resync_block()
1449          * respective _drbd_clear_done_ee */
1450
1451         e->w.cb = e_end_resync_block;
1452
1453         spin_lock_irq(&mdev->req_lock);
1454         list_add(&e->w.list, &mdev->sync_ee);
1455         spin_unlock_irq(&mdev->req_lock);
1456
1457         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1458         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1459                 return true;
1460
1461         /* don't care for the reason here */
1462         dev_err(DEV, "submit failed, triggering re-connect\n");
1463         spin_lock_irq(&mdev->req_lock);
1464         list_del(&e->w.list);
1465         spin_unlock_irq(&mdev->req_lock);
1466
1467         drbd_free_ee(mdev, e);
1468 fail:
1469         put_ldev(mdev);
1470         return false;
1471 }
1472
1473 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1474 {
1475         struct drbd_request *req;
1476         sector_t sector;
1477         int ok;
1478         struct p_data *p = &mdev->data.rbuf.data;
1479
1480         sector = be64_to_cpu(p->sector);
1481
1482         spin_lock_irq(&mdev->req_lock);
1483         req = _ar_id_to_req(mdev, p->block_id, sector);
1484         spin_unlock_irq(&mdev->req_lock);
1485         if (unlikely(!req)) {
1486                 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1487                 return false;
1488         }
1489
1490         /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1491          * special casing it there for the various failure cases.
1492          * still no race with drbd_fail_pending_reads */
1493         ok = recv_dless_read(mdev, req, sector, data_size);
1494
1495         if (ok)
1496                 req_mod(req, data_received);
1497         /* else: nothing. handled from drbd_disconnect...
1498          * I don't think we may complete this just yet
1499          * in case we are "on-disconnect: freeze" */
1500
1501         return ok;
1502 }
1503
1504 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1505 {
1506         sector_t sector;
1507         int ok;
1508         struct p_data *p = &mdev->data.rbuf.data;
1509
1510         sector = be64_to_cpu(p->sector);
1511         D_ASSERT(p->block_id == ID_SYNCER);
1512
1513         if (get_ldev(mdev)) {
1514                 /* data is submitted to disk within recv_resync_read.
1515                  * corresponding put_ldev done below on error,
1516                  * or in drbd_endio_write_sec. */
1517                 ok = recv_resync_read(mdev, sector, data_size);
1518         } else {
1519                 if (__ratelimit(&drbd_ratelimit_state))
1520                         dev_err(DEV, "Can not write resync data to local disk.\n");
1521
1522                 ok = drbd_drain_block(mdev, data_size);
1523
1524                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1525         }
1526
1527         atomic_add(data_size >> 9, &mdev->rs_sect_in);
1528
1529         return ok;
1530 }
1531
1532 /* e_end_block() is called via drbd_process_done_ee().
1533  * this means this function only runs in the asender thread
1534  */
1535 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1536 {
1537         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1538         sector_t sector = e->sector;
1539         int ok = 1, pcmd;
1540
1541         if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1542                 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1543                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1544                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1545                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1546                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1547                         ok &= drbd_send_ack(mdev, pcmd, e);
1548                         if (pcmd == P_RS_WRITE_ACK)
1549                                 drbd_set_in_sync(mdev, sector, e->size);
1550                 } else {
1551                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1552                         /* we expect it to be marked out of sync anyways...
1553                          * maybe assert this?  */
1554                 }
1555                 dec_unacked(mdev);
1556         }
1557         /* we delete from the conflict detection hash _after_ we sent out the
1558          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1559         if (mdev->net_conf->two_primaries) {
1560                 spin_lock_irq(&mdev->req_lock);
1561                 D_ASSERT(!hlist_unhashed(&e->colision));
1562                 hlist_del_init(&e->colision);
1563                 spin_unlock_irq(&mdev->req_lock);
1564         } else {
1565                 D_ASSERT(hlist_unhashed(&e->colision));
1566         }
1567
1568         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1569
1570         return ok;
1571 }
1572
1573 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1574 {
1575         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1576         int ok = 1;
1577
1578         D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1579         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1580
1581         spin_lock_irq(&mdev->req_lock);
1582         D_ASSERT(!hlist_unhashed(&e->colision));
1583         hlist_del_init(&e->colision);
1584         spin_unlock_irq(&mdev->req_lock);
1585
1586         dec_unacked(mdev);
1587
1588         return ok;
1589 }
1590
1591 /* Called from receive_Data.
1592  * Synchronize packets on sock with packets on msock.
1593  *
1594  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1595  * packet traveling on msock, they are still processed in the order they have
1596  * been sent.
1597  *
1598  * Note: we don't care for Ack packets overtaking P_DATA packets.
1599  *
1600  * In case packet_seq is larger than mdev->peer_seq number, there are
1601  * outstanding packets on the msock. We wait for them to arrive.
1602  * In case we are the logically next packet, we update mdev->peer_seq
1603  * ourselves. Correctly handles 32bit wrap around.
1604  *
1605  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1606  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1607  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1608  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1609  *
1610  * returns 0 if we may process the packet,
1611  * -ERESTARTSYS if we were interrupted (by disconnect signal),
      * -ETIMEDOUT if a full 30 second wait elapsed without mdev->peer_seq
      * changing. */
1612 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1613 {
1614         DEFINE_WAIT(wait);
1615         unsigned int p_seq;
1616         long timeout;
1617         int ret = 0;
1618         spin_lock(&mdev->peer_seq_lock);
1619         for (;;) {
                      /* get onto seq_wait before the condition is tested */
1620                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1621                 if (seq_le(packet_seq, mdev->peer_seq+1))
1622                         break;
1623                 if (signal_pending(current)) {
1624                         ret = -ERESTARTSYS;
1625                         break;
1626                 }
                      /* remember peer_seq to see below whether it moved while we slept */
1627                 p_seq = mdev->peer_seq;
                      /* peer_seq_lock is not held while sleeping */
1628                 spin_unlock(&mdev->peer_seq_lock);
1629                 timeout = schedule_timeout(30*HZ);
1630                 spin_lock(&mdev->peer_seq_lock);
1631                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1632                         ret = -ETIMEDOUT;
1633                         dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1634                         break;
1635                 }
1636         }
1637         finish_wait(&mdev->seq_wait, &wait);
              /* we are the logically next packet: advance peer_seq ourselves */
1638         if (mdev->peer_seq+1 == packet_seq)
1639                 mdev->peer_seq++;
1640         spin_unlock(&mdev->peer_seq_lock);
1641         return ret;
1642 }
1643
1644 /* see also bio_flags_to_wire()
1645  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1646  * flags and back. We may replicate to other kernel versions. */
1647 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1648 {
1649         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1650                 (dpf & DP_FUA ? REQ_FUA : 0) |
1651                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1652                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1653 }
1654
/* mirrored write
 *
 * Handle a data packet carrying a write from the peer.
 *
 * @mdev:      device the packet was received for
 * @cmd:       packet type (not used in this function)
 * @data_size: size of the payload that follows the packet header
 *
 * Without a usable local disk (get_ldev() fails) the packet is answered
 * with P_NEG_ACK, counted in the current epoch, and its payload is drained.
 *
 * Otherwise the payload is read into an epoch entry, which is attached to
 * the current epoch.  With two_primaries configured, we additionally wait
 * for the packet's sequence number and resolve conflicts with overlapping
 * local requests before the entry is put on active_ee and submitted.
 *
 * Returns true if the write was submitted or a discard ack was queued,
 * false on failure; in the "no local disk" case the result of
 * drbd_drain_block() is returned.
 */
static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
{
        sector_t sector;
        struct drbd_epoch_entry *e;
        struct p_data *p = &mdev->data.rbuf.data;
        int rw = WRITE;
        u32 dp_flags;

        if (!get_ldev(mdev)) {
                /* No local disk to write to.  Still keep the sequence
                 * number accounting in step if this is the next packet. */
                spin_lock(&mdev->peer_seq_lock);
                if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
                        mdev->peer_seq++;
                spin_unlock(&mdev->peer_seq_lock);

                drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
                atomic_inc(&mdev->current_epoch->epoch_size);
                return drbd_drain_block(mdev, data_size);
        }

        /* get_ldev(mdev) successful.
         * Corresponding put_ldev done either below (on various errors),
         * or in drbd_endio_write_sec, if we successfully submit the data at
         * the end of this function. */

        sector = be64_to_cpu(p->sector);
        e = read_in_block(mdev, p->block_id, sector, data_size);
        if (!e) {
                put_ldev(mdev);
                return false;
        }

        /* default completion callback; replaced by e_send_discard_ack
         * below if this write gets discarded */
        e->w.cb = e_end_block;

        dp_flags = be32_to_cpu(p->dp_flags);
        rw |= wire_flags_to_bio(mdev, dp_flags);

        if (dp_flags & DP_MAY_SET_IN_SYNC)
                e->flags |= EE_MAY_SET_IN_SYNC;

        /* attach the entry to the current epoch and account for it,
         * all under the epoch_lock */
        spin_lock(&mdev->epoch_lock);
        e->epoch = mdev->current_epoch;
        atomic_inc(&e->epoch->epoch_size);
        atomic_inc(&e->epoch->active);
        spin_unlock(&mdev->epoch_lock);

        /* I'm the receiver, I do hold a net_cnt reference. */
        if (!mdev->net_conf->two_primaries) {
                /* no conflict detection needed;
                 * req_lock is released after the list_add() below */
                spin_lock_irq(&mdev->req_lock);
        } else {
                /* don't get the req_lock yet,
                 * we may sleep in drbd_wait_peer_seq */
                const int size = e->size;
                const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
                DEFINE_WAIT(wait);
                struct drbd_request *i;
                struct hlist_node *n;
                struct hlist_head *slot;
                int first;

                D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
                BUG_ON(mdev->ee_hash == NULL);
                BUG_ON(mdev->tl_hash == NULL);

                /* conflict detection and handling:
                 * 1. wait on the sequence number,
                 *    in case this data packet overtook ACK packets.
                 * 2. check our hash tables for conflicting requests.
                 *    we only need to walk the tl_hash, since an ee can not
                 *    have a conflict with an other ee: on the submitting
                 *    node, the corresponding req had already been conflicting,
                 *    and a conflicting req is never sent.
                 *
                 * Note: for two_primaries, we are protocol C,
                 * so there cannot be any request that is DONE
                 * but still on the transfer log.
                 *
                 * unconditionally add to the ee_hash.
                 *
                 * if no conflicting request is found:
                 *    submit.
                 *
                 * if any conflicting request is found
                 * that has not yet been acked,
                 * AND I have the "discard concurrent writes" flag:
                 *       queue (via done_ee) the P_DISCARD_ACK; OUT.
                 *
                 * if any conflicting request is found:
                 *       block the receiver, waiting on misc_wait
                 *       until no more conflicting requests are there,
                 *       or we get interrupted (disconnect).
                 *
                 *       we do not just write after local io completion of those
                 *       requests, but only after req is done completely, i.e.
                 *       we wait for the P_DISCARD_ACK to arrive!
                 *
                 *       then proceed normally, i.e. submit.
                 */
                /* non-zero means interrupted by a signal, or timed out */
                if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
                        goto out_interrupted;

                spin_lock_irq(&mdev->req_lock);

                hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));

#define OVERLAPS overlaps(i->sector, i->size, sector, size)
                slot = tl_hash_slot(mdev, sector);
                first = 1;
                for (;;) {
                        int have_unacked = 0;
                        int have_conflict = 0;
                        prepare_to_wait(&mdev->misc_wait, &wait,
                                TASK_INTERRUPTIBLE);
                        /* count the overlapping local requests in this slot,
                         * and how many of those are still RQ_NET_PENDING */
                        hlist_for_each_entry(i, n, slot, colision) {
                                if (OVERLAPS) {
                                        /* only ALERT on first iteration,
                                         * we may be woken up early... */
                                        if (first)
                                                dev_alert(DEV, "%s[%u] Concurrent local write detected!"
                                                      " new: %llus +%u; pending: %llus +%u\n",
                                                      current->comm, current->pid,
                                                      (unsigned long long)sector, size,
                                                      (unsigned long long)i->sector, i->size);
                                        if (i->rq_state & RQ_NET_PENDING)
                                                ++have_unacked;
                                        ++have_conflict;
                                }
                        }
#undef OVERLAPS
                        /* leaves the loop with req_lock still held */
                        if (!have_conflict)
                                break;

                        /* Discard Ack only for the _first_ iteration */
                        if (first && discard && have_unacked) {
                                dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
                                     (unsigned long long)sector);
                                inc_unacked(mdev);
                                /* the entry is not submitted; it goes
                                 * straight to done_ee with the discard-ack
                                 * callback */
                                e->w.cb = e_send_discard_ack;
                                list_add_tail(&e->w.list, &mdev->done_ee);

                                spin_unlock_irq(&mdev->req_lock);

                                /* we could probably send that P_DISCARD_ACK ourselves,
                                 * but I don't like the receiver using the msock */

                                put_ldev(mdev);
                                wake_asender(mdev);
                                finish_wait(&mdev->misc_wait, &wait);
                                return true;
                        }

                        if (signal_pending(current)) {
                                /* undo the unconditional ee_hash insertion
                                 * before the entry is freed */
                                hlist_del_init(&e->colision);

                                spin_unlock_irq(&mdev->req_lock);

                                finish_wait(&mdev->misc_wait, &wait);
                                goto out_interrupted;
                        }

                        spin_unlock_irq(&mdev->req_lock);
                        if (first) {
                                first = 0;
                                dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
                                     "sec=%llus\n", (unsigned long long)sector);
                        } else if (discard) {
                                /* we had none on the first iteration.
                                 * there must be none now. */
                                D_ASSERT(have_unacked == 0);
                        }
                        /* sleep until woken on misc_wait (or by a signal),
                         * then walk the slot again */
                        schedule();
                        spin_lock_irq(&mdev->req_lock);
                }
                finish_wait(&mdev->misc_wait, &wait);
        }

        /* both branches above arrive here with req_lock held */
        list_add(&e->w.list, &mdev->active_ee);
        spin_unlock_irq(&mdev->req_lock);

        switch (mdev->net_conf->wire_protocol) {
        case DRBD_PROT_C:
                inc_unacked(mdev);
                /* corresponding dec_unacked() in e_end_block()
                 * respective _drbd_clear_done_ee */
                break;
        case DRBD_PROT_B:
                /* I really don't like it that the receiver thread
                 * sends on the msock, but anyways */
                drbd_send_ack(mdev, P_RECV_ACK, e);
                break;
        case DRBD_PROT_A:
                /* nothing to do */
                break;
        }

        if (mdev->state.pdsk < D_INCONSISTENT) {
                /* In case we have the only disk of the cluster:
                 * mark the written range out of sync.
                 * EE_CALL_AL_COMPLETE_IO records that drbd_al_complete_io()
                 * has to be called for this entry to match the
                 * drbd_al_begin_io() here (see the error path below). */
                drbd_set_out_of_sync(mdev, e->sector, e->size);
                e->flags |= EE_CALL_AL_COMPLETE_IO;
                e->flags &= ~EE_MAY_SET_IN_SYNC;
                drbd_al_begin_io(mdev, e->sector);
        }

        if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
                return true;

        /* don't care for the reason here */
        dev_err(DEV, "submit failed, triggering re-connect\n");
        /* take the entry off active_ee and the ee_hash again */
        spin_lock_irq(&mdev->req_lock);
        list_del(&e->w.list);
        hlist_del_init(&e->colision);
        spin_unlock_irq(&mdev->req_lock);
        if (e->flags & EE_CALL_AL_COMPLETE_IO)
                drbd_al_complete_io(mdev, e->sector);

        /* common cleanup: reached by goto when interrupted, and by falling
         * through after a failed submit */
out_interrupted:
        drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
        put_ldev(mdev);
        drbd_free_ee(mdev, e);
        return false;
}
1876
1877 /* We may throttle resync, if the lower device seems to be busy,
1878  * and current sync rate is above c_min_rate.
1879  *
1880  * To decide whether or not the lower device is busy, we use a scheme similar
1881  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
1882  * (more than 64 sectors) of activity we cannot account for with our own resync
1883  * activity, it obviously is "busy".
1884  *
1885  * The current sync rate used here uses only the most recent two step marks,
1886  * to have a short time average so we can react faster.
1887  */
1888 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1889 {
1890         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1891         unsigned long db, dt, dbdt;
1892         struct lc_element *tmp;
1893         int curr_events;
1894         int throttle = 0;
1895
1896         /* feature disabled? */
1897         if (mdev->sync_conf.c_min_rate == 0)
1898                 return 0;
1899
1900         spin_lock_irq(&mdev->al_lock);
1901         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1902         if (tmp) {
1903                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1904                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1905                         spin_unlock_irq(&mdev->al_lock);
1906                         return 0;
1907                 }
1908                 /* Do not slow down if app IO is already waiting for this extent */
1909         }
1910         spin_unlock_irq(&mdev->al_lock);
1911
1912         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1913                       (int)part_stat_read(&disk->part0, sectors[1]) -
1914                         atomic_read(&mdev->rs_sect_ev);
1915
1916         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1917                 unsigned long rs_left;
1918                 int i;
1919
1920                 mdev->rs_last_events = curr_events;
1921
1922                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1923                  * approx. */
1924                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1925
1926                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1927                         rs_left = mdev->ov_left;
1928                 else
1929                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1930
1931                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1932                 if (!dt)
1933                         dt++;
1934                 db = mdev->rs_mark_left[i] - rs_left;
1935                 dbdt = Bit2KB(db/dt);
1936
1937                 if (dbdt > mdev->sync_conf.c_min_rate)
1938                         throttle = 1;
1939         }
1940         return throttle;
1941 }
1942
1943
/* Handle the peer's read-type requests: P_DATA_REQUEST, P_RS_DATA_REQUEST,
 * P_CSUM_RS_REQUEST, P_OV_REQUEST and P_OV_REPLY.
 *
 * @mdev:        device the packet was received for
 * @cmd:         which of the above packet types this is
 * @digest_size: size of the payload following the packet header; for
 *               P_CSUM_RS_REQUEST and P_OV_REPLY it is received as digest
 *
 * The requested range is validated, an epoch entry is allocated for it,
 * the completion callback is chosen according to @cmd, and the read is
 * submitted to the local disk.  If the local disk is not D_UP_TO_DATE, a
 * negative reply is sent instead and the payload is drained.
 *
 * Returns true if the read was submitted, false on failure; in the
 * "no up-to-date disk" case the result of drbd_drain_block() is returned.
 */
static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
{
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
        struct drbd_epoch_entry *e;
        struct digest_info *di = NULL;
        int size, verb;
        unsigned int fault_type;
        struct p_block_req *p = &mdev->data.rbuf.block_req;

        sector = be64_to_cpu(p->sector);
        size   = be32_to_cpu(p->blksize);

        /* size must be positive, a multiple of 512 bytes,
         * and not exceed DRBD_MAX_BIO_SIZE */
        if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
                dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
                                (unsigned long long)sector, size);
                return false;
        }
        /* the request must not reach beyond the end of the device */
        if (sector + (size>>9) > capacity) {
                dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
                                (unsigned long long)sector, size);
                return false;
        }

        if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
                /* verb: whether to log the (rate limited) error below */
                verb = 1;
                switch (cmd) {
                case P_DATA_REQUEST:
                        drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
                        break;
                case P_RS_DATA_REQUEST:
                case P_CSUM_RS_REQUEST:
                case P_OV_REQUEST:
                        drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
                        break;
                case P_OV_REPLY:
                        /* not logged as an error: drop the pending count
                         * and report the block with ID_IN_SYNC */
                        verb = 0;
                        dec_rs_pending(mdev);
                        drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
                        break;
                default:
                        dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
                                cmdname(cmd));
                }
                if (verb && __ratelimit(&drbd_ratelimit_state))
                        dev_err(DEV, "Can not satisfy peer's read request, "
                            "no local data.\n");

                /* drain the payload, if any */
                return drbd_drain_block(mdev, digest_size);
        }

        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
        if (!e) {
                put_ldev(mdev);
                return false;
        }

        /* Choose completion callback and fault injection type per request
         * type.  Cases that leave the switch with "break" go through the
         * throttling and drbd_rs_begin_io() below; the others jump directly
         * to one of the submit labels. */
        switch (cmd) {
        case P_DATA_REQUEST:
                e->w.cb = w_e_end_data_req;
                fault_type = DRBD_FAULT_DT_RD;
                /* application IO, don't drbd_rs_begin_io */
                goto submit;

        case P_RS_DATA_REQUEST:
                e->w.cb = w_e_end_rsdata_req;
                fault_type = DRBD_FAULT_RS_RD;
                /* used in the sector offset progress display */
                mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
                break;

        case P_OV_REPLY:
        case P_CSUM_RS_REQUEST:
                fault_type = DRBD_FAULT_RS_RD;
                /* one allocation for the digest_info and the digest itself */
                di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
                if (!di)
                        goto out_free_e;

                di->digest_size = digest_size;
                /* the digest buffer starts right behind the struct */
                di->digest = (((char *)di)+sizeof(struct digest_info));

                /* NOTE(review): from here on di is only reachable via e;
                 * presumably drbd_free_ee() frees it when EE_HAS_DIGEST
                 * is set -- confirm. */
                e->digest = di;
                e->flags |= EE_HAS_DIGEST;

                if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
                        goto out_free_e;

                if (cmd == P_CSUM_RS_REQUEST) {
                        D_ASSERT(mdev->agreed_pro_version >= 89);
                        e->w.cb = w_e_end_csum_rs_req;
                        /* used in the sector offset progress display */
                        mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
                } else if (cmd == P_OV_REPLY) {
                        /* track progress, we may need to throttle */
                        atomic_add(size >> 9, &mdev->rs_sect_in);
                        e->w.cb = w_e_end_ov_reply;
                        dec_rs_pending(mdev);
                        /* drbd_rs_begin_io done when we sent this request,
                         * but accounting still needs to be done. */
                        goto submit_for_resync;
                }
                break;

        case P_OV_REQUEST:
                /* First P_OV_REQUEST of an online verify run (start sector
                 * not yet set): initialize the verify progress bookkeeping. */
                if (mdev->ov_start_sector == ~(sector_t)0 &&
                    mdev->agreed_pro_version >= 90) {
                        unsigned long now = jiffies;
                        int i;
                        mdev->ov_start_sector = sector;
                        mdev->ov_position = sector;
                        mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
                        mdev->rs_total = mdev->ov_left;
                        for (i = 0; i < DRBD_SYNC_MARKS; i++) {
                                mdev->rs_mark_left[i] = mdev->ov_left;
                                mdev->rs_mark_time[i] = now;
                        }
                        dev_info(DEV, "Online Verify start sector: %llu\n",
                                        (unsigned long long)sector);
                }
                e->w.cb = w_e_end_ov_req;
                fault_type = DRBD_FAULT_RS_RD;
                break;

        default:
                dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
                    cmdname(cmd));
                fault_type = DRBD_FAULT_MAX;
                goto out_free_e;
        }

        /* Throttle, drbd_rs_begin_io and submit should become asynchronous
         * wrt the receiver, but it is not as straightforward as it may seem.
         * Various places in the resync start and stop logic assume resync
         * requests are processed in order, requeuing this on the worker thread
         * introduces a bunch of new code for synchronization between threads.
         *
         * Unlimited throttling before drbd_rs_begin_io may stall the resync
         * "forever", throttling after drbd_rs_begin_io will lock that extent
         * for application writes for the same time.  For now, just throttle
         * here, where the rest of the code expects the receiver to sleep for
         * a while, anyways.
         */

        /* Throttle before drbd_rs_begin_io, as that locks out application IO;
         * this defers syncer requests for some time, before letting at least
         * one request through.  The resync controller on the receiving side
         * will adapt to the incoming rate accordingly.
         *
         * We cannot throttle here if remote is Primary/SyncTarget:
         * we would also throttle its application reads.
         * In that case, throttling is done on the SyncTarget only.
         */
        if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
                schedule_timeout_uninterruptible(HZ/10);
        /* non-zero return: give up on this request */
        if (drbd_rs_begin_io(mdev, sector))
                goto out_free_e;

submit_for_resync:
        /* account our own resync IO,
         * see the busy detection in drbd_rs_should_slow_down() */
        atomic_add(size >> 9, &mdev->rs_sect_ev);

submit:
        inc_unacked(mdev);
        spin_lock_irq(&mdev->req_lock);
        list_add_tail(&e->w.list, &mdev->read_ee);
        spin_unlock_irq(&mdev->req_lock);

        if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
                return true;

        /* don't care for the reason here */
        dev_err(DEV, "submit failed, triggering re-connect\n");
        spin_lock_irq(&mdev->req_lock);
        list_del(&e->w.list);
        spin_unlock_irq(&mdev->req_lock);
        /* no drbd_rs_complete_io(), we are dropping the connection anyways */

        /* common error exit: drop the ldev reference, free the entry */
out_free_e:
        put_ldev(mdev);
        drbd_free_ee(mdev, e);
        return false;
}
2129
/* Apply the configured after_sb_0p policy.
 *
 * Returns -1 to discard the local data (as ASB_DISCARD_LOCAL does),
 * 1 to discard the remote data (as ASB_DISCARD_REMOTE does), or -100 if
 * the policy does not lead to a decision.
 *
 * Several policies deliberately fall through to the next one when they
 * cannot decide; the switch below depends on the order of its cases.
 */
static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
{
        int self, peer, rv = -100;
        unsigned long ch_self, ch_peer;

        /* lowest bit of the bitmap UUID of either side; this is what the
         * discard younger/older primary policies compare */
        self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
        peer = mdev->p_uuid[UI_BITMAP] & 1;

        /* per-side values compared by the discard zero/least changes
         * policies */
        ch_peer = mdev->p_uuid[UI_SIZE];
        ch_self = mdev->comm_bm_set;

        switch (mdev->net_conf->after_sb_0p) {
        case ASB_CONSENSUS:
        case ASB_DISCARD_SECONDARY:
        case ASB_CALL_HELPER:
                /* these settings are not valid for after_sb_0p */
                dev_err(DEV, "Configuration error.\n");
                break;
        case ASB_DISCONNECT:
                break;
        case ASB_DISCARD_YOUNGER_PRI:
                if (self == 0 && peer == 1) {
                        rv = -1;
                        break;
                }
                if (self == 1 && peer == 0) {
                        rv =  1;
                        break;
                }
                /* Else fall through to one of the other strategies... */
                /* fall through */
        case ASB_DISCARD_OLDER_PRI:
                /* same test as above, with the opposite result */
                if (self == 0 && peer == 1) {
                        rv = 1;
                        break;
                }
                if (self == 1 && peer == 0) {
                        rv = -1;
                        break;
                }
                /* Else fall through to one of the other strategies... */
                dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
                     "Using discard-least-changes instead\n");
                /* fall through */
        case ASB_DISCARD_ZERO_CHG:
                if (ch_peer == 0 && ch_self == 0) {
                        /* no changes on either side:
                         * the DISCARD_CONCURRENT flag breaks the tie */
                        rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
                                ? -1 : 1;
                        break;
                } else {
                        if (ch_peer == 0) { rv =  1; break; }
                        if (ch_self == 0) { rv = -1; break; }
                }
                /* only continue with discard-least-changes if we got here
                 * by falling through from the cases above */
                if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
                        break;
                /* fall through */
        case ASB_DISCARD_LEAST_CHG:
                if      (ch_self < ch_peer)
                        rv = -1;
                else if (ch_self > ch_peer)
                        rv =  1;
                else /* ( ch_self == ch_peer ) */
                     /* Well, then use something else. */
                        rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
                                ? -1 : 1;
                break;
        case ASB_DISCARD_LOCAL:
                rv = -1;
                break;
        case ASB_DISCARD_REMOTE:
                rv =  1;
        }

        return rv;
}
2201
2202 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2203 {
2204         int hg, rv = -100;
2205
2206         switch (mdev->net_conf->after_sb_1p) {
2207         case ASB_DISCARD_YOUNGER_PRI:
2208         case ASB_DISCARD_OLDER_PRI:
2209         case ASB_DISCARD_LEAST_CHG:
2210         case ASB_DISCARD_LOCAL:
2211         case ASB_DISCARD_REMOTE:
2212                 dev_err(DEV, "Configuration error.\n");
2213                 break;
2214         case ASB_DISCONNECT:
2215                 break;
2216         case ASB_CONSENSUS:
2217                 hg = drbd_asb_recover_0p(mdev);
2218                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2219                         rv = hg;
2220                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2221                         rv = hg;
2222                 break;
2223         case ASB_VIOLENTLY:
2224                 rv = drbd_asb_recover_0p(mdev);
2225                 break;
2226         case ASB_DISCARD_SECONDARY:
2227                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2228         case ASB_CALL_HELPER:
2229                 hg = drbd_asb_recover_0p(mdev);
2230                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2231                         enum drbd_state_rv rv2;
2232
2233                         drbd_set_role(mdev, R_SECONDARY, 0);
2234                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2235                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2236                           * we do not need to wait for the after state change work either. */
2237                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2238                         if (rv2 != SS_SUCCESS) {
2239                                 drbd_khelper(mdev, "pri-lost-after-sb");
2240                         } else {
2241                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2242                                 rv = hg;
2243                         }
2244                 } else
2245                         rv = hg;
2246         }
2247
2248         return rv;
2249 }
2250
2251 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2252 {
2253         int hg, rv = -100;
2254
2255         switch (mdev->net_conf->after_sb_2p) {
2256         case ASB_DISCARD_YOUNGER_PRI:
2257         case ASB_DISCARD_OLDER_PRI:
2258         case ASB_DISCARD_LEAST_CHG:
2259         case ASB_DISCARD_LOCAL:
2260         case ASB_DISCARD_REMOTE:
2261         case ASB_CONSENSUS:
2262         case ASB_DISCARD_SECONDARY:
2263                 dev_err(DEV, "Configuration error.\n");
2264                 break;
2265         case ASB_VIOLENTLY:
2266                 rv = drbd_asb_recover_0p(mdev);
2267                 break;
2268         case ASB_DISCONNECT:
2269                 break;
2270         case ASB_CALL_HELPER:
2271                 hg = drbd_asb_recover_0p(mdev);
2272                 if (hg == -1) {
2273                         enum drbd_state_rv rv2;
2274
2275                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2276                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2277                           * we do not need to wait for the after state change work either. */
2278                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2279                         if (rv2 != SS_SUCCESS) {
2280                                 drbd_khelper(mdev, "pri-lost-after-sb");
2281                         } else {
2282                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2283                                 rv = hg;
2284                         }
2285                 } else
2286                         rv = hg;
2287         }
2288
2289         return rv;
2290 }
2291
2292 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2293                            u64 bits, u64 flags)
2294 {
2295         if (!uuid) {
2296                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2297                 return;
2298         }
2299         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2300              text,
2301              (unsigned long long)uuid[UI_CURRENT],
2302              (unsigned long long)uuid[UI_BITMAP],
2303              (unsigned long long)uuid[UI_HISTORY_START],
2304              (unsigned long long)uuid[UI_HISTORY_END],
2305              (unsigned long long)bits,
2306              (unsigned long long)flags);
2307 }
2308
2309 /*
2310   100   after split brain try auto recover
2311     2   C_SYNC_SOURCE set BitMap
2312     1   C_SYNC_SOURCE use BitMap
2313     0   no Sync
2314    -1   C_SYNC_TARGET use BitMap
2315    -2   C_SYNC_TARGET set BitMap
2316  -100   after split brain, disconnect
2317 -1000   unrelated data
2318 -1091   requires proto 91
2319 -1096   requires proto 96
2320  */
2321 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2322 {
2323         u64 self, peer;
2324         int i, j;
2325
2326         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2327         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2328
2329         *rule_nr = 10;
2330         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2331                 return 0;
2332
2333         *rule_nr = 20;
2334         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2335              peer != UUID_JUST_CREATED)
2336                 return -2;
2337
2338         *rule_nr = 30;
2339         if (self != UUID_JUST_CREATED &&
2340             (peer == UUID_JUST_CREATED || peer == (u64)0))
2341                 return 2;
2342
2343         if (self == peer) {
2344                 int rct, dc; /* roles at crash time */
2345
2346                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2347
2348                         if (mdev->agreed_pro_version < 91)
2349                                 return -1091;
2350
2351                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2352                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2353                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2354                                 drbd_uuid_set_bm(mdev, 0UL);
2355
2356                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2357                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2358                                 *rule_nr = 34;
2359                         } else {
2360                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2361                                 *rule_nr = 36;
2362                         }
2363
2364                         return 1;
2365                 }
2366
2367                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2368
2369                         if (mdev->agreed_pro_version < 91)
2370                                 return -1091;
2371
2372                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2373                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2374                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2375
2376                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2377                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2378                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2379
2380                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2381                                 *rule_nr = 35;
2382                         } else {
2383                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2384                                 *rule_nr = 37;
2385                         }
2386
2387                         return -1;
2388                 }
2389
2390                 /* Common power [off|failure] */
2391                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2392                         (mdev->p_uuid[UI_FLAGS] & 2);
2393                 /* lowest bit is set when we were primary,
2394                  * next bit (weight 2) is set when peer was primary */
2395                 *rule_nr = 40;
2396
2397                 switch (rct) {
2398                 case 0: /* !self_pri && !peer_pri */ return 0;
2399                 case 1: /*  self_pri && !peer_pri */ return 1;
2400                 case 2: /* !self_pri &&  peer_pri */ return -1;
2401                 case 3: /*  self_pri &&  peer_pri */
2402                         dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2403                         return dc ? -1 : 1;
2404                 }
2405         }
2406
2407         *rule_nr = 50;
2408         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2409         if (self == peer)
2410                 return -1;
2411
2412         *rule_nr = 51;
2413         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2414         if (self == peer) {
2415                 if (mdev->agreed_pro_version < 96 ?
2416                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2417                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2418                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2419                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2420                            resync as sync source modifications of the peer's UUIDs. */
2421
2422                         if (mdev->agreed_pro_version < 91)
2423                                 return -1091;
2424
2425                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2426                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2427
2428                         dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2429                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2430
2431                         return -1;
2432                 }
2433         }
2434
2435         *rule_nr = 60;
2436         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2437         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2438                 peer = mdev->p_uuid[i] & ~((u64)1);
2439                 if (self == peer)
2440                         return -2;
2441         }
2442
2443         *rule_nr = 70;
2444         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2445         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2446         if (self == peer)
2447                 return 1;
2448
2449         *rule_nr = 71;
2450         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2451         if (self == peer) {
2452                 if (mdev->agreed_pro_version < 96 ?
2453                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2454                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2455                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2456                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2457                            resync as sync source modifications of our UUIDs. */
2458
2459                         if (mdev->agreed_pro_version < 91)
2460                                 return -1091;
2461
2462                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2463                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2464
2465                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2466                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2467                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2468
2469                         return 1;
2470                 }
2471         }
2472
2473
2474         *rule_nr = 80;
2475         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2476         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2477                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2478                 if (self == peer)
2479                         return 2;
2480         }
2481
2482         *rule_nr = 90;
2483         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2484         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2485         if (self == peer && self != ((u64)0))
2486                 return 100;
2487
2488         *rule_nr = 100;
2489         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2490                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2491                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2492                         peer = mdev->p_uuid[j] & ~((u64)1);
2493                         if (self == peer)
2494                                 return -100;
2495                 }
2496         }
2497
2498         return -1000;
2499 }
2500
2501 /* drbd_sync_handshake() returns the new conn state on success, or
2502    CONN_MASK (-1) on failure.
2503  */
2504 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2505                                            enum drbd_disk_state peer_disk) __must_hold(local)
2506 {
2507         int hg, rule_nr;
2508         enum drbd_conns rv = C_MASK;
2509         enum drbd_disk_state mydisk;
2510
2511         mydisk = mdev->state.disk;
2512         if (mydisk == D_NEGOTIATING)
2513                 mydisk = mdev->new_state_tmp.disk;
2514
2515         dev_info(DEV, "drbd_sync_handshake:\n");
2516         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2517         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2518                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2519
2520         hg = drbd_uuid_compare(mdev, &rule_nr);
2521
2522         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2523
2524         if (hg == -1000) {
2525                 dev_alert(DEV, "Unrelated data, aborting!\n");
2526                 return C_MASK;
2527         }
2528         if (hg < -1000) {
2529                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2530                 return C_MASK;
2531         }
2532
2533         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2534             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2535                 int f = (hg == -100) || abs(hg) == 2;
2536                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2537                 if (f)
2538                         hg = hg*2;
2539                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2540                      hg > 0 ? "source" : "target");
2541         }
2542
2543         if (abs(hg) == 100)
2544                 drbd_khelper(mdev, "initial-split-brain");
2545
2546         if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2547                 int pcount = (mdev->state.role == R_PRIMARY)
2548                            + (peer_role == R_PRIMARY);
2549                 int forced = (hg == -100);
2550
2551                 switch (pcount) {
2552                 case 0:
2553                         hg = drbd_asb_recover_0p(mdev);
2554                         break;
2555                 case 1:
2556                         hg = drbd_asb_recover_1p(mdev);
2557                         break;
2558                 case 2:
2559                         hg = drbd_asb_recover_2p(mdev);
2560                         break;
2561                 }
2562                 if (abs(hg) < 100) {
2563                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2564                              "automatically solved. Sync from %s node\n",
2565                              pcount, (hg < 0) ? "peer" : "this");
2566                         if (forced) {
2567                                 dev_warn(DEV, "Doing a full sync, since"
2568                                      " UUIDs where ambiguous.\n");
2569                                 hg = hg*2;
2570                         }
2571                 }
2572         }
2573
2574         if (hg == -100) {
2575                 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2576                         hg = -1;
2577                 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2578                         hg = 1;
2579
2580                 if (abs(hg) < 100)
2581                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2582                              "Sync from %s node\n",
2583                              (hg < 0) ? "peer" : "this");
2584         }
2585
2586         if (hg == -100) {
2587                 /* FIXME this log message is not correct if we end up here
2588                  * after an attempted attach on a diskless node.
2589                  * We just refuse to attach -- well, we drop the "connection"
2590                  * to that disk, in a way... */
2591                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2592                 drbd_khelper(mdev, "split-brain");
2593                 return C_MASK;
2594         }
2595
2596         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2597                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2598                 return C_MASK;
2599         }
2600
2601         if (hg < 0 && /* by intention we do not use mydisk here. */
2602             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2603                 switch (mdev->net_conf->rr_conflict) {
2604                 case ASB_CALL_HELPER:
2605                         drbd_khelper(mdev, "pri-lost");
2606                         /* fall through */
2607                 case ASB_DISCONNECT:
2608                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2609                         return C_MASK;
2610                 case ASB_VIOLENTLY:
2611                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2612                              "assumption\n");
2613                 }
2614         }
2615
2616         if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2617                 if (hg == 0)
2618                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2619                 else
2620                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2621                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2622                                  abs(hg) >= 2 ? "full" : "bit-map based");
2623                 return C_MASK;
2624         }
2625
2626         if (abs(hg) >= 2) {
2627                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2628                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2629                                         BM_LOCKED_SET_ALLOWED))
2630                         return C_MASK;
2631         }
2632
2633         if (hg > 0) { /* become sync source. */
2634                 rv = C_WF_BITMAP_S;
2635         } else if (hg < 0) { /* become sync target */
2636                 rv = C_WF_BITMAP_T;
2637         } else {
2638                 rv = C_CONNECTED;
2639                 if (drbd_bm_total_weight(mdev)) {
2640                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2641                              drbd_bm_total_weight(mdev));
2642                 }
2643         }
2644
2645         return rv;
2646 }
2647
2648 /* returns 1 if invalid */
2649 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2650 {
2651         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2652         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2653             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2654                 return 0;
2655
2656         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2657         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2658             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2659                 return 1;
2660
2661         /* everything else is valid if they are equal on both sides. */
2662         if (peer == self)
2663                 return 0;
2664
2665         /* everything es is invalid. */
2666         return 1;
2667 }
2668
2669 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2670 {
2671         struct p_protocol *p = &mdev->data.rbuf.protocol;
2672         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2673         int p_want_lose, p_two_primaries, cf;
2674         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2675
2676         p_proto         = be32_to_cpu(p->protocol);
2677         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2678         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2679         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2680         p_two_primaries = be32_to_cpu(p->two_primaries);
2681         cf              = be32_to_cpu(p->conn_flags);
2682         p_want_lose = cf & CF_WANT_LOSE;
2683
2684         clear_bit(CONN_DRY_RUN, &mdev->flags);
2685
2686         if (cf & CF_DRY_RUN)
2687                 set_bit(CONN_DRY_RUN, &mdev->flags);
2688
2689         if (p_proto != mdev->net_conf->wire_protocol) {
2690                 dev_err(DEV, "incompatible communication protocols\n");
2691                 goto disconnect;
2692         }
2693
2694         if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2695                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2696                 goto disconnect;
2697         }
2698
2699         if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2700                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2701                 goto disconnect;
2702         }
2703
2704         if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2705                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2706                 goto disconnect;
2707         }
2708
2709         if (p_want_lose && mdev->net_conf->want_lose) {
2710                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2711                 goto disconnect;
2712         }
2713
2714         if (p_two_primaries != mdev->net_conf->two_primaries) {
2715                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2716                 goto disconnect;
2717         }
2718
2719         if (mdev->agreed_pro_version >= 87) {
2720                 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2721
2722                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2723                         return false;
2724
2725                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2726                 if (strcmp(p_integrity_alg, my_alg)) {
2727                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2728                         goto disconnect;
2729                 }
2730                 dev_info(DEV, "data-integrity-alg: %s\n",
2731                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2732         }
2733
2734         return true;
2735
2736 disconnect:
2737         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2738         return false;
2739 }
2740
2741 /* helper function
2742  * input: alg name, feature name
2743  * return: NULL (alg name was "")
2744  *         ERR_PTR(error) if something goes wrong
2745  *         or the crypto hash ptr, if it worked out ok. */
2746 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2747                 const char *alg, const char *name)
2748 {
2749         struct crypto_hash *tfm;
2750
2751         if (!alg[0])
2752                 return NULL;
2753
2754         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2755         if (IS_ERR(tfm)) {
2756                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2757                         alg, name, PTR_ERR(tfm));
2758                 return tfm;
2759         }
2760         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2761                 crypto_free_hash(tfm);
2762                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2763                 return ERR_PTR(-EINVAL);
2764         }
2765         return tfm;
2766 }
2767
2768 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2769 {
2770         int ok = true;
2771         struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2772         unsigned int header_size, data_size, exp_max_sz;
2773         struct crypto_hash *verify_tfm = NULL;
2774         struct crypto_hash *csums_tfm = NULL;
2775         const int apv = mdev->agreed_pro_version;
2776         int *rs_plan_s = NULL;
2777         int fifo_size = 0;
2778
2779         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2780                     : apv == 88 ? sizeof(struct p_rs_param)
2781                                         + SHARED_SECRET_MAX
2782                     : apv <= 94 ? sizeof(struct p_rs_param_89)
2783                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2784
2785         if (packet_size > exp_max_sz) {
2786                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2787                     packet_size, exp_max_sz);
2788                 return false;
2789         }
2790
2791         if (apv <= 88) {
2792                 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2793                 data_size   = packet_size  - header_size;
2794         } else if (apv <= 94) {
2795                 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2796                 data_size   = packet_size  - header_size;
2797                 D_ASSERT(data_size == 0);
2798         } else {
2799                 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2800                 data_size   = packet_size  - header_size;
2801                 D_ASSERT(data_size == 0);
2802         }
2803
2804         /* initialize verify_alg and csums_alg */
2805         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2806
2807         if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2808                 return false;
2809
2810         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2811
2812         if (apv >= 88) {
2813                 if (apv == 88) {
2814                         if (data_size > SHARED_SECRET_MAX) {
2815                                 dev_err(DEV, "verify-alg too long, "
2816                                     "peer wants %u, accepting only %u byte\n",
2817                                                 data_size, SHARED_SECRET_MAX);
2818                                 return false;
2819                         }
2820
2821                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2822                                 return false;
2823
2824                         /* we expect NUL terminated string */
2825                         /* but just in case someone tries to be evil */
2826                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2827                         p->verify_alg[data_size-1] = 0;
2828
2829                 } else /* apv >= 89 */ {
2830                         /* we still expect NUL terminated strings */
2831                         /* but just in case someone tries to be evil */
2832                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2833                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2834                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2835                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2836                 }
2837
2838                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2839                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2840                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2841                                     mdev->sync_conf.verify_alg, p->verify_alg);
2842                                 goto disconnect;
2843                         }
2844                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2845                                         p->verify_alg, "verify-alg");
2846                         if (IS_ERR(verify_tfm)) {
2847                                 verify_tfm = NULL;
2848                                 goto disconnect;
2849                         }
2850                 }
2851
2852                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2853                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2854                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2855                                     mdev->sync_conf.csums_alg, p->csums_alg);
2856                                 goto disconnect;
2857                         }
2858                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2859                                         p->csums_alg, "csums-alg");
2860                         if (IS_ERR(csums_tfm)) {
2861                                 csums_tfm = NULL;
2862                                 goto disconnect;
2863                         }
2864                 }
2865
2866                 if (apv > 94) {
2867                         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2868                         mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2869                         mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2870                         mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2871                         mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2872
2873                         fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2874                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2875                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2876                                 if (!rs_plan_s) {
2877                                         dev_err(DEV, "kmalloc of fifo_buffer failed");
2878                                         goto disconnect;
2879                                 }
2880                         }
2881                 }
2882
2883                 spin_lock(&mdev->peer_seq_lock);
2884                 /* lock against drbd_nl_syncer_conf() */
2885                 if (verify_tfm) {
2886                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2887                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2888                         crypto_free_hash(mdev->verify_tfm);
2889                         mdev->verify_tfm = verify_tfm;
2890                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2891                 }
2892                 if (csums_tfm) {
2893                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2894                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2895                         crypto_free_hash(mdev->csums_tfm);
2896                         mdev->csums_tfm = csums_tfm;
2897                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2898                 }
2899                 if (fifo_size != mdev->rs_plan_s.size) {
2900                         kfree(mdev->rs_plan_s.values);
2901                         mdev->rs_plan_s.values = rs_plan_s;
2902                         mdev->rs_plan_s.size   = fifo_size;
2903                         mdev->rs_planed = 0;
2904                 }
2905                 spin_unlock(&mdev->peer_seq_lock);
2906         }
2907
2908         return ok;
2909 disconnect:
2910         /* just for completeness: actually not needed,
2911          * as this is not reached if csums_tfm was ok. */
2912         crypto_free_hash(csums_tfm);
2913         /* but free the verify_tfm again, if csums_tfm did not work out */
2914         crypto_free_hash(verify_tfm);
2915         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2916         return false;
2917 }
2918
static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
{
        /* Intentionally empty: sorry, there is currently no working
         * implementation of distributed TCQ, so both arguments are
         * ignored. */
}
2924
2925 /* warn if the arguments differ by more than 12.5% */
2926 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2927         const char *s, sector_t a, sector_t b)
2928 {
2929         sector_t d;
2930         if (a == 0 || b == 0)
2931                 return;
2932         d = (a > b) ? (a - b) : (b - a);
2933         if (d > (a>>3) || d > (b>>3))
2934                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2935                      (unsigned long long)a, (unsigned long long)b);
2936 }
2937
2938 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2939 {
2940         struct p_sizes *p = &mdev->data.rbuf.sizes;
2941         enum determine_dev_size dd = unchanged;
2942         unsigned int max_bio_size;
2943         sector_t p_size, p_usize, my_usize;
2944         int ldsc = 0; /* local disk size changed */
2945         enum dds_flags ddsf;
2946
2947         p_size = be64_to_cpu(p->d_size);
2948         p_usize = be64_to_cpu(p->u_size);
2949
2950         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2951                 dev_err(DEV, "some backing storage is needed\n");
2952                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2953                 return false;
2954         }
2955
2956         /* just store the peer's disk size for now.
2957          * we still need to figure out whether we accept that. */
2958         mdev->p_size = p_size;
2959
2960         if (get_ldev(mdev)) {
2961                 warn_if_differ_considerably(mdev, "lower level device sizes",
2962                            p_size, drbd_get_max_capacity(mdev->ldev));
2963                 warn_if_differ_considerably(mdev, "user requested size",
2964                                             p_usize, mdev->ldev->dc.disk_size);
2965
2966                 /* if this is the first connect, or an otherwise expected
2967                  * param exchange, choose the minimum */
2968                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2969                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2970                                              p_usize);
2971
2972                 my_usize = mdev->ldev->dc.disk_size;
2973
2974                 if (mdev->ldev->dc.disk_size != p_usize) {
2975                         mdev->ldev->dc.disk_size = p_usize;
2976                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2977                              (unsigned long)mdev->ldev->dc.disk_size);
2978                 }
2979
2980                 /* Never shrink a device with usable data during connect.
2981                    But allow online shrinking if we are connected. */
2982                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2983                    drbd_get_capacity(mdev->this_bdev) &&
2984                    mdev->state.disk >= D_OUTDATED &&
2985                    mdev->state.conn < C_CONNECTED) {
2986                         dev_err(DEV, "The peer's disk size is too small!\n");
2987                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2988                         mdev->ldev->dc.disk_size = my_usize;
2989                         put_ldev(mdev);
2990                         return false;
2991                 }
2992                 put_ldev(mdev);
2993         }
2994
2995         ddsf = be16_to_cpu(p->dds_flags);
2996         if (get_ldev(mdev)) {
2997                 dd = drbd_determin_dev_size(mdev, ddsf);
2998                 put_ldev(mdev);
2999                 if (dd == dev_size_error)
3000                         return false;
3001                 drbd_md_sync(mdev);
3002         } else {
3003                 /* I am diskless, need to accept the peer's size. */
3004                 drbd_set_my_capacity(mdev, p_size);
3005         }
3006
3007         if (get_ldev(mdev)) {
3008                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3009                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3010                         ldsc = 1;
3011                 }
3012
3013                 if (mdev->agreed_pro_version < 94)
3014                         max_bio_size = be32_to_cpu(p->max_bio_size);
3015                 else if (mdev->agreed_pro_version == 94)
3016                         max_bio_size = DRBD_MAX_SIZE_H80_PACKET;
3017                 else /* drbd 8.3.8 onwards */
3018                         max_bio_size = DRBD_MAX_BIO_SIZE;
3019
3020                 if (max_bio_size != queue_max_hw_sectors(mdev->rq_queue) << 9)
3021                         drbd_setup_queue_param(mdev, max_bio_size);
3022
3023                 drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
3024                 put_ldev(mdev);
3025         }
3026
3027         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3028                 if (be64_to_cpu(p->c_size) !=
3029                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3030                         /* we have different sizes, probably peer
3031                          * needs to know my new size... */
3032                         drbd_send_sizes(mdev, 0, ddsf);
3033                 }
3034                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3035                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3036                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3037                             mdev->state.disk >= D_INCONSISTENT) {
3038                                 if (ddsf & DDSF_NO_RESYNC)
3039                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3040                                 else
3041                                         resync_after_online_grow(mdev);
3042                         } else
3043                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3044                 }
3045         }
3046
3047         return true;
3048 }
3049
3050 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3051 {
3052         struct p_uuids *p = &mdev->data.rbuf.uuids;
3053         u64 *p_uuid;
3054         int i, updated_uuids = 0;
3055
3056         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3057
3058         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3059                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3060
3061         kfree(mdev->p_uuid);
3062         mdev->p_uuid = p_uuid;
3063
3064         if (mdev->state.conn < C_CONNECTED &&
3065             mdev->state.disk < D_INCONSISTENT &&
3066             mdev->state.role == R_PRIMARY &&
3067             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3068                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3069                     (unsigned long long)mdev->ed_uuid);
3070                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3071                 return false;
3072         }
3073
3074         if (get_ldev(mdev)) {
3075                 int skip_initial_sync =
3076                         mdev->state.conn == C_CONNECTED &&
3077                         mdev->agreed_pro_version >= 90 &&
3078                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3079                         (p_uuid[UI_FLAGS] & 8);
3080                 if (skip_initial_sync) {
3081                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3082                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3083                                         "clear_n_write from receive_uuids",
3084                                         BM_LOCKED_TEST_ALLOWED);
3085                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3086                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3087                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3088                                         CS_VERBOSE, NULL);
3089                         drbd_md_sync(mdev);
3090                         updated_uuids = 1;
3091                 }
3092                 put_ldev(mdev);
3093         } else if (mdev->state.disk < D_INCONSISTENT &&
3094                    mdev->state.role == R_PRIMARY) {
3095                 /* I am a diskless primary, the peer just created a new current UUID
3096                    for me. */
3097                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3098         }
3099
3100         /* Before we test for the disk state, we should wait until an eventually
3101            ongoing cluster wide state change is finished. That is important if
3102            we are primary and are detaching from our disk. We need to see the
3103            new disk state... */
3104         wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3105         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3106                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3107
3108         if (updated_uuids)
3109                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3110
3111         return true;
3112 }
3113
3114 /**
3115  * convert_state() - Converts the peer's view of the cluster state to our point of view
3116  * @ps:         The state as seen by the peer.
3117  */
3118 static union drbd_state convert_state(union drbd_state ps)
3119 {
3120         union drbd_state ms;
3121
3122         static enum drbd_conns c_tab[] = {
3123                 [C_CONNECTED] = C_CONNECTED,
3124
3125                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3126                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3127                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3128                 [C_VERIFY_S]       = C_VERIFY_T,
3129                 [C_MASK]   = C_MASK,
3130         };
3131
3132         ms.i = ps.i;
3133
3134         ms.conn = c_tab[ps.conn];
3135         ms.peer = ps.role;
3136         ms.role = ps.peer;
3137         ms.pdsk = ps.disk;
3138         ms.disk = ps.pdsk;
3139         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3140
3141         return ms;
3142 }
3143
3144 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3145 {
3146         struct p_req_state *p = &mdev->data.rbuf.req_state;
3147         union drbd_state mask, val;
3148         enum drbd_state_rv rv;
3149
3150         mask.i = be32_to_cpu(p->mask);
3151         val.i = be32_to_cpu(p->val);
3152
3153         if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3154             test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3155                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3156                 return true;
3157         }
3158
3159         mask = convert_state(mask);
3160         val = convert_state(val);
3161
3162         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3163
3164         drbd_send_sr_reply(mdev, rv);
3165         drbd_md_sync(mdev);
3166
3167         return true;
3168 }
3169
/*
 * receive_state() - process a state packet received from the peer.
 *
 * Decodes the peer's 32 bit state word from the receive buffer, works out
 * which disk state to assume for the peer (real_peer_disk), optionally runs
 * the sync handshake, and finally applies the resulting state with
 * _drbd_set_state() while holding req_lock.
 *
 * The local state is snapshotted under req_lock into os/ns; all decisions
 * are then taken without the lock.  Before the new state is applied the
 * lock is taken again, and if mdev->state changed in the meantime the whole
 * evaluation is repeated from the "retry" label.
 *
 * Returns true on success, and on the early exit where the peer reports
 * C_CONNECTED while we are at C_SYNC_SOURCE or beyond.  Returns false on
 * the failure paths below; all of them except the CONN_DRY_RUN exit force a
 * connection state (C_DISCONNECTING or C_PROTOCOL_ERROR) first.
 */
3170 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3171 {
3172         struct p_state *p = &mdev->data.rbuf.state;
3173         union drbd_state os, ns, peer_state;
3174         enum drbd_disk_state real_peer_disk;
3175         enum chg_state_flags cs_flags;
3176         int rv;
3177
3178         peer_state.i = be32_to_cpu(p->state);
3179
             /* While the peer is negotiating, derive its disk state from
              * bit value 4 of the UUID flags it sent: set means
              * D_INCONSISTENT, clear means D_CONSISTENT.
              * NOTE(review): mdev->p_uuid is dereferenced here without a
              * NULL check, while the handshake condition further down
              * tests "mdev->p_uuid &&" first - confirm that a state packet
              * with D_NEGOTIATING can never arrive before the UUIDs. */
3180         real_peer_disk = peer_state.disk;
3181         if (peer_state.disk == D_NEGOTIATING) {
3182                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3183                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3184         }
3185
             /* Snapshot the current state.  The retry label is reached with
              * req_lock held (see the goto below); the lock is dropped again
              * right after the copy. */
3186         spin_lock_irq(&mdev->req_lock);
3187  retry:
3188         os = ns = mdev->state;
3189         spin_unlock_irq(&mdev->req_lock);
3190
3191         /* peer says his disk is uptodate, while we think it is inconsistent,
3192          * and this happens while we think we have a sync going on. */
3193         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3194             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3195                 /* If we are (becoming) SyncSource, but peer is still in sync
3196                  * preparation, ignore its uptodate-ness to avoid flapping, it
3197                  * will change to inconsistent once the peer reaches active
3198                  * syncing states.
3199                  * It may have changed syncer-paused flags, however, so we
3200                  * cannot ignore this completely. */
3201                 if (peer_state.conn > C_CONNECTED &&
3202                     peer_state.conn < C_SYNC_SOURCE)
3203                         real_peer_disk = D_INCONSISTENT;
3204
3205                 /* if peer_state changes to connected at the same time,
3206                  * it explicitly notifies us that it finished resync.
3207                  * Maybe we should finish it up, too? */
3208                 else if (os.conn >= C_SYNC_SOURCE &&
3209                          peer_state.conn == C_CONNECTED) {
3210                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3211                                 drbd_resync_finished(mdev);
3212                         return true;
3213                 }
3214         }
3215
3216         /* peer says his disk is inconsistent, while we think it is uptodate,
3217          * and this happens while the peer still thinks we have a sync going on,
3218          * but we think we are already done with the sync.
3219          * We ignore this to avoid flapping pdsk.
3220          * This should not happen, if the peer is a recent version of drbd. */
3221         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3222             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3223                 real_peer_disk = D_UP_TO_DATE;
3224
3225         if (ns.conn == C_WF_REPORT_PARAMS)
3226                 ns.conn = C_CONNECTED;
3227
3228         if (peer_state.conn == C_AHEAD)
3229                 ns.conn = C_BEHIND;
3230
             /* The handshake is only considered if we have the peer's UUIDs,
              * the peer's disk is at least D_NEGOTIATING, and a reference to
              * the local disk can be taken in state D_NEGOTIATING or better. */
3231         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3232             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3233                 int cr; /* consider resync */
3234
3235                 /* if we established a new connection */
3236                 cr  = (os.conn < C_CONNECTED);
3237                 /* if we had an established connection
3238                  * and one of the nodes newly attaches a disk */
3239                 cr |= (os.conn == C_CONNECTED &&
3240                        (peer_state.disk == D_NEGOTIATING ||
3241                         os.disk == D_NEGOTIATING));
3242                 /* if we have both been inconsistent, and the peer has been
3243                  * forced to be UpToDate with --overwrite-data */
3244                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3245                 /* if we had been plain connected, and the admin requested to
3246                  * start a sync by "invalidate" or "invalidate-remote" */
3247                 cr |= (os.conn == C_CONNECTED &&
3248                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3249                                  peer_state.conn <= C_WF_BITMAP_T));
3250
3251                 if (cr)
3252                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3253
3254                 put_ldev(mdev);
                     /* C_MASK in ns.conn is handled as "no usable result":
                      * fall back to C_CONNECTED and either fail the local
                      * disk, treat the peer as diskless, or give up on the
                      * connection. */
3255                 if (ns.conn == C_MASK) {
3256                         ns.conn = C_CONNECTED;
3257                         if (mdev->state.disk == D_NEGOTIATING) {
3258                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3259                         } else if (peer_state.disk == D_NEGOTIATING) {
3260                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3261                                 peer_state.disk = D_DISKLESS;
3262                                 real_peer_disk = D_DISKLESS;
3263                         } else {
3264                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3265                                         return false;
3266                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3267                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3268                                 return false;
3269                         }
3270                 }
3271         }
3272
             /* Re-take the lock.  If the state changed since the snapshot
              * was taken, start over (jumping to retry with the lock held). */
3273         spin_lock_irq(&mdev->req_lock);
3274         if (mdev->state.i != os.i)
3275                 goto retry;
3276         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3277         ns.peer = peer_state.role;
3278         ns.pdsk = real_peer_disk;
3279         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3280         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3281                 ns.disk = mdev->new_state_tmp.disk;
             /* CS_HARD is added unless this change takes conn from below
              * C_CONNECTED to C_CONNECTED or above. */
3282         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3283         if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3284             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3285                 /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3286                    for temporal network outages! */
3287                 spin_unlock_irq(&mdev->req_lock);
3288                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3289                 tl_clear(mdev);
3290                 drbd_uuid_new_current(mdev);
3291                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3292                 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3293                 return false;
3294         }
3295         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
             /* from here on ns holds the state that is actually in effect */
3296         ns = mdev->state;
3297         spin_unlock_irq(&mdev->req_lock);
3298
3299         if (rv < SS_SUCCESS) {
3300                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3301                 return false;
3302         }
3303
3304         if (os.conn > C_WF_REPORT_PARAMS) {
3305                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3306                     peer_state.disk != D_NEGOTIATING ) {
3307                         /* we want resync, peer has not yet decided to sync... */
3308                         /* Nowadays only used when forcing a node into primary role and
3309                            setting its disk to UpToDate with that */
3310                         drbd_send_uuids(mdev);
3311                         drbd_send_state(mdev);
3312                 }
3313         }
3314
3315         mdev->net_conf->want_lose = 0;
3316
3317         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3318
3319         return true;
3320 }
3321
3322 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3323 {
3324         struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3325
3326         wait_event(mdev->misc_wait,
3327                    mdev->state.conn == C_WF_SYNC_UUID ||
3328                    mdev->state.conn == C_BEHIND ||
3329                    mdev->state.conn < C_CONNECTED ||
3330                    mdev->state.disk < D_NEGOTIATING);
3331
3332         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3333
3334         /* Here the _drbd_uuid_ functions are right, current should
3335            _not_ be rotated into the history */
3336         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3337                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3338                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3339
3340                 drbd_print_uuids(mdev, "updated sync uuid");
3341                 drbd_start_resync(mdev, C_SYNC_TARGET);
3342
3343                 put_ldev(mdev);
3344         } else
3345                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3346
3347         return true;
3348 }
3349
3350 /**
3351  * receive_bitmap_plain
3352  *
3353  * Return 0 when done, 1 when another iteration is needed, and a negative error
3354  * code upon failure.
3355  */
3356 static int
3357 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3358                      unsigned long *buffer, struct bm_xfer_ctx *c)
3359 {
3360         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3361         unsigned want = num_words * sizeof(long);
3362         int err;
3363
3364         if (want != data_size) {
3365                 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3366                 return -EIO;
3367         }
3368         if (want == 0)
3369                 return 0;
3370         err = drbd_recv(mdev, buffer, want);
3371         if (err != want) {
3372                 if (err >= 0)
3373                         err = -EIO;
3374                 return err;
3375         }
3376
3377         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3378
3379         c->word_offset += num_words;
3380         c->bit_offset = c->word_offset * BITS_PER_LONG;
3381         if (c->bit_offset > c->bm_bits)
3382                 c->bit_offset = c->bm_bits;
3383
3384         return 1;
3385 }
3386
3387 /**
3388  * recv_bm_rle_bits
3389  *
3390  * Return 0 when done, 1 when another iteration is needed, and a negative error
3391  * code upon failure.
3392  */
3393 static int
3394 recv_bm_rle_bits(struct drbd_conf *mdev,
3395                 struct p_compressed_bm *p,
3396                 struct bm_xfer_ctx *c)
3397 {
3398         struct bitstream bs;
3399         u64 look_ahead;
3400         u64 rl;
3401         u64 tmp;
3402         unsigned long s = c->bit_offset;
3403         unsigned long e;
3404         int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3405         int toggle = DCBP_get_start(p);
3406         int have;
3407         int bits;
3408
3409         bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3410
3411         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3412         if (bits < 0)
3413                 return -EIO;
3414
3415         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3416                 bits = vli_decode_bits(&rl, look_ahead);
3417                 if (bits <= 0)
3418                         return -EIO;
3419
3420                 if (toggle) {
3421                         e = s + rl -1;
3422                         if (e >= c->bm_bits) {
3423                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3424                                 return -EIO;
3425                         }
3426                         _drbd_bm_set_bits(mdev, s, e);
3427                 }
3428
3429                 if (have < bits) {
3430                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3431                                 have, bits, look_ahead,
3432                                 (unsigned int)(bs.cur.b - p->code),
3433                                 (unsigned int)bs.buf_len);
3434                         return -EIO;
3435                 }
3436                 look_ahead >>= bits;
3437                 have -= bits;
3438
3439                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3440                 if (bits < 0)
3441                         return -EIO;
3442                 look_ahead |= tmp << have;
3443                 have += bits;
3444         }
3445
3446         c->bit_offset = s;
3447         bm_xfer_ctx_bit_to_word_offset(c);
3448
3449         return (s != c->bm_bits);
3450 }
3451
3452 /**
3453  * decode_bitmap_c
3454  *
3455  * Return 0 when done, 1 when another iteration is needed, and a negative error
3456  * code upon failure.
3457  */
3458 static int
3459 decode_bitmap_c(struct drbd_conf *mdev,
3460                 struct p_compressed_bm *p,
3461                 struct bm_xfer_ctx *c)
3462 {
3463         if (DCBP_get_code(p) == RLE_VLI_Bits)
3464                 return recv_bm_rle_bits(mdev, p, c);
3465
3466         /* other variants had been implemented for evaluation,
3467          * but have been dropped as this one turned out to be "best"
3468          * during all our tests. */
3469
3470         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3471         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3472         return -EIO;
3473 }
3474
3475 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3476                 const char *direction, struct bm_xfer_ctx *c)
3477 {
3478         /* what would it take to transfer it "plaintext" */
3479         unsigned plain = sizeof(struct p_header80) *
3480                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3481                 + c->bm_words * sizeof(long);
3482         unsigned total = c->bytes[0] + c->bytes[1];
3483         unsigned r;
3484
3485         /* total can not be zero. but just in case: */
3486         if (total == 0)
3487                 return;
3488
3489         /* don't report if not compressed */
3490         if (total >= plain)
3491                 return;
3492
3493         /* total < plain. check for overflow, still */
3494         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3495                                     : (1000 * total / plain);
3496
3497         if (r > 1000)
3498                 r = 1000;
3499
3500         r = 1000 - r;
3501         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3502              "total %u; compression: %u.%u%%\n",
3503                         direction,
3504                         c->bytes[1], c->packets[1],
3505                         c->bytes[0], c->packets[0],
3506                         total, r/10, r % 10);
3507 }
3508
3509 /* Since we are processing the bitfield from lower addresses to higher,
3510    it does not matter if the process it in 32 bit chunks or 64 bit
3511    chunks as long as it is little endian. (Understand it as byte stream,
3512    beginning with the lowest byte...) If we would use big endian
3513    we would need to process it from the highest address to the lowest,
3514    in order to be agnostic to the 32 vs 64 bits issue.
3515
3516    returns 0 on failure, 1 if we successfully received it. */
3517 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3518 {
3519         struct bm_xfer_ctx c;
3520         void *buffer;
3521         int err;
3522         int ok = false;
3523         struct p_header80 *h = &mdev->data.rbuf.header.h80;
3524
3525         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3526         /* you are supposed to send additional out-of-sync information
3527          * if you actually set bits during this phase */
3528
3529         /* maybe we should use some per thread scratch page,
3530          * and allocate that during initial device creation? */
3531         buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3532         if (!buffer) {
3533                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3534                 goto out;
3535         }
3536
3537         c = (struct bm_xfer_ctx) {
3538                 .bm_bits = drbd_bm_bits(mdev),
3539                 .bm_words = drbd_bm_words(mdev),
3540         };
3541
3542         for(;;) {
3543                 if (cmd == P_BITMAP) {
3544                         err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3545                 } else if (cmd == P_COMPRESSED_BITMAP) {
3546                         /* MAYBE: sanity check that we speak proto >= 90,
3547                          * and the feature is enabled! */
3548                         struct p_compressed_bm *p;
3549
3550                         if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3551                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3552                                 goto out;
3553                         }
3554                         /* use the page buff */
3555                         p = buffer;
3556                         memcpy(p, h, sizeof(*h));
3557                         if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3558                                 goto out;
3559                         if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3560                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3561                                 goto out;
3562                         }
3563                         err = decode_bitmap_c(mdev, p, &c);
3564                 } else {
3565                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3566                         goto out;
3567                 }
3568
3569                 c.packets[cmd == P_BITMAP]++;
3570                 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3571
3572                 if (err <= 0) {
3573                         if (err < 0)
3574                                 goto out;
3575                         break;
3576                 }
3577                 if (!drbd_recv_header(mdev, &cmd, &data_size))
3578                         goto out;
3579         }
3580
3581         INFO_bm_xfer_stats(mdev, "receive", &c);
3582
3583         if (mdev->state.conn == C_WF_BITMAP_T) {
3584                 enum drbd_state_rv rv;
3585
3586                 ok = !drbd_send_bitmap(mdev);
3587                 if (!ok)
3588                         goto out;
3589                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3590                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3591                 D_ASSERT(rv == SS_SUCCESS);
3592         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3593                 /* admin may have requested C_DISCONNECTING,
3594                  * other threads may have noticed network errors */
3595                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3596                     drbd_conn_str(mdev->state.conn));
3597         }
3598
3599         ok = true;
3600  out:
3601         drbd_bm_unlock(mdev);
3602         if (ok && mdev->state.conn == C_WF_BITMAP_S)
3603                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3604         free_page((unsigned long) buffer);
3605         return ok;
3606 }
3607
3608 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3609 {
3610         /* TODO zero copy sink :) */
3611         static char sink[128];
3612         int size, want, r;
3613
3614         dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3615                  cmd, data_size);
3616
3617         size = data_size;
3618         while (size > 0) {
3619                 want = min_t(int, size, sizeof(sink));
3620                 r = drbd_recv(mdev, sink, want);
3621                 ERR_IF(r <= 0) break;
3622                 size -= r;
3623         }
3624         return size == 0;
3625 }
3626
3627 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3628 {
3629         /* Make sure we've acked all the TCP data associated
3630          * with the data requests being unplugged */
3631         drbd_tcp_quickack(mdev->data.socket);
3632
3633         return true;
3634 }
3635
3636 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3637 {
3638         struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3639
3640         switch (mdev->state.conn) {
3641         case C_WF_SYNC_UUID:
3642         case C_WF_BITMAP_T:
3643         case C_BEHIND:
3644                         break;
3645         default:
3646                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3647                                 drbd_conn_str(mdev->state.conn));
3648         }
3649
3650         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3651
3652         return true;
3653 }
3654
3655 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3656
3657 struct data_cmd {
3658         int expect_payload;
3659         size_t pkt_size;
3660         drbd_cmd_handler_f function;
3661 };
3662
3663 static struct data_cmd drbd_cmd_handler[] = {
3664         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3665         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3666         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3667         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3668         [P_BITMAP]          = { 1, sizeof(struct p_header80), receive_bitmap } ,
3669         [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3670         [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3671         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3672         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3673         [P_SYNC_PARAM]      = { 1, sizeof(struct p_header80), receive_SyncParam },
3674         [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3675         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3676         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
3677         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
3678         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
3679         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3680         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3681         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3682         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3683         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3684         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3685         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3686         /* anything missing from this table is in
3687          * the asender_tbl, see get_asender_cmd */
3688         [P_MAX_CMD]         = { 0, 0, NULL },
3689 };
3690
3691 /* All handler functions that expect a sub-header get that sub-heder in
3692    mdev->data.rbuf.header.head.payload.
3693
3694    Usually in mdev->data.rbuf.header.head the callback can find the usual
3695    p_header, but they may not rely on that. Since there is also p_header95 !
3696  */
3697
3698 static void drbdd(struct drbd_conf *mdev)
3699 {
3700         union p_header *header = &mdev->data.rbuf.header;
3701         unsigned int packet_size;
3702         enum drbd_packets cmd;
3703         size_t shs; /* sub header size */
3704         int rv;
3705
3706         while (get_t_state(&mdev->receiver) == Running) {
3707                 drbd_thread_current_set_cpu(mdev);
3708                 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3709                         goto err_out;
3710
3711                 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3712                         dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3713                         goto err_out;
3714                 }
3715
3716                 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3717                 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3718                         dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3719                         goto err_out;
3720                 }
3721
3722                 if (shs) {
3723                         rv = drbd_recv(mdev, &header->h80.payload, shs);
3724                         if (unlikely(rv != shs)) {
3725                                 if (!signal_pending(current))
3726                                         dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3727                                 goto err_out;
3728                         }
3729                 }
3730
3731                 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3732
3733                 if (unlikely(!rv)) {
3734                         dev_err(DEV, "error receiving %s, l: %d!\n",
3735                             cmdname(cmd), packet_size);
3736                         goto err_out;
3737                 }
3738         }
3739
3740         if (0) {
3741         err_out:
3742                 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3743         }
3744         /* If we leave here, we probably want to update at least the
3745          * "Connected" indicator on stable storage. Do so explicitly here. */
3746         drbd_md_sync(mdev);
3747 }
3748
3749 void drbd_flush_workqueue(struct drbd_conf *mdev)
3750 {
3751         struct drbd_wq_barrier barr;
3752
3753         barr.w.cb = w_prev_work_done;
3754         init_completion(&barr.done);
3755         drbd_queue_work(&mdev->data.work, &barr.w);
3756         wait_for_completion(&barr.done);
3757 }
3758
3759 void drbd_free_tl_hash(struct drbd_conf *mdev)
3760 {
3761         struct hlist_head *h;
3762
3763         spin_lock_irq(&mdev->req_lock);
3764
3765         if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3766                 spin_unlock_irq(&mdev->req_lock);
3767                 return;
3768         }
3769         /* paranoia code */
3770         for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3771                 if (h->first)
3772                         dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3773                                 (int)(h - mdev->ee_hash), h->first);
3774         kfree(mdev->ee_hash);
3775         mdev->ee_hash = NULL;
3776         mdev->ee_hash_s = 0;
3777
3778         /* paranoia code */
3779         for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3780                 if (h->first)
3781                         dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3782                                 (int)(h - mdev->tl_hash), h->first);
3783         kfree(mdev->tl_hash);
3784         mdev->tl_hash = NULL;
3785         mdev->tl_hash_s = 0;
3786         spin_unlock_irq(&mdev->req_lock);
3787 }
3788
3789 static void drbd_disconnect(struct drbd_conf *mdev)
3790 {
3791         enum drbd_fencing_p fp;
3792         union drbd_state os, ns;
3793         int rv = SS_UNKNOWN_ERROR;
3794         unsigned int i;
3795
3796         if (mdev->state.conn == C_STANDALONE)
3797                 return;
3798
3799         /* asender does not clean up anything. it must not interfere, either */
3800         drbd_thread_stop(&mdev->asender);
3801         drbd_free_sock(mdev);
3802
3803         /* wait for current activity to cease. */
3804         spin_lock_irq(&mdev->req_lock);
3805         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3806         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3807         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3808         spin_unlock_irq(&mdev->req_lock);
3809
3810         /* We do not have data structures that would allow us to
3811          * get the rs_pending_cnt down to 0 again.
3812          *  * On C_SYNC_TARGET we do not have any data structures describing
3813          *    the pending RSDataRequest's we have sent.
3814          *  * On C_SYNC_SOURCE there is no data structure that tracks
3815          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3816          *  And no, it is not the sum of the reference counts in the
3817          *  resync_LRU. The resync_LRU tracks the whole operation including
3818          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3819          *  on the fly. */
3820         drbd_rs_cancel_all(mdev);
3821         mdev->rs_total = 0;
3822         mdev->rs_failed = 0;
3823         atomic_set(&mdev->rs_pending_cnt, 0);
3824         wake_up(&mdev->misc_wait);
3825
3826         del_timer(&mdev->request_timer);
3827
3828         /* make sure syncer is stopped and w_resume_next_sg queued */
3829         del_timer_sync(&mdev->resync_timer);
3830         resync_timer_fn((unsigned long)mdev);
3831
3832         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3833          * w_make_resync_request etc. which may still be on the worker queue
3834          * to be "canceled" */
3835         drbd_flush_workqueue(mdev);
3836
3837         /* This also does reclaim_net_ee().  If we do this too early, we might
3838          * miss some resync ee and pages.*/
3839         drbd_process_done_ee(mdev);
3840
3841         kfree(mdev->p_uuid);
3842         mdev->p_uuid = NULL;
3843
3844         if (!is_susp(mdev->state))
3845                 tl_clear(mdev);
3846
3847         dev_info(DEV, "Connection closed\n");
3848
3849         drbd_md_sync(mdev);
3850
3851         fp = FP_DONT_CARE;
3852         if (get_ldev(mdev)) {
3853                 fp = mdev->ldev->dc.fencing;
3854                 put_ldev(mdev);
3855         }
3856
3857         if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3858                 drbd_try_outdate_peer_async(mdev);
3859
3860         spin_lock_irq(&mdev->req_lock);
3861         os = mdev->state;
3862         if (os.conn >= C_UNCONNECTED) {
3863                 /* Do not restart in case we are C_DISCONNECTING */
3864                 ns = os;
3865                 ns.conn = C_UNCONNECTED;
3866                 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3867         }
3868         spin_unlock_irq(&mdev->req_lock);
3869
3870         if (os.conn == C_DISCONNECTING) {
3871                 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3872
3873                 crypto_free_hash(mdev->cram_hmac_tfm);
3874                 mdev->cram_hmac_tfm = NULL;
3875
3876                 kfree(mdev->net_conf);
3877                 mdev->net_conf = NULL;
3878                 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3879         }
3880
3881         /* serialize with bitmap writeout triggered by the state change,
3882          * if any. */
3883         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3884
3885         /* tcp_close and release of sendpage pages can be deferred.  I don't
3886          * want to use SO_LINGER, because apparently it can be deferred for
3887          * more than 20 seconds (longest time I checked).
3888          *
3889          * Actually we don't care for exactly when the network stack does its
3890          * put_page(), but release our reference on these pages right here.
3891          */
3892         i = drbd_release_ee(mdev, &mdev->net_ee);
3893         if (i)
3894                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3895         i = atomic_read(&mdev->pp_in_use_by_net);
3896         if (i)
3897                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3898         i = atomic_read(&mdev->pp_in_use);
3899         if (i)
3900                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3901
3902         D_ASSERT(list_empty(&mdev->read_ee));
3903         D_ASSERT(list_empty(&mdev->active_ee));
3904         D_ASSERT(list_empty(&mdev->sync_ee));
3905         D_ASSERT(list_empty(&mdev->done_ee));
3906
3907         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3908         atomic_set(&mdev->current_epoch->epoch_size, 0);
3909         D_ASSERT(list_empty(&mdev->current_epoch->list));
3910 }
3911
3912 /*
3913  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3914  * we can agree on is stored in agreed_pro_version.
3915  *
3916  * feature flags and the reserved array should be enough room for future
3917  * enhancements of the handshake protocol, and possible plugins...
3918  *
3919  * for now, they are expected to be zero, but ignored.
3920  */
3921 static int drbd_send_handshake(struct drbd_conf *mdev)
3922 {
3923         /* ASSERT current == mdev->receiver ... */
3924         struct p_handshake *p = &mdev->data.sbuf.handshake;
3925         int ok;
3926
3927         if (mutex_lock_interruptible(&mdev->data.mutex)) {
3928                 dev_err(DEV, "interrupted during initial handshake\n");
3929                 return 0; /* interrupted. not ok. */
3930         }
3931
3932         if (mdev->data.socket == NULL) {
3933                 mutex_unlock(&mdev->data.mutex);
3934                 return 0;
3935         }
3936
3937         memset(p, 0, sizeof(*p));
3938         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3939         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3940         ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3941                              (struct p_header80 *)p, sizeof(*p), 0 );
3942         mutex_unlock(&mdev->data.mutex);
3943         return ok;
3944 }
3945
3946 /*
3947  * return values:
3948  *   1 yes, we have a valid connection
3949  *   0 oops, did not work out, please try again
3950  *  -1 peer talks different language,
3951  *     no point in trying again, please go standalone.
3952  */
3953 static int drbd_do_handshake(struct drbd_conf *mdev)
3954 {
3955         /* ASSERT current == mdev->receiver ... */
3956         struct p_handshake *p = &mdev->data.rbuf.handshake;
3957         const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3958         unsigned int length;
3959         enum drbd_packets cmd;
3960         int rv;
3961
3962         rv = drbd_send_handshake(mdev);
3963         if (!rv)
3964                 return 0;
3965
3966         rv = drbd_recv_header(mdev, &cmd, &length);
3967         if (!rv)
3968                 return 0;
3969
3970         if (cmd != P_HAND_SHAKE) {
3971                 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3972                      cmdname(cmd), cmd);
3973                 return -1;
3974         }
3975
3976         if (length != expect) {
3977                 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3978                      expect, length);
3979                 return -1;
3980         }
3981
3982         rv = drbd_recv(mdev, &p->head.payload, expect);
3983
3984         if (rv != expect) {
3985                 if (!signal_pending(current))
3986                         dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
3987                 return 0;
3988         }
3989
3990         p->protocol_min = be32_to_cpu(p->protocol_min);
3991         p->protocol_max = be32_to_cpu(p->protocol_max);
3992         if (p->protocol_max == 0)
3993                 p->protocol_max = p->protocol_min;
3994
3995         if (PRO_VERSION_MAX < p->protocol_min ||
3996             PRO_VERSION_MIN > p->protocol_max)
3997                 goto incompat;
3998
3999         mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4000
4001         dev_info(DEV, "Handshake successful: "
4002              "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4003
4004         return 1;
4005
4006  incompat:
4007         dev_err(DEV, "incompatible DRBD dialects: "
4008             "I support %d-%d, peer supports %d-%d\n",
4009             PRO_VERSION_MIN, PRO_VERSION_MAX,
4010             p->protocol_min, p->protocol_max);
4011         return -1;
4012 }
4013
4014 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4015 static int drbd_do_auth(struct drbd_conf *mdev)
4016 {
4017         dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4018         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4019         return -1;
4020 }
4021 #else
4022 #define CHALLENGE_LEN 64
4023
4024 /* Return value:
4025         1 - auth succeeded,
4026         0 - failed, try again (network error),
4027         -1 - auth failed, don't try again.
4028 */
4029
4030 static int drbd_do_auth(struct drbd_conf *mdev)
4031 {
4032         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4033         struct scatterlist sg;
4034         char *response = NULL;
4035         char *right_response = NULL;
4036         char *peers_ch = NULL;
4037         unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4038         unsigned int resp_size;
4039         struct hash_desc desc;
4040         enum drbd_packets cmd;
4041         unsigned int length;
4042         int rv;
4043
4044         desc.tfm = mdev->cram_hmac_tfm;
4045         desc.flags = 0;
4046
4047         rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4048                                 (u8 *)mdev->net_conf->shared_secret, key_len);
4049         if (rv) {
4050                 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4051                 rv = -1;
4052                 goto fail;
4053         }
4054
4055         get_random_bytes(my_challenge, CHALLENGE_LEN);
4056
4057         rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4058         if (!rv)
4059                 goto fail;
4060
4061         rv = drbd_recv_header(mdev, &cmd, &length);
4062         if (!rv)
4063                 goto fail;
4064
4065         if (cmd != P_AUTH_CHALLENGE) {
4066                 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4067                     cmdname(cmd), cmd);
4068                 rv = 0;
4069                 goto fail;
4070         }
4071
4072         if (length > CHALLENGE_LEN * 2) {
4073                 dev_err(DEV, "expected AuthChallenge payload too big.\n");
4074                 rv = -1;
4075                 goto fail;
4076         }
4077
4078         peers_ch = kmalloc(length, GFP_NOIO);
4079         if (peers_ch == NULL) {
4080                 dev_err(DEV, "kmalloc of peers_ch failed\n");
4081                 rv = -1;
4082                 goto fail;
4083         }
4084
4085         rv = drbd_recv(mdev, peers_ch, length);
4086
4087         if (rv != length) {
4088                 if (!signal_pending(current))
4089                         dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4090                 rv = 0;
4091                 goto fail;
4092         }
4093
4094         resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4095         response = kmalloc(resp_size, GFP_NOIO);
4096         if (response == NULL) {
4097                 dev_err(DEV, "kmalloc of response failed\n");
4098                 rv = -1;
4099                 goto fail;
4100         }
4101
4102         sg_init_table(&sg, 1);
4103         sg_set_buf(&sg, peers_ch, length);
4104
4105         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4106         if (rv) {
4107                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4108                 rv = -1;
4109                 goto fail;
4110         }
4111
4112         rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4113         if (!rv)
4114                 goto fail;
4115
4116         rv = drbd_recv_header(mdev, &cmd, &length);
4117         if (!rv)
4118                 goto fail;
4119
4120         if (cmd != P_AUTH_RESPONSE) {
4121                 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4122                         cmdname(cmd), cmd);
4123                 rv = 0;
4124                 goto fail;
4125         }
4126
4127         if (length != resp_size) {
4128                 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4129                 rv = 0;
4130                 goto fail;
4131         }
4132
4133         rv = drbd_recv(mdev, response , resp_size);
4134
4135         if (rv != resp_size) {
4136                 if (!signal_pending(current))
4137                         dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4138                 rv = 0;
4139                 goto fail;
4140         }
4141
4142         right_response = kmalloc(resp_size, GFP_NOIO);
4143         if (right_response == NULL) {
4144                 dev_err(DEV, "kmalloc of right_response failed\n");
4145                 rv = -1;
4146                 goto fail;
4147         }
4148
4149         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4150
4151         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4152         if (rv) {
4153                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4154                 rv = -1;
4155                 goto fail;
4156         }
4157
4158         rv = !memcmp(response, right_response, resp_size);
4159
4160         if (rv)
4161                 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4162                      resp_size, mdev->net_conf->cram_hmac_alg);
4163         else
4164                 rv = -1;
4165
4166  fail:
4167         kfree(peers_ch);
4168         kfree(response);
4169         kfree(right_response);
4170
4171         return rv;
4172 }
4173 #endif
4174
4175 int drbdd_init(struct drbd_thread *thi)
4176 {
4177         struct drbd_conf *mdev = thi->mdev;
4178         unsigned int minor = mdev_to_minor(mdev);
4179         int h;
4180
4181         sprintf(current->comm, "drbd%d_receiver", minor);
4182
4183         dev_info(DEV, "receiver (re)started\n");
4184
4185         do {
4186                 h = drbd_connect(mdev);
4187                 if (h == 0) {
4188                         drbd_disconnect(mdev);
4189                         schedule_timeout_interruptible(HZ);
4190                 }
4191                 if (h == -1) {
4192                         dev_warn(DEV, "Discarding network configuration.\n");
4193                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4194                 }
4195         } while (h == 0);
4196
4197         if (h > 0) {
4198                 if (get_net_conf(mdev)) {
4199                         drbdd(mdev);
4200                         put_net_conf(mdev);
4201                 }
4202         }
4203
4204         drbd_disconnect(mdev);
4205
4206         dev_info(DEV, "receiver terminated\n");
4207         return 0;
4208 }
4209
4210 /* ********* acknowledge sender ******** */
4211
4212 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4213 {
4214         struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4215
4216         int retcode = be32_to_cpu(p->retcode);
4217
4218         if (retcode >= SS_SUCCESS) {
4219                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4220         } else {
4221                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4222                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4223                     drbd_set_st_err_str(retcode), retcode);
4224         }
4225         wake_up(&mdev->state_wait);
4226
4227         return true;
4228 }
4229
/* P_PING: answer with a ping ack; the result of sending it is returned. */
static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
{
	int sent = drbd_send_ping_ack(mdev);

	return sent;
}
4235
4236 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4237 {
4238         /* restore idle timeout */
4239         mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4240         if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4241                 wake_up(&mdev->misc_wait);
4242
4243         return true;
4244 }
4245
4246 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4247 {
4248         struct p_block_ack *p = (struct p_block_ack *)h;
4249         sector_t sector = be64_to_cpu(p->sector);
4250         int blksize = be32_to_cpu(p->blksize);
4251
4252         D_ASSERT(mdev->agreed_pro_version >= 89);
4253
4254         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4255
4256         if (get_ldev(mdev)) {
4257                 drbd_rs_complete_io(mdev, sector);
4258                 drbd_set_in_sync(mdev, sector, blksize);
4259                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4260                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4261                 put_ldev(mdev);
4262         }
4263         dec_rs_pending(mdev);
4264         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4265
4266         return true;
4267 }
4268
4269 /* when we receive the ACK for a write request,
4270  * verify that we actually know about it */
4271 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4272         u64 id, sector_t sector)
4273 {
4274         struct hlist_head *slot = tl_hash_slot(mdev, sector);
4275         struct hlist_node *n;
4276         struct drbd_request *req;
4277
4278         hlist_for_each_entry(req, n, slot, colision) {
4279                 if ((unsigned long)req == (unsigned long)id) {
4280                         if (req->sector != sector) {
4281                                 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4282                                     "wrong sector (%llus versus %llus)\n", req,
4283                                     (unsigned long long)req->sector,
4284                                     (unsigned long long)sector);
4285                                 break;
4286                         }
4287                         return req;
4288                 }
4289         }
4290         return NULL;
4291 }
4292
4293 typedef struct drbd_request *(req_validator_fn)
4294         (struct drbd_conf *mdev, u64 id, sector_t sector);
4295
4296 static int validate_req_change_req_state(struct drbd_conf *mdev,
4297         u64 id, sector_t sector, req_validator_fn validator,
4298         const char *func, enum drbd_req_event what)
4299 {
4300         struct drbd_request *req;
4301         struct bio_and_error m;
4302
4303         spin_lock_irq(&mdev->req_lock);
4304         req = validator(mdev, id, sector);
4305         if (unlikely(!req)) {
4306                 spin_unlock_irq(&mdev->req_lock);
4307
4308                 dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4309                         (void *)(unsigned long)id, (unsigned long long)sector);
4310                 return false;
4311         }
4312         __req_mod(req, what, &m);
4313         spin_unlock_irq(&mdev->req_lock);
4314
4315         if (m.bio)
4316                 complete_master_bio(mdev, &m);
4317         return true;
4318 }
4319
4320 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4321 {
4322         struct p_block_ack *p = (struct p_block_ack *)h;
4323         sector_t sector = be64_to_cpu(p->sector);
4324         int blksize = be32_to_cpu(p->blksize);
4325         enum drbd_req_event what;
4326
4327         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4328
4329         if (is_syncer_block_id(p->block_id)) {
4330                 drbd_set_in_sync(mdev, sector, blksize);
4331                 dec_rs_pending(mdev);
4332                 return true;
4333         }
4334         switch (be16_to_cpu(h->command)) {
4335         case P_RS_WRITE_ACK:
4336                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4337                 what = write_acked_by_peer_and_sis;
4338                 break;
4339         case P_WRITE_ACK:
4340                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4341                 what = write_acked_by_peer;
4342                 break;
4343         case P_RECV_ACK:
4344                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4345                 what = recv_acked_by_peer;
4346                 break;
4347         case P_DISCARD_ACK:
4348                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4349                 what = conflict_discarded_by_peer;
4350                 break;
4351         default:
4352                 D_ASSERT(0);
4353                 return false;
4354         }
4355
4356         return validate_req_change_req_state(mdev, p->block_id, sector,
4357                 _ack_id_to_req, __func__ , what);
4358 }
4359
4360 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4361 {
4362         struct p_block_ack *p = (struct p_block_ack *)h;
4363         sector_t sector = be64_to_cpu(p->sector);
4364         int size = be32_to_cpu(p->blksize);
4365         struct drbd_request *req;
4366         struct bio_and_error m;
4367
4368         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4369
4370         if (is_syncer_block_id(p->block_id)) {
4371                 dec_rs_pending(mdev);
4372                 drbd_rs_failed_io(mdev, sector, size);
4373                 return true;
4374         }
4375
4376         spin_lock_irq(&mdev->req_lock);
4377         req = _ack_id_to_req(mdev, p->block_id, sector);
4378         if (!req) {
4379                 spin_unlock_irq(&mdev->req_lock);
4380                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4381                     mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4382                         /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4383                            The master bio might already be completed, therefore the
4384                            request is no longer in the collision hash.
4385                            => Do not try to validate block_id as request. */
4386                         /* In Protocol B we might already have got a P_RECV_ACK
4387                            but then get a P_NEG_ACK after wards. */
4388                         drbd_set_out_of_sync(mdev, sector, size);
4389                         return true;
4390                 } else {
4391                         dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4392                                 (void *)(unsigned long)p->block_id, (unsigned long long)sector);
4393                         return false;
4394                 }
4395         }
4396         __req_mod(req, neg_acked, &m);
4397         spin_unlock_irq(&mdev->req_lock);
4398
4399         if (m.bio)
4400                 complete_master_bio(mdev, &m);
4401         return true;
4402 }
4403
4404 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4405 {
4406         struct p_block_ack *p = (struct p_block_ack *)h;
4407         sector_t sector = be64_to_cpu(p->sector);
4408
4409         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4410         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4411             (unsigned long long)sector, be32_to_cpu(p->blksize));
4412
4413         return validate_req_change_req_state(mdev, p->block_id, sector,
4414                 _ar_id_to_req, __func__ , neg_acked);
4415 }
4416
4417 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4418 {
4419         sector_t sector;
4420         int size;
4421         struct p_block_ack *p = (struct p_block_ack *)h;
4422
4423         sector = be64_to_cpu(p->sector);
4424         size = be32_to_cpu(p->blksize);
4425
4426         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4427
4428         dec_rs_pending(mdev);
4429
4430         if (get_ldev_if_state(mdev, D_FAILED)) {
4431                 drbd_rs_complete_io(mdev, sector);
4432                 switch (be16_to_cpu(h->command)) {
4433                 case P_NEG_RS_DREPLY:
4434                         drbd_rs_failed_io(mdev, sector, size);
4435                 case P_RS_CANCEL:
4436                         break;
4437                 default:
4438                         D_ASSERT(0);
4439                         put_ldev(mdev);
4440                         return false;
4441                 }
4442                 put_ldev(mdev);
4443         }
4444
4445         return true;
4446 }
4447
4448 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4449 {
4450         struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4451
4452         tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4453
4454         if (mdev->state.conn == C_AHEAD &&
4455             atomic_read(&mdev->ap_in_flight) == 0 &&
4456             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4457                 mdev->start_resync_timer.expires = jiffies + HZ;
4458                 add_timer(&mdev->start_resync_timer);
4459         }
4460
4461         return true;
4462 }
4463
4464 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4465 {
4466         struct p_block_ack *p = (struct p_block_ack *)h;
4467         struct drbd_work *w;
4468         sector_t sector;
4469         int size;
4470
4471         sector = be64_to_cpu(p->sector);
4472         size = be32_to_cpu(p->blksize);
4473
4474         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4475
4476         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4477                 drbd_ov_oos_found(mdev, sector, size);
4478         else
4479                 ov_oos_print(mdev);
4480
4481         if (!get_ldev(mdev))
4482                 return true;
4483
4484         drbd_rs_complete_io(mdev, sector);
4485         dec_rs_pending(mdev);
4486
4487         --mdev->ov_left;
4488
4489         /* let's advance progress step marks only for every other megabyte */
4490         if ((mdev->ov_left & 0x200) == 0x200)
4491                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4492
4493         if (mdev->ov_left == 0) {
4494                 w = kmalloc(sizeof(*w), GFP_NOIO);
4495                 if (w) {
4496                         w->cb = w_ov_finished;
4497                         drbd_queue_work_front(&mdev->data.work, w);
4498                 } else {
4499                         dev_err(DEV, "kmalloc(w) failed.");
4500                         ov_oos_print(mdev);
4501                         drbd_resync_finished(mdev);
4502                 }
4503         }
4504         put_ldev(mdev);
4505         return true;
4506 }
4507
4508 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4509 {
4510         return true;
4511 }
4512
/* One entry of the asender dispatch table, indexed by packet type. */
struct asender_cmd {
	/* size associated with this packet type; the table entries use
	 * sizeof() of the packet struct, header included */
	size_t pkt_size;
	/* handler for this packet type; NULL marks an unhandled type */
	int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
};
4517
4518 static struct asender_cmd *get_asender_cmd(int cmd)
4519 {
4520         static struct asender_cmd asender_tbl[] = {
4521                 /* anything missing from this table is in
4522                  * the drbd_cmd_handler (drbd_default_handler) table,
4523                  * see the beginning of drbdd() */
4524         [P_PING]            = { sizeof(struct p_header80), got_Ping },
4525         [P_PING_ACK]        = { sizeof(struct p_header80), got_PingAck },
4526         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4527         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4528         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4529         [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4530         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4531         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4532         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4533         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4534         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4535         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4536         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4537         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4538         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
4539         [P_MAX_CMD]         = { 0, NULL },
4540         };
4541         if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4542                 return NULL;
4543         return &asender_tbl[cmd];
4544 }
4545
4546 int drbd_asender(struct drbd_thread *thi)
4547 {
4548         struct drbd_conf *mdev = thi->mdev;
4549         struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4550         struct asender_cmd *cmd = NULL;
4551
4552         int rv, len;
4553         void *buf    = h;
4554         int received = 0;
4555         int expect   = sizeof(struct p_header80);
4556         int empty;
4557
4558         sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4559
4560         current->policy = SCHED_RR;  /* Make this a realtime task! */
4561         current->rt_priority = 2;    /* more important than all other tasks */
4562
4563         while (get_t_state(thi) == Running) {
4564                 drbd_thread_current_set_cpu(mdev);
4565                 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4566                         ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4567                         mdev->meta.socket->sk->sk_rcvtimeo =
4568                                 mdev->net_conf->ping_timeo*HZ/10;
4569                 }
4570
4571                 /* conditionally cork;
4572                  * it may hurt latency if we cork without much to send */
4573                 if (!mdev->net_conf->no_cork &&
4574                         3 < atomic_read(&mdev->unacked_cnt))
4575                         drbd_tcp_cork(mdev->meta.socket);
4576                 while (1) {
4577                         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4578                         flush_signals(current);
4579                         if (!drbd_process_done_ee(mdev))
4580                                 goto reconnect;
4581                         /* to avoid race with newly queued ACKs */
4582                         set_bit(SIGNAL_ASENDER, &mdev->flags);
4583                         spin_lock_irq(&mdev->req_lock);
4584                         empty = list_empty(&mdev->done_ee);
4585                         spin_unlock_irq(&mdev->req_lock);
4586                         /* new ack may have been queued right here,
4587                          * but then there is also a signal pending,
4588                          * and we start over... */
4589                         if (empty)
4590                                 break;
4591                 }
4592                 /* but unconditionally uncork unless disabled */
4593                 if (!mdev->net_conf->no_cork)
4594                         drbd_tcp_uncork(mdev->meta.socket);
4595
4596                 /* short circuit, recv_msg would return EINTR anyways. */
4597                 if (signal_pending(current))
4598                         continue;
4599
4600                 rv = drbd_recv_short(mdev, mdev->meta.socket,
4601                                      buf, expect-received, 0);
4602                 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4603
4604                 flush_signals(current);
4605
4606                 /* Note:
4607                  * -EINTR        (on meta) we got a signal
4608                  * -EAGAIN       (on meta) rcvtimeo expired
4609                  * -ECONNRESET   other side closed the connection
4610                  * -ERESTARTSYS  (on data) we got a signal
4611                  * rv <  0       other than above: unexpected error!
4612                  * rv == expected: full header or command
4613                  * rv <  expected: "woken" by signal during receive
4614                  * rv == 0       : "connection shut down by peer"
4615                  */
4616                 if (likely(rv > 0)) {
4617                         received += rv;
4618                         buf      += rv;
4619                 } else if (rv == 0) {
4620                         dev_err(DEV, "meta connection shut down by peer.\n");
4621                         goto reconnect;
4622                 } else if (rv == -EAGAIN) {
4623                         if (mdev->meta.socket->sk->sk_rcvtimeo ==
4624                             mdev->net_conf->ping_timeo*HZ/10) {
4625                                 dev_err(DEV, "PingAck did not arrive in time.\n");
4626                                 goto reconnect;
4627                         }
4628                         set_bit(SEND_PING, &mdev->flags);
4629                         continue;
4630                 } else if (rv == -EINTR) {
4631                         continue;
4632                 } else {
4633                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4634                         goto reconnect;
4635                 }
4636
4637                 if (received == expect && cmd == NULL) {
4638                         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4639                                 dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4640                                     be32_to_cpu(h->magic),
4641                                     be16_to_cpu(h->command),
4642                                     be16_to_cpu(h->length));
4643                                 goto reconnect;
4644                         }
4645                         cmd = get_asender_cmd(be16_to_cpu(h->command));
4646                         len = be16_to_cpu(h->length);
4647                         if (unlikely(cmd == NULL)) {
4648                                 dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4649                                     be32_to_cpu(h->magic),
4650                                     be16_to_cpu(h->command),
4651                                     be16_to_cpu(h->length));
4652                                 goto disconnect;
4653                         }
4654                         expect = cmd->pkt_size;
4655                         ERR_IF(len != expect-sizeof(struct p_header80))
4656                                 goto reconnect;
4657                 }
4658                 if (received == expect) {
4659                         D_ASSERT(cmd != NULL);
4660                         if (!cmd->process(mdev, h))
4661                                 goto reconnect;
4662
4663                         buf      = h;
4664                         received = 0;
4665                         expect   = sizeof(struct p_header80);
4666                         cmd      = NULL;
4667                 }
4668         }
4669
4670         if (0) {
4671 reconnect:
4672                 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4673                 drbd_md_sync(mdev);
4674         }
4675         if (0) {
4676 disconnect:
4677                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4678                 drbd_md_sync(mdev);
4679         }
4680         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4681
4682         D_ASSERT(mdev->state.conn < C_CONNECTED);
4683         dev_info(DEV, "asender terminated\n");
4684
4685         return 0;
4686 }