1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/smp_lock.h>
40 #include <linux/pkt_sched.h>
41 #define __KERNEL_SYSCALLS__
42 #include <linux/unistd.h>
43 #include <linux/vmalloc.h>
44 #include <linux/random.h>
45 #include <linux/string.h>
46 #include <linux/scatterlist.h>
47 #include "drbd_int.h"
48 #include "drbd_req.h"
49
50 #include "drbd_vli.h"
51
52 struct flush_work {
53         struct drbd_work w;
54         struct drbd_epoch *epoch;
55 };
56
57 enum finish_epoch {
58         FE_STILL_LIVE,
59         FE_DESTROYED,
60         FE_RECYCLED,
61 };
62
63 static int drbd_do_handshake(struct drbd_conf *mdev);
64 static int drbd_do_auth(struct drbd_conf *mdev);
65
66 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
67 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
68
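/* Return the epoch preceding @epoch on mdev's epoch list, or NULL if @epoch
 * is the only one, or if its predecessor is the current epoch. */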
69 static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
70 {
71         struct drbd_epoch *prev;
72         spin_lock(&mdev->epoch_lock);
73         prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
74         if (prev == epoch || prev == mdev->current_epoch)
75                 prev = NULL;
76         spin_unlock(&mdev->epoch_lock);
77         return prev;
78 }
79
80 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
81
82 /*
83  * some helper functions to deal with singly linked page lists,
84  * page->private being our "next" pointer.
85  */
86
87 /* If at least n pages are linked at head, get n pages off.
88  * Otherwise, don't modify head, and return NULL.
89  * Locking is the responsibility of the caller.
90  */
91 static struct page *page_chain_del(struct page **head, int n)
92 {
93         struct page *page;
94         struct page *tmp;
95
96         BUG_ON(!n);
97         BUG_ON(!head);
98
99         page = *head;
100
101         if (!page)
102                 return NULL;
103
104         while (page) {
105                 tmp = page_chain_next(page);
106                 if (--n == 0)
107                         break; /* found sufficient pages */
108                 if (tmp == NULL)
109                         /* insufficient pages, don't use any of them. */
110                         return NULL;
111                 page = tmp;
112         }
113
114         /* add end of list marker for the returned list */
115         set_page_private(page, 0);
116         /* actual return value, and adjustment of head */
117         page = *head;
118         *head = tmp;
119         return page;
120 }
121
122 /* may be used outside of locks to find the tail of a (usually short)
123  * "private" page chain, before adding it back to a global chain head
124  * with page_chain_add() under a spinlock. */
125 static struct page *page_chain_tail(struct page *page, int *len)
126 {
127         struct page *tmp;
128         int i = 1;
129         while ((tmp = page_chain_next(page)))
130                 ++i, page = tmp;
131         if (len)
132                 *len = i;
133         return page;
134 }
135
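/* Put every page of the chain and return the number of pages released. */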
136 static int page_chain_free(struct page *page)
137 {
138         struct page *tmp;
139         int i = 0;
140         page_chain_for_each_safe(page, tmp) {
141                 put_page(page);
142                 ++i;
143         }
144         return i;
145 }
146
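/* Prepend the chain chain_first..chain_last to the list at *head.
 * Locking is the responsibility of the caller; the #if 1 block merely
 * verifies that chain_last really is the tail of chain_first. */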
147 static void page_chain_add(struct page **head,
148                 struct page *chain_first, struct page *chain_last)
149 {
150 #if 1
151         struct page *tmp;
152         tmp = page_chain_tail(chain_first, NULL);
153         BUG_ON(tmp != chain_last);
154 #endif
155
156         /* add chain to head */
157         set_page_private(chain_last, (unsigned long)*head);
158         *head = chain_first;
159 }
160
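/* Try to take @number pages off the global drbd_pp_pool; if the pool cannot
 * satisfy the request, fall back to allocating fresh pages with GFP_TRY.
 * A partial allocation is handed back to the pool and NULL is returned. */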
161 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
162 {
163         struct page *page = NULL;
164         struct page *tmp = NULL;
165         int i = 0;
166
167         /* Yes, testing drbd_pp_vacant outside the lock is racy.
168          * So what. It saves a spin_lock. */
169         if (drbd_pp_vacant >= number) {
170                 spin_lock(&drbd_pp_lock);
171                 page = page_chain_del(&drbd_pp_pool, number);
172                 if (page)
173                         drbd_pp_vacant -= number;
174                 spin_unlock(&drbd_pp_lock);
175                 if (page)
176                         return page;
177         }
178
179         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
180          * "criss-cross" setup, that might cause write-out on some other DRBD,
181          * which in turn might block on the other node at this very place.  */
182         for (i = 0; i < number; i++) {
183                 tmp = alloc_page(GFP_TRY);
184                 if (!tmp)
185                         break;
186                 set_page_private(tmp, (unsigned long)page);
187                 page = tmp;
188         }
189
190         if (i == number)
191                 return page;
192
193         /* Not enough pages immediately available this time.
194          * No need to jump around here, drbd_pp_alloc will retry this
195          * function "soon". */
196         if (page) {
197                 tmp = page_chain_tail(page, NULL);
198                 spin_lock(&drbd_pp_lock);
199                 page_chain_add(&drbd_pp_pool, page, tmp);
200                 drbd_pp_vacant += i;
201                 spin_unlock(&drbd_pp_lock);
202         }
203         return NULL;
204 }
205
206 /* kick lower level device, if we have more than (arbitrary number)
207  * reference counts on it, which typically are locally submitted io
208  * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
209 static void maybe_kick_lo(struct drbd_conf *mdev)
210 {
211         if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
212                 drbd_kick_lo(mdev);
213 }
214
215 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
216 {
217         struct drbd_epoch_entry *e;
218         struct list_head *le, *tle;
219
220         /* The EEs are always appended to the end of the list. Since
221            they are sent in order over the wire, they have to finish
222            in order. As soon as we see the first unfinished one, we can
223            stop examining the list... */
224
225         list_for_each_safe(le, tle, &mdev->net_ee) {
226                 e = list_entry(le, struct drbd_epoch_entry, w.list);
227                 if (drbd_ee_has_active_page(e))
228                         break;
229                 list_move(le, to_be_freed);
230         }
231 }
232
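/* Kick the lower level device if appropriate, then free those net_ee entries
 * whose pages are no longer held by the network layer. */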
233 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
234 {
235         LIST_HEAD(reclaimed);
236         struct drbd_epoch_entry *e, *t;
237
238         maybe_kick_lo(mdev);
239         spin_lock_irq(&mdev->req_lock);
240         reclaim_net_ee(mdev, &reclaimed);
241         spin_unlock_irq(&mdev->req_lock);
242
243         list_for_each_entry_safe(e, t, &reclaimed, w.list)
244                 drbd_free_ee(mdev, e);
245 }
246
247 /**
248  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
249  * @mdev:       DRBD device.
250  * @number:     number of pages requested
251  * @retry:      whether to retry, if not enough pages are available right now
252  *
253  * Tries to allocate @number pages, first from our own page pool, then from
254  * the kernel, unless this allocation would exceed the max_buffers setting.
255  * Possibly retry until DRBD frees sufficient pages somewhere else.
256  *
257  * Returns a page chain linked via page->private.
258  */
259 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
260 {
261         struct page *page = NULL;
262         DEFINE_WAIT(wait);
263
264         /* Yes, we may run up to @number over max_buffers. If we
265          * follow it strictly, the admin will get it wrong anyways. */
266         if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
267                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
268
269         while (page == NULL) {
270                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
271
272                 drbd_kick_lo_and_reclaim_net(mdev);
273
274                 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
275                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
276                         if (page)
277                                 break;
278                 }
279
280                 if (!retry)
281                         break;
282
283                 if (signal_pending(current)) {
284                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
285                         break;
286                 }
287
288                 schedule();
289         }
290         finish_wait(&drbd_pp_wait, &wait);
291
292         if (page)
293                 atomic_add(number, &mdev->pp_in_use);
294         return page;
295 }
296
297 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
298  * Is also used from inside another spin_lock_irq(&mdev->req_lock);
299  * Either links the page chain back to the global pool,
300  * or returns all pages to the system. */
301 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
302 {
303         int i;
304         if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
305                 i = page_chain_free(page);
306         else {
307                 struct page *tmp;
308                 tmp = page_chain_tail(page, &i);
309                 spin_lock(&drbd_pp_lock);
310                 page_chain_add(&drbd_pp_pool, page, tmp);
311                 drbd_pp_vacant += i;
312                 spin_unlock(&drbd_pp_lock);
313         }
314         atomic_sub(i, &mdev->pp_in_use);
315         i = atomic_read(&mdev->pp_in_use);
316         if (i < 0)
317                 dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
318         wake_up(&drbd_pp_wait);
319 }
320
321 /*
322 You need to hold the req_lock:
323  _drbd_wait_ee_list_empty()
324
325 You must not have the req_lock:
326  drbd_free_ee()
327  drbd_alloc_ee()
328  drbd_init_ee()
329  drbd_release_ee()
330  drbd_ee_fix_bhs()
331  drbd_process_done_ee()
332  drbd_clear_done_ee()
333  drbd_wait_ee_list_empty()
334 */
335
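/* Allocate an epoch entry plus a page chain large enough for @data_size
 * bytes.  Returns NULL if fault injection triggers or if either the entry or
 * its pages cannot be allocated. */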
336 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
337                                      u64 id,
338                                      sector_t sector,
339                                      unsigned int data_size,
340                                      gfp_t gfp_mask) __must_hold(local)
341 {
342         struct drbd_epoch_entry *e;
343         struct page *page;
344         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
345
346         if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
347                 return NULL;
348
349         e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
350         if (!e) {
351                 if (!(gfp_mask & __GFP_NOWARN))
352                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
353                 return NULL;
354         }
355
356         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
357         if (!page)
358                 goto fail;
359
360         INIT_HLIST_NODE(&e->colision);
361         e->epoch = NULL;
362         e->mdev = mdev;
363         e->pages = page;
364         atomic_set(&e->pending_bios, 0);
365         e->size = data_size;
366         e->flags = 0;
367         e->sector = sector;
369         e->block_id = id;
370
371         return e;
372
373  fail:
374         mempool_free(e, drbd_ee_mempool);
375         return NULL;
376 }
377
378 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
379 {
380         drbd_pp_free(mdev, e->pages);
381         D_ASSERT(atomic_read(&e->pending_bios) == 0);
382         D_ASSERT(hlist_unhashed(&e->colision));
383         mempool_free(e, drbd_ee_mempool);
384 }
385
386 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_epoch_entry *e, *t;
390         int count = 0;
391
392         spin_lock_irq(&mdev->req_lock);
393         list_splice_init(list, &work_list);
394         spin_unlock_irq(&mdev->req_lock);
395
396         list_for_each_entry_safe(e, t, &work_list, w.list) {
397                 drbd_free_ee(mdev, e);
398                 count++;
399         }
400         return count;
401 }
402
403
404 /*
405  * This function is called from _asender only_
406  * but see also comments in _req_mod(,barrier_acked)
407  * and receive_Barrier.
408  *
409  * Move entries from net_ee to done_ee, if ready.
410  * Grab done_ee, call all callbacks, free the entries.
411  * The callbacks typically send out ACKs.
412  */
413 static int drbd_process_done_ee(struct drbd_conf *mdev)
414 {
415         LIST_HEAD(work_list);
416         LIST_HEAD(reclaimed);
417         struct drbd_epoch_entry *e, *t;
418         int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
419
420         spin_lock_irq(&mdev->req_lock);
421         reclaim_net_ee(mdev, &reclaimed);
422         list_splice_init(&mdev->done_ee, &work_list);
423         spin_unlock_irq(&mdev->req_lock);
424
425         list_for_each_entry_safe(e, t, &reclaimed, w.list)
426                 drbd_free_ee(mdev, e);
427
428         /* possible callbacks here:
429          * e_end_block, e_end_resync_block, and e_send_discard_ack;
430          * all of them ignore the last argument.
431          */
432         list_for_each_entry_safe(e, t, &work_list, w.list) {
433                 /* list_del not necessary, next/prev members not touched */
434                 ok = e->w.cb(mdev, &e->w, !ok) && ok;
435                 drbd_free_ee(mdev, e);
436         }
437         wake_up(&mdev->ee_wait);
438
439         return ok;
440 }
441
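/* Wait until @head is empty.  Called with the req_lock held; the lock is
 * dropped while sleeping and re-acquired before returning. */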
442 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
443 {
444         DEFINE_WAIT(wait);
445
446         /* avoids spin_lock/unlock
447          * and calling prepare_to_wait in the fast path */
448         while (!list_empty(head)) {
449                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
450                 spin_unlock_irq(&mdev->req_lock);
451                 drbd_kick_lo(mdev);
452                 schedule();
453                 finish_wait(&mdev->ee_wait, &wait);
454                 spin_lock_irq(&mdev->req_lock);
455         }
456 }
457
458 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
459 {
460         spin_lock_irq(&mdev->req_lock);
461         _drbd_wait_ee_list_empty(mdev, head);
462         spin_unlock_irq(&mdev->req_lock);
463 }
464
465 /* see also kernel_accept, which is only present since 2.6.18.
466  * also, we want to log exactly which part of it failed */
467 static int drbd_accept(struct drbd_conf *mdev, const char **what,
468                 struct socket *sock, struct socket **newsock)
469 {
470         struct sock *sk = sock->sk;
471         int err = 0;
472
473         *what = "listen";
474         err = sock->ops->listen(sock, 5);
475         if (err < 0)
476                 goto out;
477
478         *what = "sock_create_lite";
479         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
480                                newsock);
481         if (err < 0)
482                 goto out;
483
484         *what = "accept";
485         err = sock->ops->accept(sock, *newsock, 0);
486         if (err < 0) {
487                 sock_release(*newsock);
488                 *newsock = NULL;
489                 goto out;
490         }
491         (*newsock)->ops  = sock->ops;
492
493 out:
494         return err;
495 }
496
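/* Receive at most @size bytes with a single sock_recvmsg() call.
 * Returns the number of bytes received, or a negative error code. */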
497 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
498                     void *buf, size_t size, int flags)
499 {
500         mm_segment_t oldfs;
501         struct kvec iov = {
502                 .iov_base = buf,
503                 .iov_len = size,
504         };
505         struct msghdr msg = {
506                 .msg_iovlen = 1,
507                 .msg_iov = (struct iovec *)&iov,
508                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
509         };
510         int rv;
511
512         oldfs = get_fs();
513         set_fs(KERNEL_DS);
514         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
515         set_fs(oldfs);
516
517         return rv;
518 }
519
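/* Receive exactly @size bytes from the data socket (MSG_WAITALL).  On a short
 * read, an error, or a shutdown by the peer, the connection is forced into
 * C_BROKEN_PIPE. */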
520 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
521 {
522         mm_segment_t oldfs;
523         struct kvec iov = {
524                 .iov_base = buf,
525                 .iov_len = size,
526         };
527         struct msghdr msg = {
528                 .msg_iovlen = 1,
529                 .msg_iov = (struct iovec *)&iov,
530                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
531         };
532         int rv;
533
534         oldfs = get_fs();
535         set_fs(KERNEL_DS);
536
537         for (;;) {
538                 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
539                 if (rv == size)
540                         break;
541
542                 /* Note:
543                  * ECONNRESET   other side closed the connection
544                  * ERESTARTSYS  (on  sock) we got a signal
545                  */
546
547                 if (rv < 0) {
548                         if (rv == -ECONNRESET)
549                                 dev_info(DEV, "sock was reset by peer\n");
550                         else if (rv != -ERESTARTSYS)
551                                 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
552                         break;
553                 } else if (rv == 0) {
554                         dev_info(DEV, "sock was shut down by peer\n");
555                         break;
556                 } else  {
557                         /* signal came in, or peer/link went down,
558                          * after we read a partial message
559                          */
560                         /* D_ASSERT(signal_pending(current)); */
561                         break;
562                 }
563         }
564
565         set_fs(oldfs);
566
567         if (rv != size)
568                 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
569
570         return rv;
571 }
572
573 /* quoting tcp(7):
574  *   On individual connections, the socket buffer size must be set prior to the
575  *   listen(2) or connect(2) calls in order to have it take effect.
576  * This is our wrapper to do so.
577  */
578 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
579                 unsigned int rcv)
580 {
581         /* open coded SO_SNDBUF, SO_RCVBUF */
582         if (snd) {
583                 sock->sk->sk_sndbuf = snd;
584                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
585         }
586         if (rcv) {
587                 sock->sk->sk_rcvbuf = rcv;
588                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
589         }
590 }
591
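/* Actively establish a TCP connection to the peer, bound to the locally
 * configured address.  Returns the connected socket, or NULL on failure. */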
592 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
593 {
594         const char *what;
595         struct socket *sock;
596         struct sockaddr_in6 src_in6;
597         int err;
598         int disconnect_on_error = 1;
599
600         if (!get_net_conf(mdev))
601                 return NULL;
602
603         what = "sock_create_kern";
604         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
605                 SOCK_STREAM, IPPROTO_TCP, &sock);
606         if (err < 0) {
607                 sock = NULL;
608                 goto out;
609         }
610
611         sock->sk->sk_rcvtimeo =
612         sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
613         drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
614                         mdev->net_conf->rcvbuf_size);
615
616        /* explicitly bind to the configured IP as source IP
617         *  for the outgoing connections.
618         *  This is needed for multihomed hosts and to be
619         *  able to use lo: interfaces for drbd.
620         * Make sure to use 0 as port number, so linux selects
621         *  a free one dynamically.
622         */
623         memcpy(&src_in6, mdev->net_conf->my_addr,
624                min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
625         if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
626                 src_in6.sin6_port = 0;
627         else
628                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
629
630         what = "bind before connect";
631         err = sock->ops->bind(sock,
632                               (struct sockaddr *) &src_in6,
633                               mdev->net_conf->my_addr_len);
634         if (err < 0)
635                 goto out;
636
637         /* connect may fail, peer not yet available.
638          * stay C_WF_CONNECTION, don't go Disconnecting! */
639         disconnect_on_error = 0;
640         what = "connect";
641         err = sock->ops->connect(sock,
642                                  (struct sockaddr *)mdev->net_conf->peer_addr,
643                                  mdev->net_conf->peer_addr_len, 0);
644
645 out:
646         if (err < 0) {
647                 if (sock) {
648                         sock_release(sock);
649                         sock = NULL;
650                 }
651                 switch (-err) {
652                         /* timeout, busy, signal pending */
653                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
654                 case EINTR: case ERESTARTSYS:
655                         /* peer not (yet) available, network problem */
656                 case ECONNREFUSED: case ENETUNREACH:
657                 case EHOSTDOWN:    case EHOSTUNREACH:
658                         disconnect_on_error = 0;
659                         break;
660                 default:
661                         dev_err(DEV, "%s failed, err = %d\n", what, err);
662                 }
663                 if (disconnect_on_error)
664                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
665         }
666         put_net_conf(mdev);
667         return sock;
668 }
669
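/* Passively wait, with a randomly jittered timeout, for the peer to connect
 * to us.  Returns the accepted socket, or NULL on timeout or error. */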
670 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
671 {
672         int timeo, err;
673         struct socket *s_estab = NULL, *s_listen;
674         const char *what;
675
676         if (!get_net_conf(mdev))
677                 return NULL;
678
679         what = "sock_create_kern";
680         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
681                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
682         if (err) {
683                 s_listen = NULL;
684                 goto out;
685         }
686
687         timeo = mdev->net_conf->try_connect_int * HZ;
688         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
689
690         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
691         s_listen->sk->sk_rcvtimeo = timeo;
692         s_listen->sk->sk_sndtimeo = timeo;
693         drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
694                         mdev->net_conf->rcvbuf_size);
695
696         what = "bind before listen";
697         err = s_listen->ops->bind(s_listen,
698                               (struct sockaddr *) mdev->net_conf->my_addr,
699                               mdev->net_conf->my_addr_len);
700         if (err < 0)
701                 goto out;
702
703         err = drbd_accept(mdev, &what, s_listen, &s_estab);
704
705 out:
706         if (s_listen)
707                 sock_release(s_listen);
708         if (err < 0) {
709                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
710                         dev_err(DEV, "%s failed, err = %d\n", what, err);
711                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
712                 }
713         }
714         put_net_conf(mdev);
715
716         return s_estab;
717 }
718
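/* The "first packet" sent on a freshly established connection only tells the
 * peer whether this socket is meant to become the data or the meta socket. */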
719 static int drbd_send_fp(struct drbd_conf *mdev,
720         struct socket *sock, enum drbd_packets cmd)
721 {
722         struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
723
724         return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
725 }
726
727 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
728 {
729         struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
730         int rr;
731
732         rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
733
734         if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
735                 return be16_to_cpu(h->command);
736
737         return 0xffff;
738 }
739
740 /**
741  * drbd_socket_okay() - Free the socket if its connection is not okay
742  * @mdev:       DRBD device.
743  * @sock:       pointer to the pointer to the socket.
744  */
745 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
746 {
747         int rr;
748         char tb[4];
749
750         if (!*sock)
751                 return FALSE;
752
753         rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
754
755         if (rr > 0 || rr == -EAGAIN) {
756                 return TRUE;
757         } else {
758                 sock_release(*sock);
759                 *sock = NULL;
760                 return FALSE;
761         }
762 }
763
764 /*
765  * return values:
766  *   1 yes, we have a valid connection
767  *   0 oops, did not work out, please try again
768  *  -1 peer talks different language,
769  *     no point in trying again, please go standalone.
770  *  -2 We do not have a network config...
771  */
772 static int drbd_connect(struct drbd_conf *mdev)
773 {
774         struct socket *s, *sock, *msock;
775         int try, h, ok;
776
777         D_ASSERT(!mdev->data.socket);
778
779         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
780                 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
781
782         if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
783                 return -2;
784
785         clear_bit(DISCARD_CONCURRENT, &mdev->flags);
786
787         sock  = NULL;
788         msock = NULL;
789
790         do {
791                 for (try = 0;;) {
792                         /* 3 tries, this should take less than a second! */
793                         s = drbd_try_connect(mdev);
794                         if (s || ++try >= 3)
795                                 break;
796                         /* give the other side time to call bind() & listen() */
797                         __set_current_state(TASK_INTERRUPTIBLE);
798                         schedule_timeout(HZ / 10);
799                 }
800
801                 if (s) {
802                         if (!sock) {
803                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
804                                 sock = s;
805                                 s = NULL;
806                         } else if (!msock) {
807                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
808                                 msock = s;
809                                 s = NULL;
810                         } else {
811                                 dev_err(DEV, "Logic error in drbd_connect()\n");
812                                 goto out_release_sockets;
813                         }
814                 }
815
816                 if (sock && msock) {
817                         __set_current_state(TASK_INTERRUPTIBLE);
818                         schedule_timeout(HZ / 10);
819                         ok = drbd_socket_okay(mdev, &sock);
820                         ok = drbd_socket_okay(mdev, &msock) && ok;
821                         if (ok)
822                                 break;
823                 }
824
825 retry:
826                 s = drbd_wait_for_connect(mdev);
827                 if (s) {
828                         try = drbd_recv_fp(mdev, s);
829                         drbd_socket_okay(mdev, &sock);
830                         drbd_socket_okay(mdev, &msock);
831                         switch (try) {
832                         case P_HAND_SHAKE_S:
833                                 if (sock) {
834                                         dev_warn(DEV, "initial packet S crossed\n");
835                                         sock_release(sock);
836                                 }
837                                 sock = s;
838                                 break;
839                         case P_HAND_SHAKE_M:
840                                 if (msock) {
841                                         dev_warn(DEV, "initial packet M crossed\n");
842                                         sock_release(msock);
843                                 }
844                                 msock = s;
845                                 set_bit(DISCARD_CONCURRENT, &mdev->flags);
846                                 break;
847                         default:
848                                 dev_warn(DEV, "Error receiving initial packet\n");
849                                 sock_release(s);
850                                 if (random32() & 1)
851                                         goto retry;
852                         }
853                 }
854
855                 if (mdev->state.conn <= C_DISCONNECTING)
856                         goto out_release_sockets;
857                 if (signal_pending(current)) {
858                         flush_signals(current);
859                         smp_rmb();
860                         if (get_t_state(&mdev->receiver) == Exiting)
861                                 goto out_release_sockets;
862                 }
863
864                 if (sock && msock) {
865                         ok = drbd_socket_okay(mdev, &sock);
866                         ok = drbd_socket_okay(mdev, &msock) && ok;
867                         if (ok)
868                                 break;
869                 }
870         } while (1);
871
872         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
873         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
874
875         sock->sk->sk_allocation = GFP_NOIO;
876         msock->sk->sk_allocation = GFP_NOIO;
877
878         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
879         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
880
881         /* NOT YET ...
882          * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
883          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
884          * first set it to the P_HAND_SHAKE timeout,
885          * which we set to 4x the configured ping_timeout. */
886         sock->sk->sk_sndtimeo =
887         sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
888
889         msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
890         msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
891
892         /* we don't want delays.
893          * we use TCP_CORK where appropriate, though */
894         drbd_tcp_nodelay(sock);
895         drbd_tcp_nodelay(msock);
896
897         mdev->data.socket = sock;
898         mdev->meta.socket = msock;
899         mdev->last_received = jiffies;
900
901         D_ASSERT(mdev->asender.task == NULL);
902
903         h = drbd_do_handshake(mdev);
904         if (h <= 0)
905                 return h;
906
907         if (mdev->cram_hmac_tfm) {
908                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
909                 switch (drbd_do_auth(mdev)) {
910                 case -1:
911                         dev_err(DEV, "Authentication of peer failed\n");
912                         return -1;
913                 case 0:
914                         dev_err(DEV, "Authentication of peer failed, trying again.\n");
915                         return 0;
916                 }
917         }
918
919         if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
920                 return 0;
921
922         sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
923         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
924
925         atomic_set(&mdev->packet_seq, 0);
926         mdev->peer_seq = 0;
927
928         drbd_thread_start(&mdev->asender);
929
930         if (!drbd_send_protocol(mdev))
931                 return -1;
932         drbd_send_sync_param(mdev, &mdev->sync_conf);
933         drbd_send_sizes(mdev, 0, 0);
934         drbd_send_uuids(mdev);
935         drbd_send_state(mdev);
936         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
937         clear_bit(RESIZE_PENDING, &mdev->flags);
938
939         return 1;
940
941 out_release_sockets:
942         if (sock)
943                 sock_release(sock);
944         if (msock)
945                 sock_release(msock);
946         return -1;
947 }
948
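/* Receive one packet header from the data socket, check the magic, and
 * convert command and length to host byte order. */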
949 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
950 {
951         int r;
952
953         r = drbd_recv(mdev, h, sizeof(*h));
954
955         if (unlikely(r != sizeof(*h))) {
956                 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
957                 return FALSE;
958         }
959         h->command = be16_to_cpu(h->command);
960         h->length  = be16_to_cpu(h->length);
961         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
962                 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
963                     (long)be32_to_cpu(h->magic),
964                     h->command, h->length);
965                 return FALSE;
966         }
967         mdev->last_received = jiffies;
968
969         return TRUE;
970 }
971
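/* Flush the backing device if the current write ordering method asks for it;
 * if the flush fails, degrade to WO_drain_io.  In any case tell the epoch
 * state machine that the barrier is done. */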
972 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
973 {
974         int rv;
975
976         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
977                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
978                                         NULL, BLKDEV_IFL_WAIT);
979                 if (rv) {
980                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
981                         /* would rather check on EOPNOTSUPP, but that is not reliable.
982                          * don't try again for ANY return value != 0
983                          * if (rv == -EOPNOTSUPP) */
984                         drbd_bump_write_ordering(mdev, WO_drain_io);
985                 }
986                 put_ldev(mdev);
987         }
988
989         return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
990 }
991
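/* Worker callback: issue the flush that was deferred for @epoch and drop the
 * reference that was taken when this work was queued. */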
992 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
993 {
994         struct flush_work *fw = (struct flush_work *)w;
995         struct drbd_epoch *epoch = fw->epoch;
996
997         kfree(w);
998
999         if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
1000                 drbd_flush_after_epoch(mdev, epoch);
1001
1002         drbd_may_finish_epoch(mdev, epoch, EV_PUT |
1003                               (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
1004
1005         return 1;
1006 }
1007
1008 /**
1009  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1010  * @mdev:       DRBD device.
1011  * @epoch:      Epoch object.
1012  * @ev:         Epoch event.
1013  */
1014 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1015                                                struct drbd_epoch *epoch,
1016                                                enum epoch_event ev)
1017 {
1018         int finish, epoch_size;
1019         struct drbd_epoch *next_epoch;
1020         int schedule_flush = 0;
1021         enum finish_epoch rv = FE_STILL_LIVE;
1022
1023         spin_lock(&mdev->epoch_lock);
1024         do {
1025                 next_epoch = NULL;
1026                 finish = 0;
1027
1028                 epoch_size = atomic_read(&epoch->epoch_size);
1029
1030                 switch (ev & ~EV_CLEANUP) {
1031                 case EV_PUT:
1032                         atomic_dec(&epoch->active);
1033                         break;
1034                 case EV_GOT_BARRIER_NR:
1035                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1036
1037                         /* Special case: If we just switched from WO_bio_barrier to
1038                            WO_bdev_flush we should not finish the current epoch */
1039                         if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1040                             mdev->write_ordering != WO_bio_barrier &&
1041                             epoch == mdev->current_epoch)
1042                                 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1043                         break;
1044                 case EV_BARRIER_DONE:
1045                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1046                         break;
1047                 case EV_BECAME_LAST:
1048                         /* nothing to do*/
1049                         break;
1050                 }
1051
1052                 if (epoch_size != 0 &&
1053                     atomic_read(&epoch->active) == 0 &&
1054                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1055                     epoch->list.prev == &mdev->current_epoch->list &&
1056                     !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1057                         /* Nearly all conditions are met to finish that epoch... */
1058                         if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1059                             mdev->write_ordering == WO_none ||
1060                             (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1061                             ev & EV_CLEANUP) {
1062                                 finish = 1;
1063                                 set_bit(DE_IS_FINISHING, &epoch->flags);
1064                         } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1065                                  mdev->write_ordering == WO_bio_barrier) {
1066                                 atomic_inc(&epoch->active);
1067                                 schedule_flush = 1;
1068                         }
1069                 }
1070                 if (finish) {
1071                         if (!(ev & EV_CLEANUP)) {
1072                                 spin_unlock(&mdev->epoch_lock);
1073                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1074                                 spin_lock(&mdev->epoch_lock);
1075                         }
1076                         dec_unacked(mdev);
1077
1078                         if (mdev->current_epoch != epoch) {
1079                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1080                                 list_del(&epoch->list);
1081                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1082                                 mdev->epochs--;
1083                                 kfree(epoch);
1084
1085                                 if (rv == FE_STILL_LIVE)
1086                                         rv = FE_DESTROYED;
1087                         } else {
1088                                 epoch->flags = 0;
1089                                 atomic_set(&epoch->epoch_size, 0);
1090                                 /* atomic_set(&epoch->active, 0); is already zero */
1091                                 if (rv == FE_STILL_LIVE)
1092                                         rv = FE_RECYCLED;
1093                         }
1094                 }
1095
1096                 if (!next_epoch)
1097                         break;
1098
1099                 epoch = next_epoch;
1100         } while (1);
1101
1102         spin_unlock(&mdev->epoch_lock);
1103
1104         if (schedule_flush) {
1105                 struct flush_work *fw;
1106                 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1107                 if (fw) {
1108                         fw->w.cb = w_flush;
1109                         fw->epoch = epoch;
1110                         drbd_queue_work(&mdev->data.work, &fw->w);
1111                 } else {
1112                         dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1113                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1114                         /* That is not a recursion, only one level */
1115                         drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1116                         drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1117                 }
1118         }
1119
1120         return rv;
1121 }
1122
1123 /**
1124  * drbd_bump_write_ordering() - Fall back to another write ordering method
1125  * @mdev:       DRBD device.
1126  * @wo:         Write ordering method to try.
1127  */
1128 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1129 {
1130         enum write_ordering_e pwo;
1131         static char *write_ordering_str[] = {
1132                 [WO_none] = "none",
1133                 [WO_drain_io] = "drain",
1134                 [WO_bdev_flush] = "flush",
1135                 [WO_bio_barrier] = "barrier",
1136         };
1137
1138         pwo = mdev->write_ordering;
1139         wo = min(pwo, wo);
1140         if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1141                 wo = WO_bdev_flush;
1142         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1143                 wo = WO_drain_io;
1144         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1145                 wo = WO_none;
1146         mdev->write_ordering = wo;
1147         if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1148                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1149 }
1150
1151 /**
1152  * drbd_submit_ee() - Submit an epoch entry to the backing device as one or more bios
1153  * @mdev:       DRBD device.
1154  * @e:          epoch entry
1155  * @rw:         flag field, see bio->bi_rw
1156  */
1157 /* TODO allocate from our own bio_set. */
1158 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1159                 const unsigned rw, const int fault_type)
1160 {
1161         struct bio *bios = NULL;
1162         struct bio *bio;
1163         struct page *page = e->pages;
1164         sector_t sector = e->sector;
1165         unsigned ds = e->size;
1166         unsigned n_bios = 0;
1167         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1168
1169         /* In most cases, we will only need one bio.  But in case the lower
1170          * level restrictions happen to be different at this offset on this
1171          * side than those of the sending peer, we may need to submit the
1172          * request in more than one bio. */
1173 next_bio:
1174         bio = bio_alloc(GFP_NOIO, nr_pages);
1175         if (!bio) {
1176                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1177                 goto fail;
1178         }
1179         /* > e->sector, unless this is the first bio */
1180         bio->bi_sector = sector;
1181         bio->bi_bdev = mdev->ldev->backing_bdev;
1182         /* we special case some flags in the multi-bio case, see below
1183          * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
1184         bio->bi_rw = rw;
1185         bio->bi_private = e;
1186         bio->bi_end_io = drbd_endio_sec;
1187
1188         bio->bi_next = bios;
1189         bios = bio;
1190         ++n_bios;
1191
1192         page_chain_for_each(page) {
1193                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1194                 if (!bio_add_page(bio, page, len, 0)) {
1195                         /* a single page must always be possible! */
1196                         BUG_ON(bio->bi_vcnt == 0);
1197                         goto next_bio;
1198                 }
1199                 ds -= len;
1200                 sector += len >> 9;
1201                 --nr_pages;
1202         }
1203         D_ASSERT(page == NULL);
1204         D_ASSERT(ds == 0);
1205
1206         atomic_set(&e->pending_bios, n_bios);
1207         do {
1208                 bio = bios;
1209                 bios = bios->bi_next;
1210                 bio->bi_next = NULL;
1211
1212                 /* strip off BIO_RW_UNPLUG unless it is the last bio */
1213                 if (bios)
1214                         bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
1215
1216                 drbd_generic_make_request(mdev, fault_type, bio);
1217
1218                 /* strip off BIO_RW_BARRIER,
1219                  * unless it is the first or last bio */
1220                 if (bios && bios->bi_next)
1221                         bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
1222         } while (bios);
1223         maybe_kick_lo(mdev);
1224         return 0;
1225
1226 fail:
1227         while (bios) {
1228                 bio = bios;
1229                 bios = bios->bi_next;
1230                 bio_put(bio);
1231         }
1232         return -ENOMEM;
1233 }
1234
1235 /**
1236  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1237  * @mdev:       DRBD device.
1238  * @w:          work object.
1239  * @cancel:     The connection will be closed anyways (unused in this callback)
1240  */
1241 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1242 {
1243         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1244         /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1245            (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1246            so that we can finish that epoch in drbd_may_finish_epoch().
1247            That is necessary if we already have a long chain of Epochs, before
1248            we realize that BIO_RW_BARRIER is actually not supported */
1249
1250         /* As long as the -ENOTSUPP on the barrier is reported immediately
1251            that will never trigger. If it is reported late, we will just
1252            print that warning and continue correctly for all future requests
1253            with WO_bdev_flush */
1254         if (previous_epoch(mdev, e->epoch))
1255                 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1256
1257         /* we still have a local reference,
1258          * get_ldev was done in receive_Data. */
1259
1260         e->w.cb = e_end_block;
1261         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1262                 /* drbd_submit_ee fails for one reason only:
1263                  * it was not able to allocate sufficient bios.
1264                  * requeue, try again later. */
1265                 e->w.cb = w_e_reissue;
1266                 drbd_queue_work(&mdev->data.work, &e->w);
1267         }
1268         return 1;
1269 }
1270
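/* Handle an incoming P_BARRIER: close the current epoch and, depending on the
 * write ordering method, flush or drain before installing a new epoch object
 * for the requests that follow. */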
1271 static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1272 {
1273         int rv, issue_flush;
1274         struct p_barrier *p = (struct p_barrier *)h;
1275         struct drbd_epoch *epoch;
1276
1277         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1278
1279         rv = drbd_recv(mdev, h->payload, h->length);
1280         ERR_IF(rv != h->length) return FALSE;
1281
1282         inc_unacked(mdev);
1283
1284         if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1285                 drbd_kick_lo(mdev);
1286
1287         mdev->current_epoch->barrier_nr = p->barrier;
1288         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1289
1290         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1291          * the activity log, which means it would not be resynced in case the
1292          * R_PRIMARY crashes now.
1293          * Therefore we must send the barrier_ack after the barrier request was
1294          * completed. */
1295         switch (mdev->write_ordering) {
1296         case WO_bio_barrier:
1297         case WO_none:
1298                 if (rv == FE_RECYCLED)
1299                         return TRUE;
1300                 break;
1301
1302         case WO_bdev_flush:
1303         case WO_drain_io:
1304                 if (rv == FE_STILL_LIVE) {
1305                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1306                         drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1307                         rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1308                 }
1309                 if (rv == FE_RECYCLED)
1310                         return TRUE;
1311
1312                 /* The asender will send all the ACKs and barrier ACKs out, since
1313                    all EEs moved from the active_ee to the done_ee. We need to
1314                    provide a new epoch object for the EEs that come in soon */
1315                 break;
1316         }
1317
1318         /* receiver context, in the writeout path of the other node.
1319          * avoid potential distributed deadlock */
1320         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1321         if (!epoch) {
1322                 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1323                 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1324                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1325                 if (issue_flush) {
1326                         rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1327                         if (rv == FE_RECYCLED)
1328                                 return TRUE;
1329                 }
1330
1331                 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1332
1333                 return TRUE;
1334         }
1335
1336         epoch->flags = 0;
1337         atomic_set(&epoch->epoch_size, 0);
1338         atomic_set(&epoch->active, 0);
1339
1340         spin_lock(&mdev->epoch_lock);
1341         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1342                 list_add(&epoch->list, &mdev->current_epoch->list);
1343                 mdev->current_epoch = epoch;
1344                 mdev->epochs++;
1345         } else {
1346                 /* The current_epoch got recycled while we allocated this one... */
1347                 kfree(epoch);
1348         }
1349         spin_unlock(&mdev->epoch_lock);
1350
1351         return TRUE;
1352 }
1353
1354 /* used from receive_RSDataReply (recv_resync_read)
1355  * and from receive_Data */
1356 static struct drbd_epoch_entry *
1357 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1358 {
1359         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1360         struct drbd_epoch_entry *e;
1361         struct page *page;
1362         int dgs, ds, rr;
1363         void *dig_in = mdev->int_dig_in;
1364         void *dig_vv = mdev->int_dig_vv;
1365         unsigned long *data;
1366
1367         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1368                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1369
1370         if (dgs) {
1371                 rr = drbd_recv(mdev, dig_in, dgs);
1372                 if (rr != dgs) {
1373                         dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1374                              rr, dgs);
1375                         return NULL;
1376                 }
1377         }
1378
1379         data_size -= dgs;
1380
1381         ERR_IF(data_size &  0x1ff) return NULL;
1382         ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1383
1384         /* even though we trust our peer,
1385          * we sometimes have to double check. */
1386         if (sector + (data_size>>9) > capacity) {
1387                 dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1388                         (unsigned long long)capacity,
1389                         (unsigned long long)sector, data_size);
1390                 return NULL;
1391         }
1392
1393         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1394          * "criss-cross" setup, that might cause write-out on some other DRBD,
1395          * which in turn might block on the other node at this very place.  */
1396         e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1397         if (!e)
1398                 return NULL;
1399
1400         ds = data_size;
1401         page = e->pages;
1402         page_chain_for_each(page) {
1403                 unsigned len = min_t(int, ds, PAGE_SIZE);
1404                 data = kmap(page);
1405                 rr = drbd_recv(mdev, data, len);
1406                 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1407                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1408                         data[0] = data[0] ^ (unsigned long)-1;
1409                 }
1410                 kunmap(page);
1411                 if (rr != len) {
1412                         drbd_free_ee(mdev, e);
1413                         dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1414                              rr, len);
1415                         return NULL;
1416                 }
1417                 ds -= rr;
1418         }
1419
1420         if (dgs) {
1421                 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1422                 if (memcmp(dig_in, dig_vv, dgs)) {
1423                         dev_err(DEV, "Digest integrity check FAILED.\n");
1424                         drbd_bcast_ee(mdev, "digest failed",
1425                                         dgs, dig_in, dig_vv, e);
1426                         drbd_free_ee(mdev, e);
1427                         return NULL;
1428                 }
1429         }
1430         mdev->recv_cnt += data_size>>9;
1431         return e;
1432 }
1433
1434 /* drbd_drain_block() just takes a data block
1435  * out of the socket input buffer, and discards it.
1436  */
1437 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1438 {
1439         struct page *page;
1440         int rr, rv = 1;
1441         void *data;
1442
1443         if (!data_size)
1444                 return TRUE;
1445
1446         page = drbd_pp_alloc(mdev, 1, 1);
1447
1448         data = kmap(page);
1449         while (data_size) {
1450                 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1451                 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1452                         rv = 0;
1453                         dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1454                              rr, min_t(int, data_size, PAGE_SIZE));
1455                         break;
1456                 }
1457                 data_size -= rr;
1458         }
1459         kunmap(page);
1460         drbd_pp_free(mdev, page);
1461         return rv;
1462 }
1463
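/* Receive the payload of a data reply directly into the pages of the original
 * request's bio, e.g. for a read that was served by the peer because we have
 * no usable local disk. */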
1464 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1465                            sector_t sector, int data_size)
1466 {
1467         struct bio_vec *bvec;
1468         struct bio *bio;
1469         int dgs, rr, i, expect;
1470         void *dig_in = mdev->int_dig_in;
1471         void *dig_vv = mdev->int_dig_vv;
1472
1473         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1474                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1475
1476         if (dgs) {
1477                 rr = drbd_recv(mdev, dig_in, dgs);
1478                 if (rr != dgs) {
1479                         dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1480                              rr, dgs);
1481                         return 0;
1482                 }
1483         }
1484
1485         data_size -= dgs;
1486
1487         /* optimistically update recv_cnt.  if receiving fails below,
1488          * we disconnect anyways, and counters will be reset. */
1489         mdev->recv_cnt += data_size>>9;
1490
1491         bio = req->master_bio;
1492         D_ASSERT(sector == bio->bi_sector);
1493
1494         bio_for_each_segment(bvec, bio, i) {
1495                 expect = min_t(int, data_size, bvec->bv_len);
1496                 rr = drbd_recv(mdev,
1497                              kmap(bvec->bv_page)+bvec->bv_offset,
1498                              expect);
1499                 kunmap(bvec->bv_page);
1500                 if (rr != expect) {
1501                         dev_warn(DEV, "short read receiving data reply: "
1502                              "read %d expected %d\n",
1503                              rr, expect);
1504                         return 0;
1505                 }
1506                 data_size -= rr;
1507         }
1508
1509         if (dgs) {
1510                 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1511                 if (memcmp(dig_in, dig_vv, dgs)) {
1512                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1513                         return 0;
1514                 }
1515         }
1516
1517         D_ASSERT(data_size == 0);
1518         return 1;
1519 }
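
/* Note: the data_size passed to recv_dless_read() still includes the dgs
 * digest bytes, which are received before the payload; e.g. with a 16 byte
 * digest and a 4 KiB block, data_size starts out as 4112, the digest is
 * read first, and 4096 bytes remain for the bio segments. */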
1520
1521 /* e_end_resync_block() is called via
1522  * drbd_process_done_ee() by asender only */
1523 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1524 {
1525         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1526         sector_t sector = e->sector;
1527         int ok;
1528
1529         D_ASSERT(hlist_unhashed(&e->colision));
1530
1531         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1532                 drbd_set_in_sync(mdev, sector, e->size);
1533                 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1534         } else {
1535                 /* Record failure to sync */
1536                 drbd_rs_failed_io(mdev, sector, e->size);
1537
1538                 ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1539         }
1540         dec_unacked(mdev);
1541
1542         return ok;
1543 }
1544
1545 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1546 {
1547         struct drbd_epoch_entry *e;
1548
1549         e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1550         if (!e)
1551                 goto fail;
1552
1553         dec_rs_pending(mdev);
1554
1555         inc_unacked(mdev);
1556         /* corresponding dec_unacked() in e_end_resync_block()
1557          * or in _drbd_clear_done_ee, respectively */
1558
1559         e->w.cb = e_end_resync_block;
1560
1561         spin_lock_irq(&mdev->req_lock);
1562         list_add(&e->w.list, &mdev->sync_ee);
1563         spin_unlock_irq(&mdev->req_lock);
1564
1565         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1566                 return TRUE;
1567
1568         drbd_free_ee(mdev, e);
1569 fail:
1570         put_ldev(mdev);
1571         return FALSE;
1572 }
1573
1574 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1575 {
1576         struct drbd_request *req;
1577         sector_t sector;
1578         unsigned int header_size, data_size;
1579         int ok;
1580         struct p_data *p = (struct p_data *)h;
1581
1582         header_size = sizeof(*p) - sizeof(*h);
1583         data_size   = h->length  - header_size;
1584
1585         ERR_IF(data_size == 0) return FALSE;
1586
1587         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1588                 return FALSE;
1589
1590         sector = be64_to_cpu(p->sector);
1591
1592         spin_lock_irq(&mdev->req_lock);
1593         req = _ar_id_to_req(mdev, p->block_id, sector);
1594         spin_unlock_irq(&mdev->req_lock);
1595         if (unlikely(!req)) {
1596                 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1597                 return FALSE;
1598         }
1599
1600         /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1601          * special casing it there for the various failure cases.
1602          * still no race with drbd_fail_pending_reads */
1603         ok = recv_dless_read(mdev, req, sector, data_size);
1604
1605         if (ok)
1606                 req_mod(req, data_received);
1607         /* else: nothing. handled from drbd_disconnect...
1608          * I don't think we may complete this just yet
1609          * in case we are "on-disconnect: freeze" */
1610
1611         return ok;
1612 }
1613
1614 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1615 {
1616         sector_t sector;
1617         unsigned int header_size, data_size;
1618         int ok;
1619         struct p_data *p = (struct p_data *)h;
1620
1621         header_size = sizeof(*p) - sizeof(*h);
1622         data_size   = h->length  - header_size;
1623
1624         ERR_IF(data_size == 0) return FALSE;
1625
1626         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1627                 return FALSE;
1628
1629         sector = be64_to_cpu(p->sector);
1630         D_ASSERT(p->block_id == ID_SYNCER);
1631
1632         if (get_ldev(mdev)) {
1633                 /* data is submitted to disk within recv_resync_read.
1634                  * corresponding put_ldev done below on error,
1635                  * or in drbd_endio_write_sec. */
1636                 ok = recv_resync_read(mdev, sector, data_size);
1637         } else {
1638                 if (__ratelimit(&drbd_ratelimit_state))
1639                         dev_err(DEV, "Can not write resync data to local disk.\n");
1640
1641                 ok = drbd_drain_block(mdev, data_size);
1642
1643                 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1644         }
1645
1646         return ok;
1647 }
1648
1649 /* e_end_block() is called via drbd_process_done_ee().
1650  * this means this function only runs in the asender thread
1651  */
1652 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1653 {
1654         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1655         sector_t sector = e->sector;
1656         struct drbd_epoch *epoch;
1657         int ok = 1, pcmd;
1658
1659         if (e->flags & EE_IS_BARRIER) {
1660                 epoch = previous_epoch(mdev, e->epoch);
1661                 if (epoch)
1662                         drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1663         }
1664
1665         if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1666                 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1667                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1668                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1669                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1670                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1671                         ok &= drbd_send_ack(mdev, pcmd, e);
1672                         if (pcmd == P_RS_WRITE_ACK)
1673                                 drbd_set_in_sync(mdev, sector, e->size);
1674                 } else {
1675                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1676                         /* we expect it to be marked out of sync anyways...
1677                          * maybe assert this?  */
1678                 }
1679                 dec_unacked(mdev);
1680         }
1681         /* we delete from the conflict detection hash _after_ we sent out the
1682          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1683         if (mdev->net_conf->two_primaries) {
1684                 spin_lock_irq(&mdev->req_lock);
1685                 D_ASSERT(!hlist_unhashed(&e->colision));
1686                 hlist_del_init(&e->colision);
1687                 spin_unlock_irq(&mdev->req_lock);
1688         } else {
1689                 D_ASSERT(hlist_unhashed(&e->colision));
1690         }
1691
1692         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1693
1694         return ok;
1695 }
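
/* To illustrate the ack selection above for protocol C: a successfully
 * written block that has EE_MAY_SET_IN_SYNC set while we are between
 * C_SYNC_SOURCE and C_PAUSED_SYNC_T is answered with P_RS_WRITE_ACK and
 * marked in sync; any other successful write gets P_WRITE_ACK, and a
 * failed write gets P_NEG_ACK. */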
1696
1697 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1698 {
1699         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1700         int ok = 1;
1701
1702         D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1703         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1704
1705         spin_lock_irq(&mdev->req_lock);
1706         D_ASSERT(!hlist_unhashed(&e->colision));
1707         hlist_del_init(&e->colision);
1708         spin_unlock_irq(&mdev->req_lock);
1709
1710         dec_unacked(mdev);
1711
1712         return ok;
1713 }
1714
1715 /* Called from receive_Data.
1716  * Synchronize packets on sock with packets on msock.
1717  *
1718  * This is here so that even when a P_DATA packet traveling via sock overtakes an Ack
1719  * packet traveling on msock, they are still processed in the order they were
1720  * sent.
1721  *
1722  * Note: we don't care for Ack packets overtaking P_DATA packets.
1723  *
1724  * In case packet_seq is larger than mdev->peer_seq, there are
1725  * outstanding packets on the msock. We wait for them to arrive.
1726  * In case this is the logically next packet, we update mdev->peer_seq
1727  * ourselves. Correctly handles 32bit wrap around.
1728  *
1729  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1730  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1731  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1732  * 1<<11 == 2048 seconds, aka ages, for the 32bit wrap around...
1733  *
1734  * returns 0 if we may process the packet,
1735  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1736 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1737 {
1738         DEFINE_WAIT(wait);
1739         unsigned int p_seq;
1740         long timeout;
1741         int ret = 0;
1742         spin_lock(&mdev->peer_seq_lock);
1743         for (;;) {
1744                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1745                 if (seq_le(packet_seq, mdev->peer_seq+1))
1746                         break;
1747                 if (signal_pending(current)) {
1748                         ret = -ERESTARTSYS;
1749                         break;
1750                 }
1751                 p_seq = mdev->peer_seq;
1752                 spin_unlock(&mdev->peer_seq_lock);
1753                 timeout = schedule_timeout(30*HZ);
1754                 spin_lock(&mdev->peer_seq_lock);
1755                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1756                         ret = -ETIMEDOUT;
1757                         dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1758                         break;
1759                 }
1760         }
1761         finish_wait(&mdev->seq_wait, &wait);
1762         if (mdev->peer_seq+1 == packet_seq)
1763                 mdev->peer_seq++;
1764         spin_unlock(&mdev->peer_seq_lock);
1765         return ret;
1766 }
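
/* The wrap-safe comparison used above can be pictured as a signed distance
 * in sequence space, roughly (a sketch, not necessarily the exact helper
 * defined elsewhere in this file):
 *
 *	static inline int seq_le(u32 a, u32 b)
 *	{
 *		return (s32)a - (s32)b <= 0;
 *	}
 *
 * so e.g. seq_le(5, 0xfffffffe) is false, while seq_le(0xfffffffe, 5) is
 * true across the 32bit wrap. */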
1767
1768 /* mirrored write */
1769 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1770 {
1771         sector_t sector;
1772         struct drbd_epoch_entry *e;
1773         struct p_data *p = (struct p_data *)h;
1774         int header_size, data_size;
1775         int rw = WRITE;
1776         u32 dp_flags;
1777
1778         header_size = sizeof(*p) - sizeof(*h);
1779         data_size   = h->length  - header_size;
1780
1781         ERR_IF(data_size == 0) return FALSE;
1782
1783         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1784                 return FALSE;
1785
1786         if (!get_ldev(mdev)) {
1787                 if (__ratelimit(&drbd_ratelimit_state))
1788                         dev_err(DEV, "Can not write mirrored data block "
1789                             "to local disk.\n");
1790                 spin_lock(&mdev->peer_seq_lock);
1791                 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1792                         mdev->peer_seq++;
1793                 spin_unlock(&mdev->peer_seq_lock);
1794
1795                 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1796                 atomic_inc(&mdev->current_epoch->epoch_size);
1797                 return drbd_drain_block(mdev, data_size);
1798         }
1799
1800         /* get_ldev(mdev) successful.
1801          * Corresponding put_ldev done either below (on various errors),
1802          * or in drbd_endio_write_sec, if we successfully submit the data at
1803          * the end of this function. */
1804
1805         sector = be64_to_cpu(p->sector);
1806         e = read_in_block(mdev, p->block_id, sector, data_size);
1807         if (!e) {
1808                 put_ldev(mdev);
1809                 return FALSE;
1810         }
1811
1812         e->w.cb = e_end_block;
1813
1814         spin_lock(&mdev->epoch_lock);
1815         e->epoch = mdev->current_epoch;
1816         atomic_inc(&e->epoch->epoch_size);
1817         atomic_inc(&e->epoch->active);
1818
1819         if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1820                 struct drbd_epoch *epoch;
1821                 /* Issue a barrier if we start a new epoch, and the previous epoch
1822                    was not an epoch containing a single request which already was
1823                    a Barrier. */
1824                 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1825                 if (epoch == e->epoch) {
1826                         set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1827                         rw |= (1<<BIO_RW_BARRIER);
1828                         e->flags |= EE_IS_BARRIER;
1829                 } else {
1830                         if (atomic_read(&epoch->epoch_size) > 1 ||
1831                             !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1832                                 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1833                                 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1834                                 rw |= (1<<BIO_RW_BARRIER);
1835                                 e->flags |= EE_IS_BARRIER;
1836                         }
1837                 }
1838         }
1839         spin_unlock(&mdev->epoch_lock);
1840
1841         dp_flags = be32_to_cpu(p->dp_flags);
1842         if (dp_flags & DP_HARDBARRIER) {
1843                 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1844                 /* rw |= (1<<BIO_RW_BARRIER); */
1845         }
1846         if (dp_flags & DP_RW_SYNC)
1847                 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1848         if (dp_flags & DP_MAY_SET_IN_SYNC)
1849                 e->flags |= EE_MAY_SET_IN_SYNC;
1850
1851         /* I'm the receiver, I do hold a net_cnt reference. */
1852         if (!mdev->net_conf->two_primaries) {
1853                 spin_lock_irq(&mdev->req_lock);
1854         } else {
1855                 /* don't get the req_lock yet,
1856                  * we may sleep in drbd_wait_peer_seq */
1857                 const int size = e->size;
1858                 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1859                 DEFINE_WAIT(wait);
1860                 struct drbd_request *i;
1861                 struct hlist_node *n;
1862                 struct hlist_head *slot;
1863                 int first;
1864
1865                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1866                 BUG_ON(mdev->ee_hash == NULL);
1867                 BUG_ON(mdev->tl_hash == NULL);
1868
1869                 /* conflict detection and handling:
1870                  * 1. wait on the sequence number,
1871                  *    in case this data packet overtook ACK packets.
1872                  * 2. check our hash tables for conflicting requests.
1873                  *    we only need to walk the tl_hash, since an ee cannot
1874                  *    have a conflict with another ee: on the submitting
1875                  *    node, the corresponding req had already been conflicting,
1876                  *    and a conflicting req is never sent.
1877                  *
1878                  * Note: for two_primaries, we are protocol C,
1879                  * so there cannot be any request that is DONE
1880                  * but still on the transfer log.
1881                  *
1882                  * unconditionally add to the ee_hash.
1883                  *
1884                  * if no conflicting request is found:
1885                  *    submit.
1886                  *
1887                  * if any conflicting request is found
1888                  * that has not yet been acked,
1889                  * AND I have the "discard concurrent writes" flag:
1890                  *       queue (via done_ee) the P_DISCARD_ACK; OUT.
1891                  *
1892                  * if any conflicting request is found:
1893                  *       block the receiver, waiting on misc_wait
1894                  *       until no more conflicting requests are there,
1895                  *       or we get interrupted (disconnect).
1896                  *
1897                  *       we do not just write after local io completion of those
1898                  *       requests, but only after req is done completely, i.e.
1899                  *       we wait for the P_DISCARD_ACK to arrive!
1900                  *
1901                  *       then proceed normally, i.e. submit.
1902                  */
1903                 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1904                         goto out_interrupted;
1905
1906                 spin_lock_irq(&mdev->req_lock);
1907
1908                 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1909
1910 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1911                 slot = tl_hash_slot(mdev, sector);
1912                 first = 1;
1913                 for (;;) {
1914                         int have_unacked = 0;
1915                         int have_conflict = 0;
1916                         prepare_to_wait(&mdev->misc_wait, &wait,
1917                                 TASK_INTERRUPTIBLE);
1918                         hlist_for_each_entry(i, n, slot, colision) {
1919                                 if (OVERLAPS) {
1920                                         /* only ALERT on first iteration,
1921                                          * we may be woken up early... */
1922                                         if (first)
1923                                                 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1924                                                       " new: %llus +%u; pending: %llus +%u\n",
1925                                                       current->comm, current->pid,
1926                                                       (unsigned long long)sector, size,
1927                                                       (unsigned long long)i->sector, i->size);
1928                                         if (i->rq_state & RQ_NET_PENDING)
1929                                                 ++have_unacked;
1930                                         ++have_conflict;
1931                                 }
1932                         }
1933 #undef OVERLAPS
1934                         if (!have_conflict)
1935                                 break;
1936
1937                         /* Discard Ack only for the _first_ iteration */
1938                         if (first && discard && have_unacked) {
1939                                 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1940                                      (unsigned long long)sector);
1941                                 inc_unacked(mdev);
1942                                 e->w.cb = e_send_discard_ack;
1943                                 list_add_tail(&e->w.list, &mdev->done_ee);
1944
1945                                 spin_unlock_irq(&mdev->req_lock);
1946
1947                                 /* we could probably send that P_DISCARD_ACK ourselves,
1948                                  * but I don't like the receiver using the msock */
1949
1950                                 put_ldev(mdev);
1951                                 wake_asender(mdev);
1952                                 finish_wait(&mdev->misc_wait, &wait);
1953                                 return TRUE;
1954                         }
1955
1956                         if (signal_pending(current)) {
1957                                 hlist_del_init(&e->colision);
1958
1959                                 spin_unlock_irq(&mdev->req_lock);
1960
1961                                 finish_wait(&mdev->misc_wait, &wait);
1962                                 goto out_interrupted;
1963                         }
1964
1965                         spin_unlock_irq(&mdev->req_lock);
1966                         if (first) {
1967                                 first = 0;
1968                                 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1969                                      "sec=%llus\n", (unsigned long long)sector);
1970                         } else if (discard) {
1971                                 /* we had none on the first iteration.
1972                                  * there must be none now. */
1973                                 D_ASSERT(have_unacked == 0);
1974                         }
1975                         schedule();
1976                         spin_lock_irq(&mdev->req_lock);
1977                 }
1978                 finish_wait(&mdev->misc_wait, &wait);
1979         }
1980
1981         list_add(&e->w.list, &mdev->active_ee);
1982         spin_unlock_irq(&mdev->req_lock);
1983
1984         switch (mdev->net_conf->wire_protocol) {
1985         case DRBD_PROT_C:
1986                 inc_unacked(mdev);
1987                 /* corresponding dec_unacked() in e_end_block()
1988                  * or in _drbd_clear_done_ee, respectively */
1989                 break;
1990         case DRBD_PROT_B:
1991                 /* I really don't like it that the receiver thread
1992                  * sends on the msock, but anyways */
1993                 drbd_send_ack(mdev, P_RECV_ACK, e);
1994                 break;
1995         case DRBD_PROT_A:
1996                 /* nothing to do */
1997                 break;
1998         }
1999
2000         if (mdev->state.pdsk == D_DISKLESS) {
2001                 /* In case we have the only disk of the cluster, */
2002                 drbd_set_out_of_sync(mdev, e->sector, e->size);
2003                 e->flags |= EE_CALL_AL_COMPLETE_IO;
2004                 drbd_al_begin_io(mdev, e->sector);
2005         }
2006
2007         if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
2008                 return TRUE;
2009
2010 out_interrupted:
2011         /* yes, the epoch_size now is imbalanced.
2012          * but we drop the connection anyways, so we don't have a chance to
2013          * receive a barrier... atomic_inc(&mdev->epoch_size); */
2014         put_ldev(mdev);
2015         drbd_free_ee(mdev, e);
2016         return FALSE;
2017 }
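
/* The OVERLAPS test in the conflict detection above is a plain interval
 * check on sector ranges; conceptually (sizes in bytes, 512 byte sectors,
 * a sketch of the idea rather than the exact macro):
 *
 *	overlaps(s1, l1, s2, l2) := s1 + (l1 >> 9) > s2 && s2 + (l2 >> 9) > s1
 *
 * E.g. a pending request at sector 8, size 4096 (sectors 8..15) conflicts
 * with an incoming write at sector 12, size 4096 (sectors 12..19). */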
2018
2019 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2020 {
2021         sector_t sector;
2022         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2023         struct drbd_epoch_entry *e;
2024         struct digest_info *di = NULL;
2025         int size, digest_size;
2026         unsigned int fault_type;
2027         struct p_block_req *p =
2028                 (struct p_block_req *)h;
2029         const int brps = sizeof(*p)-sizeof(*h);
2030
2031         if (drbd_recv(mdev, h->payload, brps) != brps)
2032                 return FALSE;
2033
2034         sector = be64_to_cpu(p->sector);
2035         size   = be32_to_cpu(p->blksize);
2036
2037         if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2038                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2039                                 (unsigned long long)sector, size);
2040                 return FALSE;
2041         }
2042         if (sector + (size>>9) > capacity) {
2043                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2044                                 (unsigned long long)sector, size);
2045                 return FALSE;
2046         }
2047
2048         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2049                 if (__ratelimit(&drbd_ratelimit_state))
2050                         dev_err(DEV, "Can not satisfy peer's read request, "
2051                             "no local data.\n");
2052                 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
2053                                  P_NEG_RS_DREPLY , p);
2054                 return drbd_drain_block(mdev, h->length - brps);
2055         }
2056
2057         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2058          * "criss-cross" setup, that might cause write-out on some other DRBD,
2059          * which in turn might block on the other node at this very place.  */
2060         e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2061         if (!e) {
2062                 put_ldev(mdev);
2063                 return FALSE;
2064         }
2065
2066         switch (h->command) {
2067         case P_DATA_REQUEST:
2068                 e->w.cb = w_e_end_data_req;
2069                 fault_type = DRBD_FAULT_DT_RD;
2070                 break;
2071         case P_RS_DATA_REQUEST:
2072                 e->w.cb = w_e_end_rsdata_req;
2073                 fault_type = DRBD_FAULT_RS_RD;
2074                 /* Eventually this should become asynchronous. Currently it
2075                  * blocks the whole receiver just to delay the reading of a
2076                  * resync data block.
2077                  * the drbd_work_queue mechanism is made for this...
2078                  */
2079                 if (!drbd_rs_begin_io(mdev, sector)) {
2080                         /* we have been interrupted,
2081                          * probably connection lost! */
2082                         D_ASSERT(signal_pending(current));
2083                         goto out_free_e;
2084                 }
2085                 break;
2086
2087         case P_OV_REPLY:
2088         case P_CSUM_RS_REQUEST:
2089                 fault_type = DRBD_FAULT_RS_RD;
2090                 digest_size = h->length - brps;
2091                 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2092                 if (!di)
2093                         goto out_free_e;
2094
2095                 di->digest_size = digest_size;
2096                 di->digest = (((char *)di)+sizeof(struct digest_info));
2097
2098                 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2099                         goto out_free_e;
2100
2101                 e->block_id = (u64)(unsigned long)di;
2102                 if (h->command == P_CSUM_RS_REQUEST) {
2103                         D_ASSERT(mdev->agreed_pro_version >= 89);
2104                         e->w.cb = w_e_end_csum_rs_req;
2105                 } else if (h->command == P_OV_REPLY) {
2106                         e->w.cb = w_e_end_ov_reply;
2107                         dec_rs_pending(mdev);
2108                         break;
2109                 }
2110
2111                 if (!drbd_rs_begin_io(mdev, sector)) {
2112                         /* we have been interrupted, probably connection lost! */
2113                         D_ASSERT(signal_pending(current));
2114                         goto out_free_e;
2115                 }
2116                 break;
2117
2118         case P_OV_REQUEST:
2119                 if (mdev->state.conn >= C_CONNECTED &&
2120                     mdev->state.conn != C_VERIFY_T)
2121                         dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2122                                 drbd_conn_str(mdev->state.conn));
2123                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2124                     mdev->agreed_pro_version >= 90) {
2125                         mdev->ov_start_sector = sector;
2126                         mdev->ov_position = sector;
2127                         mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2128                         dev_info(DEV, "Online Verify start sector: %llu\n",
2129                                         (unsigned long long)sector);
2130                 }
2131                 e->w.cb = w_e_end_ov_req;
2132                 fault_type = DRBD_FAULT_RS_RD;
2133                 /* Eventually this should become asynchronous. Currently it
2134                  * blocks the whole receiver just to delay the reading of a
2135                  * resync data block.
2136                  * the drbd_work_queue mechanism is made for this...
2137                  */
2138                 if (!drbd_rs_begin_io(mdev, sector)) {
2139                         /* we have been interrupted,
2140                          * probably connection lost! */
2141                         D_ASSERT(signal_pending(current));
2142                         goto out_free_e;
2143                 }
2144                 break;
2145
2146
2147         default:
2148                 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2149                     cmdname(h->command));
2150                 fault_type = DRBD_FAULT_MAX;
2151         }
2152
2153         spin_lock_irq(&mdev->req_lock);
2154         list_add(&e->w.list, &mdev->read_ee);
2155         spin_unlock_irq(&mdev->req_lock);
2156
2157         inc_unacked(mdev);
2158
2159         if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2160                 return TRUE;
2161
2162 out_free_e:
2163         kfree(di);
2164         put_ldev(mdev);
2165         drbd_free_ee(mdev, e);
2166         return FALSE;
2167 }
2168
2169 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2170 {
2171         int self, peer, rv = -100;
2172         unsigned long ch_self, ch_peer;
2173
2174         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2175         peer = mdev->p_uuid[UI_BITMAP] & 1;
2176
2177         ch_peer = mdev->p_uuid[UI_SIZE];
2178         ch_self = mdev->comm_bm_set;
2179
2180         switch (mdev->net_conf->after_sb_0p) {
2181         case ASB_CONSENSUS:
2182         case ASB_DISCARD_SECONDARY:
2183         case ASB_CALL_HELPER:
2184                 dev_err(DEV, "Configuration error.\n");
2185                 break;
2186         case ASB_DISCONNECT:
2187                 break;
2188         case ASB_DISCARD_YOUNGER_PRI:
2189                 if (self == 0 && peer == 1) {
2190                         rv = -1;
2191                         break;
2192                 }
2193                 if (self == 1 && peer == 0) {
2194                         rv =  1;
2195                         break;
2196                 }
2197                 /* Else fall through to one of the other strategies... */
2198         case ASB_DISCARD_OLDER_PRI:
2199                 if (self == 0 && peer == 1) {
2200                         rv = 1;
2201                         break;
2202                 }
2203                 if (self == 1 && peer == 0) {
2204                         rv = -1;
2205                         break;
2206                 }
2207                 /* Else fall through to one of the other strategies... */
2208                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2209                      "Using discard-least-changes instead\n");
2210         case ASB_DISCARD_ZERO_CHG:
2211                 if (ch_peer == 0 && ch_self == 0) {
2212                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2213                                 ? -1 : 1;
2214                         break;
2215                 } else {
2216                         if (ch_peer == 0) { rv =  1; break; }
2217                         if (ch_self == 0) { rv = -1; break; }
2218                 }
2219                 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2220                         break;
2221         case ASB_DISCARD_LEAST_CHG:
2222                 if      (ch_self < ch_peer)
2223                         rv = -1;
2224                 else if (ch_self > ch_peer)
2225                         rv =  1;
2226                 else /* ( ch_self == ch_peer ) */
2227                      /* Well, then use something else. */
2228                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2229                                 ? -1 : 1;
2230                 break;
2231         case ASB_DISCARD_LOCAL:
2232                 rv = -1;
2233                 break;
2234         case ASB_DISCARD_REMOTE:
2235                 rv =  1;
2236         }
2237
2238         return rv;
2239 }
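
/* Example: with after-sb-0pri set to discard-least-changes, a node that
 * changed 10 blocks while the peer changed 500 returns -1 above, i.e. it
 * becomes the sync target and discards its own changes; ties are broken
 * by the DISCARD_CONCURRENT flag. */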
2240
2241 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2242 {
2243         int self, peer, hg, rv = -100;
2244
2245         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2246         peer = mdev->p_uuid[UI_BITMAP] & 1;
2247
2248         switch (mdev->net_conf->after_sb_1p) {
2249         case ASB_DISCARD_YOUNGER_PRI:
2250         case ASB_DISCARD_OLDER_PRI:
2251         case ASB_DISCARD_LEAST_CHG:
2252         case ASB_DISCARD_LOCAL:
2253         case ASB_DISCARD_REMOTE:
2254                 dev_err(DEV, "Configuration error.\n");
2255                 break;
2256         case ASB_DISCONNECT:
2257                 break;
2258         case ASB_CONSENSUS:
2259                 hg = drbd_asb_recover_0p(mdev);
2260                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2261                         rv = hg;
2262                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2263                         rv = hg;
2264                 break;
2265         case ASB_VIOLENTLY:
2266                 rv = drbd_asb_recover_0p(mdev);
2267                 break;
2268         case ASB_DISCARD_SECONDARY:
2269                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2270         case ASB_CALL_HELPER:
2271                 hg = drbd_asb_recover_0p(mdev);
2272                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2273                         self = drbd_set_role(mdev, R_SECONDARY, 0);
2274                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2275                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2276                           * we do not need to wait for the after state change work either. */
2277                         self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2278                         if (self != SS_SUCCESS) {
2279                                 drbd_khelper(mdev, "pri-lost-after-sb");
2280                         } else {
2281                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2282                                 rv = hg;
2283                         }
2284                 } else
2285                         rv = hg;
2286         }
2287
2288         return rv;
2289 }
2290
2291 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2292 {
2293         int self, peer, hg, rv = -100;
2294
2295         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2296         peer = mdev->p_uuid[UI_BITMAP] & 1;
2297
2298         switch (mdev->net_conf->after_sb_2p) {
2299         case ASB_DISCARD_YOUNGER_PRI:
2300         case ASB_DISCARD_OLDER_PRI:
2301         case ASB_DISCARD_LEAST_CHG:
2302         case ASB_DISCARD_LOCAL:
2303         case ASB_DISCARD_REMOTE:
2304         case ASB_CONSENSUS:
2305         case ASB_DISCARD_SECONDARY:
2306                 dev_err(DEV, "Configuration error.\n");
2307                 break;
2308         case ASB_VIOLENTLY:
2309                 rv = drbd_asb_recover_0p(mdev);
2310                 break;
2311         case ASB_DISCONNECT:
2312                 break;
2313         case ASB_CALL_HELPER:
2314                 hg = drbd_asb_recover_0p(mdev);
2315                 if (hg == -1) {
2316                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2317                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2318                           * we do not need to wait for the after state change work either. */
2319                         self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2320                         if (self != SS_SUCCESS) {
2321                                 drbd_khelper(mdev, "pri-lost-after-sb");
2322                         } else {
2323                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2324                                 rv = hg;
2325                         }
2326                 } else
2327                         rv = hg;
2328         }
2329
2330         return rv;
2331 }
2332
2333 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2334                            u64 bits, u64 flags)
2335 {
2336         if (!uuid) {
2337                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2338                 return;
2339         }
2340         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2341              text,
2342              (unsigned long long)uuid[UI_CURRENT],
2343              (unsigned long long)uuid[UI_BITMAP],
2344              (unsigned long long)uuid[UI_HISTORY_START],
2345              (unsigned long long)uuid[UI_HISTORY_END],
2346              (unsigned long long)bits,
2347              (unsigned long long)flags);
2348 }
2349
2350 /*
2351   100   after split brain try auto recover
2352     2   C_SYNC_SOURCE set BitMap
2353     1   C_SYNC_SOURCE use BitMap
2354     0   no Sync
2355    -1   C_SYNC_TARGET use BitMap
2356    -2   C_SYNC_TARGET set BitMap
2357  -100   after split brain, disconnect
2358 -1000   unrelated data
2359  */
2360 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2361 {
2362         u64 self, peer;
2363         int i, j;
2364
2365         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2366         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2367
2368         *rule_nr = 10;
2369         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2370                 return 0;
2371
2372         *rule_nr = 20;
2373         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2374              peer != UUID_JUST_CREATED)
2375                 return -2;
2376
2377         *rule_nr = 30;
2378         if (self != UUID_JUST_CREATED &&
2379             (peer == UUID_JUST_CREATED || peer == (u64)0))
2380                 return 2;
2381
2382         if (self == peer) {
2383                 int rct, dc; /* roles at crash time */
2384
2385                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2386
2387                         if (mdev->agreed_pro_version < 91)
2388                                 return -1001;
2389
2390                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2391                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2392                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2393                                 drbd_uuid_set_bm(mdev, 0UL);
2394
2395                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2396                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2397                                 *rule_nr = 34;
2398                         } else {
2399                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2400                                 *rule_nr = 36;
2401                         }
2402
2403                         return 1;
2404                 }
2405
2406                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2407
2408                         if (mdev->agreed_pro_version < 91)
2409                                 return -1001;
2410
2411                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2412                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2413                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2414
2415                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2416                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2417                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2418
2419                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2420                                 *rule_nr = 35;
2421                         } else {
2422                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2423                                 *rule_nr = 37;
2424                         }
2425
2426                         return -1;
2427                 }
2428
2429                 /* Common power [off|failure] */
2430                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2431                         (mdev->p_uuid[UI_FLAGS] & 2);
2432                 /* lowest bit is set when we were primary,
2433                  * next bit (weight 2) is set when peer was primary */
2434                 *rule_nr = 40;
2435
2436                 switch (rct) {
2437                 case 0: /* !self_pri && !peer_pri */ return 0;
2438                 case 1: /*  self_pri && !peer_pri */ return 1;
2439                 case 2: /* !self_pri &&  peer_pri */ return -1;
2440                 case 3: /*  self_pri &&  peer_pri */
2441                         dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2442                         return dc ? -1 : 1;
2443                 }
2444         }
2445
2446         *rule_nr = 50;
2447         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2448         if (self == peer)
2449                 return -1;
2450
2451         *rule_nr = 51;
2452         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2453         if (self == peer) {
2454                 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2455                 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2456                 if (self == peer) {
2457                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2458                            the peer made to its UUIDs at its last start of resync as sync source. */
2459
2460                         if (mdev->agreed_pro_version < 91)
2461                                 return -1001;
2462
2463                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2464                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2465                         return -1;
2466                 }
2467         }
2468
2469         *rule_nr = 60;
2470         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2471         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2472                 peer = mdev->p_uuid[i] & ~((u64)1);
2473                 if (self == peer)
2474                         return -2;
2475         }
2476
2477         *rule_nr = 70;
2478         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2479         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2480         if (self == peer)
2481                 return 1;
2482
2483         *rule_nr = 71;
2484         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2485         if (self == peer) {
2486                 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2487                 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2488                 if (self == peer) {
2489                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2490                            we made to our own UUIDs at our last start of resync as sync source. */
2491
2492                         if (mdev->agreed_pro_version < 91)
2493                                 return -1001;
2494
2495                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2496                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2497
2498                         dev_info(DEV, "Undid last start of resync:\n");
2499
2500                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2501                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2502
2503                         return 1;
2504                 }
2505         }
2506
2507
2508         *rule_nr = 80;
2509         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2510         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2511                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2512                 if (self == peer)
2513                         return 2;
2514         }
2515
2516         *rule_nr = 90;
2517         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2518         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2519         if (self == peer && self != ((u64)0))
2520                 return 100;
2521
2522         *rule_nr = 100;
2523         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2524                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2525                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2526                         peer = mdev->p_uuid[j] & ~((u64)1);
2527                         if (self == peer)
2528                                 return -100;
2529                 }
2530         }
2531
2532         return -1000;
2533 }
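
/* Example walk through the rules above: if both nodes report the same
 * current UUID, neither side has a bitmap UUID, and only this node was
 * Primary at crash time (CRASHED_PRIMARY set locally, the peer's
 * "was primary" flag bit clear), rct evaluates to 1 and rule 40 returns 1:
 * this node becomes sync source using its bitmap. */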
2534
2535 /* drbd_sync_handshake() returns the new conn state on success, or
2536    C_MASK (-1) on failure.
2537  */
2538 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2539                                            enum drbd_disk_state peer_disk) __must_hold(local)
2540 {
2541         int hg, rule_nr;
2542         enum drbd_conns rv = C_MASK;
2543         enum drbd_disk_state mydisk;
2544
2545         mydisk = mdev->state.disk;
2546         if (mydisk == D_NEGOTIATING)
2547                 mydisk = mdev->new_state_tmp.disk;
2548
2549         dev_info(DEV, "drbd_sync_handshake:\n");
2550         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2551         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2552                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2553
2554         hg = drbd_uuid_compare(mdev, &rule_nr);
2555
2556         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2557
2558         if (hg == -1000) {
2559                 dev_alert(DEV, "Unrelated data, aborting!\n");
2560                 return C_MASK;
2561         }
2562         if (hg == -1001) {
2563                 dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2564                 return C_MASK;
2565         }
2566
2567         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2568             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2569                 int f = (hg == -100) || abs(hg) == 2;
2570                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2571                 if (f)
2572                         hg = hg*2;
2573                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2574                      hg > 0 ? "source" : "target");
2575         }
2576
2577         if (abs(hg) == 100)
2578                 drbd_khelper(mdev, "initial-split-brain");
2579
2580         if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2581                 int pcount = (mdev->state.role == R_PRIMARY)
2582                            + (peer_role == R_PRIMARY);
2583                 int forced = (hg == -100);
2584
2585                 switch (pcount) {
2586                 case 0:
2587                         hg = drbd_asb_recover_0p(mdev);
2588                         break;
2589                 case 1:
2590                         hg = drbd_asb_recover_1p(mdev);
2591                         break;
2592                 case 2:
2593                         hg = drbd_asb_recover_2p(mdev);
2594                         break;
2595                 }
2596                 if (abs(hg) < 100) {
2597                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2598                              "automatically solved. Sync from %s node\n",
2599                              pcount, (hg < 0) ? "peer" : "this");
2600                         if (forced) {
2601                                 dev_warn(DEV, "Doing a full sync, since"
2602                                      " UUIDs were ambiguous.\n");
2603                                 hg = hg*2;
2604                         }
2605                 }
2606         }
2607
2608         if (hg == -100) {
2609                 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2610                         hg = -1;
2611                 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2612                         hg = 1;
2613
2614                 if (abs(hg) < 100)
2615                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2616                              "Sync from %s node\n",
2617                              (hg < 0) ? "peer" : "this");
2618         }
2619
2620         if (hg == -100) {
2621                 /* FIXME this log message is not correct if we end up here
2622                  * after an attempted attach on a diskless node.
2623                  * We just refuse to attach -- well, we drop the "connection"
2624                  * to that disk, in a way... */
2625                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2626                 drbd_khelper(mdev, "split-brain");
2627                 return C_MASK;
2628         }
2629
2630         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2631                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2632                 return C_MASK;
2633         }
2634
2635         if (hg < 0 && /* by intention we do not use mydisk here. */
2636             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2637                 switch (mdev->net_conf->rr_conflict) {
2638                 case ASB_CALL_HELPER:
2639                         drbd_khelper(mdev, "pri-lost");
2640                         /* fall through */
2641                 case ASB_DISCONNECT:
2642                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2643                         return C_MASK;
2644                 case ASB_VIOLENTLY:
2645                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2646                              " assumption\n");
2647                 }
2648         }
2649
2650         if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2651                 if (hg == 0)
2652                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2653                 else
2654                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2655                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2656                                  abs(hg) >= 2 ? "full" : "bit-map based");
2657                 return C_MASK;
2658         }
2659
2660         if (abs(hg) >= 2) {
2661                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2662                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2663                         return C_MASK;
2664         }
2665
2666         if (hg > 0) { /* become sync source. */
2667                 rv = C_WF_BITMAP_S;
2668         } else if (hg < 0) { /* become sync target */
2669                 rv = C_WF_BITMAP_T;
2670         } else {
2671                 rv = C_CONNECTED;
2672                 if (drbd_bm_total_weight(mdev)) {
2673                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2674                              drbd_bm_total_weight(mdev));
2675                 }
2676         }
2677
2678         return rv;
2679 }
2680
2681 /* returns 1 if invalid */
2682 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2683 {
2684         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2685         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2686             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2687                 return 0;
2688
2689         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2690         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2691             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2692                 return 1;
2693
2694         /* everything else is valid if they are equal on both sides. */
2695         if (peer == self)
2696                 return 0;
2697
2698         /* everything else is invalid. */
2699         return 1;
2700 }
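
/* For example, peer == ASB_DISCARD_REMOTE with self == ASB_DISCARD_LOCAL is
 * accepted (the two settings are complementary views of the same decision),
 * while ASB_DISCARD_LOCAL on both sides is rejected, as is any mismatch of
 * the remaining policies. */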
2701
2702 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2703 {
2704         struct p_protocol *p = (struct p_protocol *)h;
2705         int header_size, data_size;
2706         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2707         int p_want_lose, p_two_primaries, cf;
2708         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2709
2710         header_size = sizeof(*p) - sizeof(*h);
2711         data_size   = h->length  - header_size;
2712
2713         if (drbd_recv(mdev, h->payload, header_size) != header_size)
2714                 return FALSE;
2715
2716         p_proto         = be32_to_cpu(p->protocol);
2717         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2718         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2719         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2720         p_two_primaries = be32_to_cpu(p->two_primaries);
2721         cf              = be32_to_cpu(p->conn_flags);
2722         p_want_lose = cf & CF_WANT_LOSE;
2723
2724         clear_bit(CONN_DRY_RUN, &mdev->flags);
2725
2726         if (cf & CF_DRY_RUN)
2727                 set_bit(CONN_DRY_RUN, &mdev->flags);
2728
2729         if (p_proto != mdev->net_conf->wire_protocol) {
2730                 dev_err(DEV, "incompatible communication protocols\n");
2731                 goto disconnect;
2732         }
2733
2734         if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2735                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2736                 goto disconnect;
2737         }
2738
2739         if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2740                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2741                 goto disconnect;
2742         }
2743
2744         if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2745                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2746                 goto disconnect;
2747         }
2748
2749         if (p_want_lose && mdev->net_conf->want_lose) {
2750                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2751                 goto disconnect;
2752         }
2753
2754         if (p_two_primaries != mdev->net_conf->two_primaries) {
2755                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2756                 goto disconnect;
2757         }
2758
2759         if (mdev->agreed_pro_version >= 87) {
2760                 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2761
2762                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2763                         return FALSE;
2764
2765                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2766                 if (strcmp(p_integrity_alg, my_alg)) {
2767                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2768                         goto disconnect;
2769                 }
2770                 dev_info(DEV, "data-integrity-alg: %s\n",
2771                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2772         }
2773
2774         return TRUE;
2775
2776 disconnect:
2777         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2778         return FALSE;
2779 }
2780
2781 /* helper function
2782  * input: alg name, feature name
2783  * return: NULL (alg name was "")
2784  *         ERR_PTR(error) if something goes wrong
2785  *         or the crypto hash ptr, if it worked out ok. */
2786 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2787                 const char *alg, const char *name)
2788 {
2789         struct crypto_hash *tfm;
2790
2791         if (!alg[0])
2792                 return NULL;
2793
2794         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2795         if (IS_ERR(tfm)) {
2796                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2797                         alg, name, PTR_ERR(tfm));
2798                 return tfm;
2799         }
2800         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2801                 crypto_free_hash(tfm);
2802                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2803                 return ERR_PTR(-EINVAL);
2804         }
2805         return tfm;
2806 }
2807
2808 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2809 {
2810         int ok = TRUE;
2811         struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2812         unsigned int header_size, data_size, exp_max_sz;
2813         struct crypto_hash *verify_tfm = NULL;
2814         struct crypto_hash *csums_tfm = NULL;
2815         const int apv = mdev->agreed_pro_version;
2816
2817         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2818                     : apv == 88 ? sizeof(struct p_rs_param)
2819                                         + SHARED_SECRET_MAX
2820                     : /* 89 */    sizeof(struct p_rs_param_89);
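             /* Older peers (apv <= 87) send only the fixed-size p_rs_param
              * (sync rate); apv 88 appends the verify-alg name as payload;
              * apv 89 and later use the larger p_rs_param_89 layout, which
              * also carries csums-alg. */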
2821
2822         if (h->length > exp_max_sz) {
2823                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2824                     h->length, exp_max_sz);
2825                 return FALSE;
2826         }
2827
2828         if (apv <= 88) {
2829                 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2830                 data_size   = h->length  - header_size;
2831         } else /* apv >= 89 */ {
2832                 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2833                 data_size   = h->length  - header_size;
2834                 D_ASSERT(data_size == 0);
2835         }
2836
2837         /* initialize verify_alg and csums_alg */
2838         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2839
2840         if (drbd_recv(mdev, h->payload, header_size) != header_size)
2841                 return FALSE;
2842
2843         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2844
2845         if (apv >= 88) {
2846                 if (apv == 88) {
2847                         if (data_size > SHARED_SECRET_MAX) {
2848                                 dev_err(DEV, "verify-alg too long, "
2849                                     "peer wants %u, accepting only %u byte\n",
2850                                                 data_size, SHARED_SECRET_MAX);
2851                                 return FALSE;
2852                         }
2853
2854                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2855                                 return FALSE;
2856
2857                         /* we expect NUL terminated string */
2858                         /* but just in case someone tries to be evil */
2859                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2860                         p->verify_alg[data_size-1] = 0;
2861
2862                 } else /* apv >= 89 */ {
2863                         /* we still expect NUL terminated strings */
2864                         /* but just in case someone tries to be evil */
2865                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2866                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2867                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2868                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2869                 }
2870
2871                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2872                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2873                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2874                                     mdev->sync_conf.verify_alg, p->verify_alg);
2875                                 goto disconnect;
2876                         }
2877                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2878                                         p->verify_alg, "verify-alg");
2879                         if (IS_ERR(verify_tfm)) {
2880                                 verify_tfm = NULL;
2881                                 goto disconnect;
2882                         }
2883                 }
2884
2885                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2886                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2887                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2888                                     mdev->sync_conf.csums_alg, p->csums_alg);
2889                                 goto disconnect;
2890                         }
2891                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2892                                         p->csums_alg, "csums-alg");
2893                         if (IS_ERR(csums_tfm)) {
2894                                 csums_tfm = NULL;
2895                                 goto disconnect;
2896                         }
2897                 }
2898
2899
2900                 spin_lock(&mdev->peer_seq_lock);
2901                 /* lock against drbd_nl_syncer_conf() */
2902                 if (verify_tfm) {
2903                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2904                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2905                         crypto_free_hash(mdev->verify_tfm);
2906                         mdev->verify_tfm = verify_tfm;
2907                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2908                 }
2909                 if (csums_tfm) {
2910                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2911                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2912                         crypto_free_hash(mdev->csums_tfm);
2913                         mdev->csums_tfm = csums_tfm;
2914                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2915                 }
2916                 spin_unlock(&mdev->peer_seq_lock);
2917         }
2918
2919         return ok;
2920 disconnect:
2921         /* just for completeness: actually not needed,
2922          * as this is not reached if csums_tfm was ok. */
2923         crypto_free_hash(csums_tfm);
2924         /* but free the verify_tfm again, if csums_tfm did not work out */
2925         crypto_free_hash(verify_tfm);
2926         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2927         return FALSE;
2928 }
2929
2930 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2931 {
2932         /* sorry, we currently have no working implementation
2933          * of distributed TCQ */
2934 }
2935
2936 /* warn if the arguments differ by more than 12.5% */
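     /* (d > a>>3 means the difference exceeds one eighth, i.e. 12.5%, of a.
      *  For example, a = 1000 and b = 800 gives d = 200 > 125, so we warn;
      *  a = 1000 and b = 950 gives d = 50, which stays silent.) */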
2937 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2938         const char *s, sector_t a, sector_t b)
2939 {
2940         sector_t d;
2941         if (a == 0 || b == 0)
2942                 return;
2943         d = (a > b) ? (a - b) : (b - a);
2944         if (d > (a>>3) || d > (b>>3))
2945                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2946                      (unsigned long long)a, (unsigned long long)b);
2947 }
2948
2949 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2950 {
2951         struct p_sizes *p = (struct p_sizes *)h;
2952         enum determine_dev_size dd = unchanged;
2953         unsigned int max_seg_s;
2954         sector_t p_size, p_usize, my_usize;
2955         int ldsc = 0; /* local disk size changed */
2956         enum dds_flags ddsf;
2957
2958         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2959         if (drbd_recv(mdev, h->payload, h->length) != h->length)
2960                 return FALSE;
2961
2962         p_size = be64_to_cpu(p->d_size);
2963         p_usize = be64_to_cpu(p->u_size);
2964
2965         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2966                 dev_err(DEV, "some backing storage is needed\n");
2967                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2968                 return FALSE;
2969         }
2970
2971         /* just store the peer's disk size for now.
2972          * we still need to figure out whether we accept that. */
2973         mdev->p_size = p_size;
2974
2975 #define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min((l), (r))))
2976         if (get_ldev(mdev)) {
2977                 warn_if_differ_considerably(mdev, "lower level device sizes",
2978                            p_size, drbd_get_max_capacity(mdev->ldev));
2979                 warn_if_differ_considerably(mdev, "user requested size",
2980                                             p_usize, mdev->ldev->dc.disk_size);
2981
2982                 /* if this is the first connect, or an otherwise expected
2983                  * param exchange, choose the minimum */
2984                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2985                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2986                                              p_usize);
2987
2988                 my_usize = mdev->ldev->dc.disk_size;
2989
2990                 if (mdev->ldev->dc.disk_size != p_usize) {
2991                         mdev->ldev->dc.disk_size = p_usize;
2992                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2993                              (unsigned long)mdev->ldev->dc.disk_size);
2994                 }
2995
2996                 /* Never shrink a device with usable data during connect.
2997                    But allow online shrinking if we are connected. */
2998                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2999                    drbd_get_capacity(mdev->this_bdev) &&
3000                    mdev->state.disk >= D_OUTDATED &&
3001                    mdev->state.conn < C_CONNECTED) {
3002                         dev_err(DEV, "The peer's disk size is too small!\n");
3003                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3004                         mdev->ldev->dc.disk_size = my_usize;
3005                         put_ldev(mdev);
3006                         return FALSE;
3007                 }
3008                 put_ldev(mdev);
3009         }
3010 #undef min_not_zero
3011
3012         ddsf = be16_to_cpu(p->dds_flags);
3013         if (get_ldev(mdev)) {
3014                 dd = drbd_determin_dev_size(mdev, ddsf);
3015                 put_ldev(mdev);
3016                 if (dd == dev_size_error)
3017                         return FALSE;
3018                 drbd_md_sync(mdev);
3019         } else {
3020                 /* I am diskless, need to accept the peer's size. */
3021                 drbd_set_my_capacity(mdev, p_size);
3022         }
3023
3024         if (get_ldev(mdev)) {
3025                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3026                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3027                         ldsc = 1;
3028                 }
3029
3030                 if (mdev->agreed_pro_version < 94)
3031                         max_seg_s = be32_to_cpu(p->max_segment_size);
3032                 else /* drbd 8.3.8 onwards */
3033                         max_seg_s = DRBD_MAX_SEGMENT_SIZE;
3034
3035                 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3036                         drbd_setup_queue_param(mdev, max_seg_s);
3037
3038                 drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
3039                 put_ldev(mdev);
3040         }
3041
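             /* Once we are past the parameter exchange: if the peer's current
              * size does not match ours (or our backing device changed size),
              * tell the peer our size.  If a pending resize completed or we
              * grew while connected, start a resync of the new area now
              * (unless suppressed with --assume-clean), or remember to do so
              * once the disk states have been negotiated. */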
3042         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3043                 if (be64_to_cpu(p->c_size) !=
3044                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3045                         /* we have different sizes, the peer probably
3046                          * needs to know my new size... */
3047                         drbd_send_sizes(mdev, 0, ddsf);
3048                 }
3049                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3050                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3051                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3052                             mdev->state.disk >= D_INCONSISTENT) {
3053                                 if (ddsf & DDSF_NO_RESYNC)
3054                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3055                                 else
3056                                         resync_after_online_grow(mdev);
3057                         } else
3058                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3059                 }
3060         }
3061
3062         return TRUE;
3063 }
3064
3065 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
3066 {
3067         struct p_uuids *p = (struct p_uuids *)h;
3068         u64 *p_uuid;
3069         int i;
3070
3071         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3072         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3073                 return FALSE;
3074
3075         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return FALSE;
             }
3076
3077         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3078                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3079
3080         kfree(mdev->p_uuid);
3081         mdev->p_uuid = p_uuid;
3082
3083         if (mdev->state.conn < C_CONNECTED &&
3084             mdev->state.disk < D_INCONSISTENT &&
3085             mdev->state.role == R_PRIMARY &&
3086             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3087                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3088                     (unsigned long long)mdev->ed_uuid);
3089                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3090                 return FALSE;
3091         }
3092
3093         if (get_ldev(mdev)) {
3094                 int skip_initial_sync =
3095                         mdev->state.conn == C_CONNECTED &&
3096                         mdev->agreed_pro_version >= 90 &&
3097                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3098                         (p_uuid[UI_FLAGS] & 8);
3099                 if (skip_initial_sync) {
3100                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3101                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3102                                         "clear_n_write from receive_uuids");
3103                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3104                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3105                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3106                                         CS_VERBOSE, NULL);
3107                         drbd_md_sync(mdev);
3108                 }
3109                 put_ldev(mdev);
3110         }
3111
3112         /* Before we test the disk state, we should wait until a possibly
3113            ongoing cluster-wide state change has finished. That is important if
3114            we are primary and are detaching from our disk. We need to see the
3115            new disk state... */
3116         wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3117         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3118                 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3119
3120         return TRUE;
3121 }
3122
3123 /**
3124  * convert_state() - Converts the peer's view of the cluster state to our point of view
3125  * @ps:         The state as seen by the peer.
3126  */
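     /* Role, disk state and the "I stop IO" flags simply switch sides: the
      * peer's "role" becomes our "peer", its "disk" becomes our "pdsk", and so
      * on.  The connection state is mapped through c_tab; e.g. a peer reporting
      * C_STARTING_SYNC_S (it wants to become sync source) is seen by us as
      * C_STARTING_SYNC_T (we would become sync target). */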
3127 static union drbd_state convert_state(union drbd_state ps)
3128 {
3129         union drbd_state ms;
3130
3131         static enum drbd_conns c_tab[] = {
3132                 [C_CONNECTED] = C_CONNECTED,
3133
3134                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3135                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3136                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3137                 [C_VERIFY_S]       = C_VERIFY_T,
3138                 [C_MASK]   = C_MASK,
3139         };
3140
3141         ms.i = ps.i;
3142
3143         ms.conn = c_tab[ps.conn];
3144         ms.peer = ps.role;
3145         ms.role = ps.peer;
3146         ms.pdsk = ps.disk;
3147         ms.disk = ps.pdsk;
3148         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3149
3150         return ms;
3151 }
3152
3153 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3154 {
3155         struct p_req_state *p = (struct p_req_state *)h;
3156         union drbd_state mask, val;
3157         int rv;
3158
3159         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3160         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3161                 return FALSE;
3162
3163         mask.i = be32_to_cpu(p->mask);
3164         val.i = be32_to_cpu(p->val);
3165
3166         if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3167             test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3168                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3169                 return TRUE;
3170         }
3171
3172         mask = convert_state(mask);
3173         val = convert_state(val);
3174
3175         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3176
3177         drbd_send_sr_reply(mdev, rv);
3178         drbd_md_sync(mdev);
3179
3180         return TRUE;
3181 }
3182
3183 static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3184 {
3185         struct p_state *p = (struct p_state *)h;
3186         enum drbd_conns nconn, oconn;
3187         union drbd_state ns, peer_state;
3188         enum drbd_disk_state real_peer_disk;
3189         int rv;
3190
3191         ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3192                 return FALSE;
3193
3194         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3195                 return FALSE;
3196
3197         peer_state.i = be32_to_cpu(p->state);
3198
3199         real_peer_disk = peer_state.disk;
3200         if (peer_state.disk == D_NEGOTIATING) {
3201                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3202                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3203         }
3204
3205         spin_lock_irq(&mdev->req_lock);
3206  retry:
3207         oconn = nconn = mdev->state.conn;
3208         spin_unlock_irq(&mdev->req_lock);
3209
3210         if (nconn == C_WF_REPORT_PARAMS)
3211                 nconn = C_CONNECTED;
3212
3213         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3214             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3215                 int cr; /* consider resync */
3216
3217                 /* if we established a new connection */
3218                 cr  = (oconn < C_CONNECTED);
3219                 /* if we had an established connection
3220                  * and one of the nodes newly attaches a disk */
3221                 cr |= (oconn == C_CONNECTED &&
3222                        (peer_state.disk == D_NEGOTIATING ||
3223                         mdev->state.disk == D_NEGOTIATING));
3224                 /* if we have both been inconsistent, and the peer has been
3225                  * forced to be UpToDate with --overwrite-data */
3226                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3227                 /* if we had been plain connected, and the admin requested to
3228                  * start a sync by "invalidate" or "invalidate-remote" */
3229                 cr |= (oconn == C_CONNECTED &&
3230                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3231                                  peer_state.conn <= C_WF_BITMAP_T));
3232
3233                 if (cr)
3234                         nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3235
3236                 put_ldev(mdev);
3237                 if (nconn == C_MASK) {
3238                         nconn = C_CONNECTED;
3239                         if (mdev->state.disk == D_NEGOTIATING) {
3240                                 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3241                         } else if (peer_state.disk == D_NEGOTIATING) {
3242                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3243                                 peer_state.disk = D_DISKLESS;
3244                                 real_peer_disk = D_DISKLESS;
3245                         } else {
3246                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3247                                         return FALSE;
3248                                 D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3249                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3250                                 return FALSE;
3251                         }
3252                 }
3253         }
3254
3255         spin_lock_irq(&mdev->req_lock);
3256         if (mdev->state.conn != oconn)
3257                 goto retry;
3258         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3259         ns.i = mdev->state.i;
3260         ns.conn = nconn;
3261         ns.peer = peer_state.role;
3262         ns.pdsk = real_peer_disk;
3263         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3264         if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3265                 ns.disk = mdev->new_state_tmp.disk;
3266
3267         rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3268         ns = mdev->state;
3269         spin_unlock_irq(&mdev->req_lock);
3270
3271         if (rv < SS_SUCCESS) {
3272                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3273                 return FALSE;
3274         }
3275
3276         if (oconn > C_WF_REPORT_PARAMS) {
3277                 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3278                     peer_state.disk != D_NEGOTIATING ) {
3279                         /* we want resync, peer has not yet decided to sync... */
3280                         /* Nowadays only used when forcing a node into primary role and
3281                            setting its disk to UpToDate with that */
3282                         drbd_send_uuids(mdev);
3283                         drbd_send_state(mdev);
3284                 }
3285         }
3286
3287         mdev->net_conf->want_lose = 0;
3288
3289         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3290
3291         return TRUE;
3292 }
3293
3294 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3295 {
3296         struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3297
3298         wait_event(mdev->misc_wait,
3299                    mdev->state.conn == C_WF_SYNC_UUID ||
3300                    mdev->state.conn < C_CONNECTED ||
3301                    mdev->state.disk < D_NEGOTIATING);
3302
3303         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3304
3305         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3306         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3307                 return FALSE;
3308
3309         /* Here the _drbd_uuid_ functions are right, current should
3310            _not_ be rotated into the history */
3311         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3312                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3313                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3314
3315                 drbd_start_resync(mdev, C_SYNC_TARGET);
3316
3317                 put_ldev(mdev);
3318         } else
3319                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3320
3321         return TRUE;
3322 }
3323
3324 enum receive_bitmap_ret { OK, DONE, FAILED };
3325
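     /* A plain bitmap packet carries up to BM_PACKET_WORDS longs of bitmap
      * data.  They are merged into our in-memory bitmap at the current word
      * offset and the transfer context is advanced; an empty packet
      * (want == 0) signals that the peer is done. */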
3326 static enum receive_bitmap_ret
3327 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3328         unsigned long *buffer, struct bm_xfer_ctx *c)
3329 {
3330         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3331         unsigned want = num_words * sizeof(long);
3332
3333         if (want != h->length) {
3334                 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3335                 return FAILED;
3336         }
3337         if (want == 0)
3338                 return DONE;
3339         if (drbd_recv(mdev, buffer, want) != want)
3340                 return FAILED;
3341
3342         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3343
3344         c->word_offset += num_words;
3345         c->bit_offset = c->word_offset * BITS_PER_LONG;
3346         if (c->bit_offset > c->bm_bits)
3347                 c->bit_offset = c->bm_bits;
3348
3349         return OK;
3350 }
3351
3352 static enum receive_bitmap_ret
3353 recv_bm_rle_bits(struct drbd_conf *mdev,
3354                 struct p_compressed_bm *p,
3355                 struct bm_xfer_ctx *c)
3356 {
3357         struct bitstream bs;
3358         u64 look_ahead;
3359         u64 rl;
3360         u64 tmp;
3361         unsigned long s = c->bit_offset;
3362         unsigned long e;
3363         int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3364         int toggle = DCBP_get_start(p);
3365         int have;
3366         int bits;
3367
3368         bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3369
3370         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3371         if (bits < 0)
3372                 return FAILED;
3373
3374         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3375                 bits = vli_decode_bits(&rl, look_ahead);
3376                 if (bits <= 0)
3377                         return FAILED;
3378
3379                 if (toggle) {
3380                         e = s + rl -1;
3381                         if (e >= c->bm_bits) {
3382                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3383                                 return FAILED;
3384                         }
3385                         _drbd_bm_set_bits(mdev, s, e);
3386                 }
3387
3388                 if (have < bits) {
3389                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3390                                 have, bits, look_ahead,
3391                                 (unsigned int)(bs.cur.b - p->code),
3392                                 (unsigned int)bs.buf_len);
3393                         return FAILED;
3394                 }
3395                 look_ahead >>= bits;
3396                 have -= bits;
3397
3398                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3399                 if (bits < 0)
3400                         return FAILED;
3401                 look_ahead |= tmp << have;
3402                 have += bits;
3403         }
3404
3405         c->bit_offset = s;
3406         bm_xfer_ctx_bit_to_word_offset(c);
3407
3408         return (s == c->bm_bits) ? DONE : OK;
3409 }
3410
3411 static enum receive_bitmap_ret
3412 decode_bitmap_c(struct drbd_conf *mdev,
3413                 struct p_compressed_bm *p,
3414                 struct bm_xfer_ctx *c)
3415 {
3416         if (DCBP_get_code(p) == RLE_VLI_Bits)
3417                 return recv_bm_rle_bits(mdev, p, c);
3418
3419         /* other variants had been implemented for evaluation,
3420          * but have been dropped as this one turned out to be "best"
3421          * during all our tests. */
3422
3423         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3424         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3425         return FAILED;
3426 }
3427
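     /* Report how well the bitmap transfer compressed, in tenths of a percent
      * saved relative to the plain encoding.  For example, if the plain
      * transfer would have been 40000 bytes and we actually moved 10000,
      * r ends up as 1000 - 250 = 750 and is printed as "compression: 75.0%". */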
3428 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3429                 const char *direction, struct bm_xfer_ctx *c)
3430 {
3431         /* what would it take to transfer it "plaintext" */
3432         unsigned plain = sizeof(struct p_header) *
3433                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3434                 + c->bm_words * sizeof(long);
3435         unsigned total = c->bytes[0] + c->bytes[1];
3436         unsigned r;
3437
3438         /* total cannot be zero. but just in case: */
3439         if (total == 0)
3440                 return;
3441
3442         /* don't report if not compressed */
3443         if (total >= plain)
3444                 return;
3445
3446         /* total < plain. check for overflow, still */
3447         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3448                                     : (1000 * total / plain);
3449
3450         if (r > 1000)
3451                 r = 1000;
3452
3453         r = 1000 - r;
3454         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3455              "total %u; compression: %u.%u%%\n",
3456                         direction,
3457                         c->bytes[1], c->packets[1],
3458                         c->bytes[0], c->packets[0],
3459                         total, r/10, r % 10);
3460 }
3461
3462 /* Since we are processing the bitfield from lower addresses to higher,
3463    it does not matter whether we process it in 32 bit or 64 bit
3464    chunks, as long as it is little endian. (Understand it as a byte stream,
3465    beginning with the lowest byte...) If we used big endian,
3466    we would need to process it from the highest address to the lowest
3467    in order to be agnostic to the 32 vs 64 bit issue.
3468
3469    returns 0 on failure, 1 if we successfully received it. */
3470 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3471 {
3472         struct bm_xfer_ctx c;
3473         void *buffer;
3474         enum receive_bitmap_ret ret;
3475         int ok = FALSE;
3476
3477         wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3478
3479         drbd_bm_lock(mdev, "receive bitmap");
3480
3481         /* maybe we should use some per thread scratch page,
3482          * and allocate that during initial device creation? */
3483         buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3484         if (!buffer) {
3485                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3486                 goto out;
3487         }
3488
3489         c = (struct bm_xfer_ctx) {
3490                 .bm_bits = drbd_bm_bits(mdev),
3491                 .bm_words = drbd_bm_words(mdev),
3492         };
3493
3494         do {
3495                 if (h->command == P_BITMAP) {
3496                         ret = receive_bitmap_plain(mdev, h, buffer, &c);
3497                 } else if (h->command == P_COMPRESSED_BITMAP) {
3498                         /* MAYBE: sanity check that we speak proto >= 90,
3499                          * and the feature is enabled! */
3500                         struct p_compressed_bm *p;
3501
3502                         if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3503                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3504                                 goto out;
3505                         }
3506                         /* use the page buffer */
3507                         p = buffer;
3508                         memcpy(p, h, sizeof(*h));
3509                         if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3510                                 goto out;
3511                         if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3512                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3513                                 goto out;
3514                         }
3515                         ret = decode_bitmap_c(mdev, p, &c);
3516                 } else {
3517                         dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", h->command);
3518                         goto out;
3519                 }
3520
3521                 c.packets[h->command == P_BITMAP]++;
3522                 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3523
3524                 if (ret != OK)
3525                         break;
3526
3527                 if (!drbd_recv_header(mdev, h))
3528                         goto out;
3529         } while (ret == OK);
3530         if (ret == FAILED)
3531                 goto out;
3532
3533         INFO_bm_xfer_stats(mdev, "receive", &c);
3534
3535         if (mdev->state.conn == C_WF_BITMAP_T) {
3536                 ok = !drbd_send_bitmap(mdev);
3537                 if (!ok)
3538                         goto out;
3539                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3540                 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3541                 D_ASSERT(ok == SS_SUCCESS);
3542         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3543                 /* admin may have requested C_DISCONNECTING,
3544                  * other threads may have noticed network errors */
3545                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3546                     drbd_conn_str(mdev->state.conn));
3547         }
3548
3549         ok = TRUE;
3550  out:
3551         drbd_bm_unlock(mdev);
3552         if (ok && mdev->state.conn == C_WF_BITMAP_S)
3553                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3554         free_page((unsigned long) buffer);
3555         return ok;
3556 }
3557
3558 static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3559 {
3560         /* TODO zero copy sink :) */
3561         static char sink[128];
3562         int size, want, r;
3563
3564         dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3565              h->command, h->length);
3566
3567         size = h->length;
3568         while (size > 0) {
3569                 want = min_t(int, size, sizeof(sink));
3570                 r = drbd_recv(mdev, sink, want);
3571                 ERR_IF(r <= 0) break;
3572                 size -= r;
3573         }
3574         return size == 0;
3575 }
3576
3577 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3578 {
3579         if (mdev->state.disk >= D_INCONSISTENT)
3580                 drbd_kick_lo(mdev);
3581
3582         /* Make sure we've acked all the TCP data associated
3583          * with the data requests being unplugged */
3584         drbd_tcp_quickack(mdev->data.socket);
3585
3586         return TRUE;
3587 }
3588
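     /* Subtract "us" microseconds from *tv, keeping tv_usec normalized by
      * borrowing one second whenever the microsecond part would go negative.
      * For example, {sec=10, usec=100000} minus 300000us becomes
      * {sec=9, usec=800000}. */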
3589 static void timeval_sub_us(struct timeval* tv, unsigned int us)
3590 {
3591         tv->tv_sec -= us / 1000000;
3592         us = us % 1000000;
3593         if (tv->tv_usec < us) {
3594                 tv->tv_usec += 1000000;
3595                 tv->tv_sec--;
3596         }
3597         tv->tv_usec -= us;
3598 }
3599
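     /* Delay probes arrive with the same sequence number on both the data and
      * the meta-data path (hence the "from" argument).  The copy that arrives
      * first is parked on mdev->delay_probes; when its counterpart with the
      * same seq_num shows up, the difference between the two arrival times,
      * corrected by the sender-supplied offset, is stored as data_delay if it
      * is positive. */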
3600 static void got_delay_probe(struct drbd_conf *mdev, int from, struct p_delay_probe *p)
3601 {
3602         struct delay_probe *dp;
3603         struct list_head *le;
3604         struct timeval now;
3605         int seq_num;
3606         int offset;
3607         int data_delay;
3608
3609         seq_num = be32_to_cpu(p->seq_num);
3610         offset  = be32_to_cpu(p->offset);
3611
3612         spin_lock(&mdev->peer_seq_lock);
3613         if (!list_empty(&mdev->delay_probes)) {
3614                 if (from == USE_DATA_SOCKET)
3615                         le = mdev->delay_probes.next;
3616                 else
3617                         le = mdev->delay_probes.prev;
3618
3619                 dp = list_entry(le, struct delay_probe, list);
3620
3621                 if (dp->seq_num == seq_num) {
3622                         list_del(le);
3623                         spin_unlock(&mdev->peer_seq_lock);
3624                         do_gettimeofday(&now);
3625                         timeval_sub_us(&now, offset);
3626                         data_delay =
3627                                 now.tv_usec - dp->time.tv_usec +
3628                                 (now.tv_sec - dp->time.tv_sec) * 1000000;
3629
3630                         if (data_delay > 0)
3631                                 mdev->data_delay = data_delay;
3632
3633                         kfree(dp);
3634                         return;
3635                 }
3636
3637                 if (dp->seq_num > seq_num) {
3638                         spin_unlock(&mdev->peer_seq_lock);
3639                         dev_warn(DEV, "Previous allocation failure of struct delay_probe?\n");
3640                         return; /* Do not allocate a struct delay_probe.... */
3641                 }
3642         }
3643         spin_unlock(&mdev->peer_seq_lock);
3644
3645         dp = kmalloc(sizeof(struct delay_probe), GFP_NOIO);
3646         if (!dp) {
3647                 dev_warn(DEV, "Failed to allocate a struct delay_probe, do not worry.\n");
3648                 return;
3649         }
3650
3651         dp->seq_num = seq_num;
3652         do_gettimeofday(&dp->time);
3653         timeval_sub_us(&dp->time, offset);
3654
3655         spin_lock(&mdev->peer_seq_lock);
3656         if (from == USE_DATA_SOCKET)
3657                 list_add(&dp->list, &mdev->delay_probes);
3658         else
3659                 list_add_tail(&dp->list, &mdev->delay_probes);
3660         spin_unlock(&mdev->peer_seq_lock);
3661 }
3662
3663 static int receive_delay_probe(struct drbd_conf *mdev, struct p_header *h)
3664 {
3665         struct p_delay_probe *p = (struct p_delay_probe *)h;
3666
3667         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3668         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3669                 return FALSE;
3670
3671         got_delay_probe(mdev, USE_DATA_SOCKET, p);
3672         return TRUE;
3673 }
3674
3675 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3676
3677 static drbd_cmd_handler_f drbd_default_handler[] = {
3678         [P_DATA]            = receive_Data,
3679         [P_DATA_REPLY]      = receive_DataReply,
3680         [P_RS_DATA_REPLY]   = receive_RSDataReply,
3681         [P_BARRIER]         = receive_Barrier,
3682         [P_BITMAP]          = receive_bitmap,
3683         [P_COMPRESSED_BITMAP]    = receive_bitmap,
3684         [P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3685         [P_DATA_REQUEST]    = receive_DataRequest,
3686         [P_RS_DATA_REQUEST] = receive_DataRequest,
3687         [P_SYNC_PARAM]      = receive_SyncParam,
3688         [P_SYNC_PARAM89]           = receive_SyncParam,
3689         [P_PROTOCOL]        = receive_protocol,
3690         [P_UUIDS]           = receive_uuids,
3691         [P_SIZES]           = receive_sizes,
3692         [P_STATE]           = receive_state,
3693         [P_STATE_CHG_REQ]   = receive_req_state,
3694         [P_SYNC_UUID]       = receive_sync_uuid,
3695         [P_OV_REQUEST]      = receive_DataRequest,
3696         [P_OV_REPLY]        = receive_DataRequest,
3697         [P_CSUM_RS_REQUEST]    = receive_DataRequest,
3698         [P_DELAY_PROBE]     = receive_delay_probe,
3699         /* anything missing from this table is in
3700          * the asender_tbl, see get_asender_cmd */
3701         [P_MAX_CMD]         = NULL,
3702 };
3703
3704 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3705 static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3706
3707 static void drbdd(struct drbd_conf *mdev)
3708 {
3709         drbd_cmd_handler_f handler;
3710         struct p_header *header = &mdev->data.rbuf.header;
3711
3712         while (get_t_state(&mdev->receiver) == Running) {
3713                 drbd_thread_current_set_cpu(mdev);
3714                 if (!drbd_recv_header(mdev, header)) {
3715                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3716                         break;
3717                 }
3718
3719                 if (header->command < P_MAX_CMD)
3720                         handler = drbd_cmd_handler[header->command];
3721                 else if (P_MAY_IGNORE < header->command
3722                      && header->command < P_MAX_OPT_CMD)
3723                         handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3724                 else if (header->command > P_MAX_OPT_CMD)
3725                         handler = receive_skip;
3726                 else
3727                         handler = NULL;
3728
3729                 if (unlikely(!handler)) {
3730                         dev_err(DEV, "unknown packet type %d, l: %d!\n",
3731                             header->command, header->length);
3732                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3733                         break;
3734                 }
3735                 if (unlikely(!handler(mdev, header))) {
3736                         dev_err(DEV, "error receiving %s, l: %d!\n",
3737                             cmdname(header->command), header->length);
3738                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3739                         break;
3740                 }
3741         }
3742 }
3743
3744 static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3745 {
3746         struct hlist_head *slot;
3747         struct hlist_node *pos;
3748         struct hlist_node *tmp;
3749         struct drbd_request *req;
3750         int i;
3751
3752         /*
3753          * Application READ requests
3754          */
3755         spin_lock_irq(&mdev->req_lock);
3756         for (i = 0; i < APP_R_HSIZE; i++) {
3757                 slot = mdev->app_reads_hash+i;
3758                 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3759                         /* it may (but should not any longer!)
3760                          * be on the work queue; if that assert triggers,
3761                          * we need to also grab the
3762                          * spin_lock_irq(&mdev->data.work.q_lock);
3763                          * and list_del_init here. */
3764                         D_ASSERT(list_empty(&req->w.list));
3765                         /* It would be nice to complete outside of spinlock.
3766                          * But this is easier for now. */
3767                         _req_mod(req, connection_lost_while_pending);
3768                 }
3769         }
3770         for (i = 0; i < APP_R_HSIZE; i++)
3771                 if (!hlist_empty(mdev->app_reads_hash+i))
3772                         dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3773                                 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3774
3775         memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3776         spin_unlock_irq(&mdev->req_lock);
3777 }
3778
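     /* Queue a barrier work item and wait for its completion.  Since the worker
      * processes its queue in order, returning from here guarantees that every
      * work item queued before this call has been handled. */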
3779 void drbd_flush_workqueue(struct drbd_conf *mdev)
3780 {
3781         struct drbd_wq_barrier barr;
3782
3783         barr.w.cb = w_prev_work_done;
3784         init_completion(&barr.done);
3785         drbd_queue_work(&mdev->data.work, &barr.w);
3786         wait_for_completion(&barr.done);
3787 }
3788
3789 static void drbd_disconnect(struct drbd_conf *mdev)
3790 {
3791         enum drbd_fencing_p fp;
3792         union drbd_state os, ns;
3793         int rv = SS_UNKNOWN_ERROR;
3794         unsigned int i;
3795
3796         if (mdev->state.conn == C_STANDALONE)
3797                 return;
3798         if (mdev->state.conn >= C_WF_CONNECTION)
3799                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3800                                 drbd_conn_str(mdev->state.conn));
3801
3802         /* asender does not clean up anything. it must not interfere, either */
3803         drbd_thread_stop(&mdev->asender);
3804         drbd_free_sock(mdev);
3805
3806         spin_lock_irq(&mdev->req_lock);
3807         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3808         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3809         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3810         spin_unlock_irq(&mdev->req_lock);
3811
3812         /* We do not have data structures that would allow us to
3813          * get the rs_pending_cnt down to 0 again.
3814          *  * On C_SYNC_TARGET we do not have any data structures describing
3815          *    the pending RSDataRequest's we have sent.
3816          *  * On C_SYNC_SOURCE there is no data structure that tracks
3817          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3818          *  And no, it is not the sum of the reference counts in the
3819          *  resync_LRU. The resync_LRU tracks the whole operation including
3820          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3821          *  on the fly. */
3822         drbd_rs_cancel_all(mdev);
3823         mdev->rs_total = 0;
3824         mdev->rs_failed = 0;
3825         atomic_set(&mdev->rs_pending_cnt, 0);
3826         wake_up(&mdev->misc_wait);
3827
3828         /* make sure syncer is stopped and w_resume_next_sg queued */
3829         del_timer_sync(&mdev->resync_timer);
3830         set_bit(STOP_SYNC_TIMER, &mdev->flags);
3831         resync_timer_fn((unsigned long)mdev);
3832
3833         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3834          * w_make_resync_request etc. which may still be on the worker queue
3835          * to be "canceled" */
3836         drbd_flush_workqueue(mdev);
3837
3838         /* This also does reclaim_net_ee().  If we do this too early, we might
3839          * miss some resync ee and pages.*/
3840         drbd_process_done_ee(mdev);
3841
3842         kfree(mdev->p_uuid);
3843         mdev->p_uuid = NULL;
3844
3845         if (!mdev->state.susp)
3846                 tl_clear(mdev);
3847
3848         drbd_fail_pending_reads(mdev);
3849
3850         dev_info(DEV, "Connection closed\n");
3851
3852         drbd_md_sync(mdev);
3853
3854         fp = FP_DONT_CARE;
3855         if (get_ldev(mdev)) {
3856                 fp = mdev->ldev->dc.fencing;
3857                 put_ldev(mdev);
3858         }
3859
3860         if (mdev->state.role == R_PRIMARY) {
3861                 if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3862                         enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3863                         drbd_request_state(mdev, NS(pdsk, nps));
3864                 }
3865         }
3866
3867         spin_lock_irq(&mdev->req_lock);
3868         os = mdev->state;
3869         if (os.conn >= C_UNCONNECTED) {
3870                 /* Do not restart in case we are C_DISCONNECTING */
3871                 ns = os;
3872                 ns.conn = C_UNCONNECTED;
3873                 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3874         }
3875         spin_unlock_irq(&mdev->req_lock);
3876
3877         if (os.conn == C_DISCONNECTING) {
3878                 struct hlist_head *h;
3879                 wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3880
3881                 /* we must not free the tl_hash
3882                  * while application io is still on the fly */
3883                 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3884
3885                 spin_lock_irq(&mdev->req_lock);
3886                 /* paranoia code */
3887                 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3888                         if (h->first)
3889                                 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3890                                                 (int)(h - mdev->ee_hash), h->first);
3891                 kfree(mdev->ee_hash);
3892                 mdev->ee_hash = NULL;
3893                 mdev->ee_hash_s = 0;
3894
3895                 /* paranoia code */
3896                 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3897                         if (h->first)
3898                                 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3899                                                 (int)(h - mdev->tl_hash), h->first);
3900                 kfree(mdev->tl_hash);
3901                 mdev->tl_hash = NULL;
3902                 mdev->tl_hash_s = 0;
3903                 spin_unlock_irq(&mdev->req_lock);
3904
3905                 crypto_free_hash(mdev->cram_hmac_tfm);
3906                 mdev->cram_hmac_tfm = NULL;
3907
3908                 kfree(mdev->net_conf);
3909                 mdev->net_conf = NULL;
3910                 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3911         }
3912
3913         /* tcp_close and release of sendpage pages can be deferred.  I don't
3914          * want to use SO_LINGER, because apparently it can be deferred for
3915          * more than 20 seconds (longest time I checked).
3916          *
3917          * Actually we don't care exactly when the network stack does its
3918          * put_page(), but release our reference on these pages right here.
3919          */
3920         i = drbd_release_ee(mdev, &mdev->net_ee);
3921         if (i)
3922                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3923         i = atomic_read(&mdev->pp_in_use);
3924         if (i)
3925                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3926
3927         D_ASSERT(list_empty(&mdev->read_ee));
3928         D_ASSERT(list_empty(&mdev->active_ee));
3929         D_ASSERT(list_empty(&mdev->sync_ee));
3930         D_ASSERT(list_empty(&mdev->done_ee));
3931
3932         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3933         atomic_set(&mdev->current_epoch->epoch_size, 0);
3934         D_ASSERT(list_empty(&mdev->current_epoch->list));
3935 }
3936
3937 /*
3938  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3939  * we can agree on is stored in agreed_pro_version.
3940  *
3941  * feature flags and the reserved array should be enough room for future
3942  * enhancements of the handshake protocol, and possible plugins...
3943  *
3944  * for now, they are expected to be zero, but ignored.
3945  */
3946 static int drbd_send_handshake(struct drbd_conf *mdev)
3947 {
3948         /* ASSERT current == mdev->receiver ... */
3949         struct p_handshake *p = &mdev->data.sbuf.handshake;
3950         int ok;
3951
3952         if (mutex_lock_interruptible(&mdev->data.mutex)) {
3953                 dev_err(DEV, "interrupted during initial handshake\n");
3954                 return 0; /* interrupted. not ok. */
3955         }
3956
3957         if (mdev->data.socket == NULL) {
3958                 mutex_unlock(&mdev->data.mutex);
3959                 return 0;
3960         }
3961
3962         memset(p, 0, sizeof(*p));
3963         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3964         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3965         ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3966                              (struct p_header *)p, sizeof(*p), 0 );
3967         mutex_unlock(&mdev->data.mutex);
3968         return ok;
3969 }
3970
3971 /*
3972  * return values:
3973  *   1 yes, we have a valid connection
3974  *   0 oops, did not work out, please try again
3975  *  -1 peer talks different language,
3976  *     no point in trying again, please go standalone.
3977  */
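     /* Version negotiation below picks min(PRO_VERSION_MAX, peer's
      * protocol_max), after checking that the two ranges overlap at all.
      * If, say, we supported 86-95 and the peer advertised 86-91, we would
      * agree on 91; a peer whose whole range lies outside ours is rejected
      * as incompatible. */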
3978 static int drbd_do_handshake(struct drbd_conf *mdev)
3979 {
3980         /* ASSERT current == mdev->receiver ... */
3981         struct p_handshake *p = &mdev->data.rbuf.handshake;
3982         const int expect = sizeof(struct p_handshake)
3983                           -sizeof(struct p_header);
3984         int rv;
3985
3986         rv = drbd_send_handshake(mdev);
3987         if (!rv)
3988                 return 0;
3989
3990         rv = drbd_recv_header(mdev, &p->head);
3991         if (!rv)
3992                 return 0;
3993
3994         if (p->head.command != P_HAND_SHAKE) {
3995                 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3996                      cmdname(p->head.command), p->head.command);
3997                 return -1;
3998         }
3999
4000         if (p->head.length != expect) {
4001                 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
4002                      expect, p->head.length);
4003                 return -1;
4004         }
4005
4006         rv = drbd_recv(mdev, &p->head.payload, expect);
4007
4008         if (rv != expect) {
4009                 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
4010                 return 0;
4011         }
4012
4013         p->protocol_min = be32_to_cpu(p->protocol_min);
4014         p->protocol_max = be32_to_cpu(p->protocol_max);
4015         if (p->protocol_max == 0)
4016                 p->protocol_max = p->protocol_min;
4017
4018         if (PRO_VERSION_MAX < p->protocol_min ||
4019             PRO_VERSION_MIN > p->protocol_max)
4020                 goto incompat;
4021
4022         mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4023
4024         dev_info(DEV, "Handshake successful: "
4025              "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4026
4027         return 1;
4028
4029  incompat:
4030         dev_err(DEV, "incompatible DRBD dialects: "
4031             "I support %d-%d, peer supports %d-%d\n",
4032             PRO_VERSION_MIN, PRO_VERSION_MAX,
4033             p->protocol_min, p->protocol_max);
4034         return -1;
4035 }
4036
4037 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4038 static int drbd_do_auth(struct drbd_conf *mdev)
4039 {
4040         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4041         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4042         return -1;
4043 }
4044 #else
4045 #define CHALLENGE_LEN 64
4046
4047 /* Return value:
4048         1 - auth succeeded,
4049         0 - failed, try again (network error),
4050         -1 - auth failed, don't try again.
4051 */
4052
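     /* Challenge-response authentication using the configured cram-hmac-alg
      * and shared secret: we send a random challenge and receive the peer's
      * challenge; each side then answers with a keyed hash of the challenge
      * it received.  The peer's answer is compared with the digest we compute
      * over our own challenge ("right_response"). */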
4053 static int drbd_do_auth(struct drbd_conf *mdev)
4054 {
4055         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4056         struct scatterlist sg;
4057         char *response = NULL;
4058         char *right_response = NULL;
4059         char *peers_ch = NULL;
4060         struct p_header p;
4061         unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4062         unsigned int resp_size;
4063         struct hash_desc desc;
4064         int rv;
4065
4066         desc.tfm = mdev->cram_hmac_tfm;
4067         desc.flags = 0;
4068
4069         rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4070                                 (u8 *)mdev->net_conf->shared_secret, key_len);
4071         if (rv) {
4072                 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4073                 rv = -1;
4074                 goto fail;
4075         }
4076
4077         get_random_bytes(my_challenge, CHALLENGE_LEN);
4078
4079         rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4080         if (!rv)
4081                 goto fail;
4082
4083         rv = drbd_recv_header(mdev, &p);
4084         if (!rv)
4085                 goto fail;
4086
4087         if (p.command != P_AUTH_CHALLENGE) {
4088                 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4089                     cmdname(p.command), p.command);
4090                 rv = 0;
4091                 goto fail;
4092         }
4093
4094         if (p.length > CHALLENGE_LEN*2) {
4095                 dev_err(DEV, "expected AuthChallenge payload too big.\n");
4096                 rv = -1;
4097                 goto fail;
4098         }
4099
4100         peers_ch = kmalloc(p.length, GFP_NOIO);
4101         if (peers_ch == NULL) {
4102                 dev_err(DEV, "kmalloc of peers_ch failed\n");
4103                 rv = -1;
4104                 goto fail;
4105         }
4106
4107         rv = drbd_recv(mdev, peers_ch, p.length);
4108
4109         if (rv != p.length) {
4110                 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4111                 rv = 0;
4112                 goto fail;
4113         }
4114
4115         resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4116         response = kmalloc(resp_size, GFP_NOIO);
4117         if (response == NULL) {
4118                 dev_err(DEV, "kmalloc of response failed\n");
4119                 rv = -1;
4120                 goto fail;
4121         }
4122
4123         sg_init_table(&sg, 1);
4124         sg_set_buf(&sg, peers_ch, p.length);
4125
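        /* our response: the HMAC over the *peer's* challenge, keyed with
         * the shared secret configured above */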
4126         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4127         if (rv) {
4128                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4129                 rv = -1;
4130                 goto fail;
4131         }
4132
4133         rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4134         if (!rv)
4135                 goto fail;
4136
4137         rv = drbd_recv_header(mdev, &p);
4138         if (!rv)
4139                 goto fail;
4140
4141         if (p.command != P_AUTH_RESPONSE) {
4142                 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4143                     cmdname(p.command), p.command);
4144                 rv = 0;
4145                 goto fail;
4146         }
4147
4148         if (p.length != resp_size) {
4149                 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4150                 rv = 0;
4151                 goto fail;
4152         }
4153
4154         rv = drbd_recv(mdev, response, resp_size);
4155
4156         if (rv != resp_size) {
4157                 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4158                 rv = 0;
4159                 goto fail;
4160         }
4161
4162         right_response = kmalloc(resp_size, GFP_NOIO);
4163         if (right_response == NULL) {
4164                 dev_err(DEV, "kmalloc of right_response failed\n");
4165                 rv = -1;
4166                 goto fail;
4167         }
4168
4169         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4170
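        /* recompute what the peer's response must have been: the HMAC over
         * our own challenge, keyed with the same shared secret */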
4171         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4172         if (rv) {
4173                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4174                 rv = -1;
4175                 goto fail;
4176         }
4177
4178         rv = !memcmp(response, right_response, resp_size);
4179
4180         if (rv)
4181                 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4182                      resp_size, mdev->net_conf->cram_hmac_alg);
4183         else
4184                 rv = -1;
4185
4186  fail:
4187         kfree(peers_ch);
4188         kfree(response);
4189         kfree(right_response);
4190
4191         return rv;
4192 }
4193 #endif
4194
4195 int drbdd_init(struct drbd_thread *thi)
4196 {
4197         struct drbd_conf *mdev = thi->mdev;
4198         unsigned int minor = mdev_to_minor(mdev);
4199         int h;
4200
4201         sprintf(current->comm, "drbd%d_receiver", minor);
4202
4203         dev_info(DEV, "receiver (re)started\n");
4204
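        /* drbd_connect() result, matching the handshake convention:
         *    0  attempt failed, clean up and retry after a second
         *   -1  fatal mismatch, discard the network configuration
         *   >0  connected, run the main receive loop in drbdd() */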
4205         do {
4206                 h = drbd_connect(mdev);
4207                 if (h == 0) {
4208                         drbd_disconnect(mdev);
4209                         __set_current_state(TASK_INTERRUPTIBLE);
4210                         schedule_timeout(HZ);
4211                 }
4212                 if (h == -1) {
4213                         dev_warn(DEV, "Discarding network configuration.\n");
4214                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4215                 }
4216         } while (h == 0);
4217
4218         if (h > 0) {
4219                 if (get_net_conf(mdev)) {
4220                         drbdd(mdev);
4221                         put_net_conf(mdev);
4222                 }
4223         }
4224
4225         drbd_disconnect(mdev);
4226
4227         dev_info(DEV, "receiver terminated\n");
4228         return 0;
4229 }
4230
4231 /* ********* acknowledge sender ******** */
4232
4233 static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4234 {
4235         struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4236
4237         int retcode = be32_to_cpu(p->retcode);
4238
4239         if (retcode >= SS_SUCCESS) {
4240                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4241         } else {
4242                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4243                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4244                     drbd_set_st_err_str(retcode), retcode);
4245         }
4246         wake_up(&mdev->state_wait);
4247
4248         return TRUE;
4249 }
4250
4251 static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4252 {
4253         return drbd_send_ping_ack(mdev);
4254
4255 }
4256
4257 static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4258 {
4259         /* restore idle timeout */
4260         mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4261         if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4262                 wake_up(&mdev->misc_wait);
4263
4264         return TRUE;
4265 }
4266
4267 static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4268 {
4269         struct p_block_ack *p = (struct p_block_ack *)h;
4270         sector_t sector = be64_to_cpu(p->sector);
4271         int blksize = be32_to_cpu(p->blksize);
4272
4273         D_ASSERT(mdev->agreed_pro_version >= 89);
4274
4275         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4276
4277         drbd_rs_complete_io(mdev, sector);
4278         drbd_set_in_sync(mdev, sector, blksize);
4279         /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4280         mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4281         dec_rs_pending(mdev);
4282
4283         return TRUE;
4284 }
4285
4286 /* when we receive the ACK for a write request,
4287  * verify that we actually know about it */
4288 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4289         u64 id, sector_t sector)
4290 {
4291         struct hlist_head *slot = tl_hash_slot(mdev, sector);
4292         struct hlist_node *n;
4293         struct drbd_request *req;
4294
4295         hlist_for_each_entry(req, n, slot, colision) {
4296                 if ((unsigned long)req == (unsigned long)id) {
4297                         if (req->sector != sector) {
4298                                 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4299                                     "wrong sector (%llus versus %llus)\n", req,
4300                                     (unsigned long long)req->sector,
4301                                     (unsigned long long)sector);
4302                                 break;
4303                         }
4304                         return req;
4305                 }
4306         }
4307         dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4308                 (void *)(unsigned long)id, (unsigned long long)sector);
4309         return NULL;
4310 }
4311
4312 typedef struct drbd_request *(req_validator_fn)
4313         (struct drbd_conf *mdev, u64 id, sector_t sector);
4314
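/* Look up the request an ack refers to (under req_lock), apply the state
 * transition 'what' to it, and complete the master bio outside the lock
 * if that transition finished the request. */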
4315 static int validate_req_change_req_state(struct drbd_conf *mdev,
4316         u64 id, sector_t sector, req_validator_fn validator,
4317         const char *func, enum drbd_req_event what)
4318 {
4319         struct drbd_request *req;
4320         struct bio_and_error m;
4321
4322         spin_lock_irq(&mdev->req_lock);
4323         req = validator(mdev, id, sector);
4324         if (unlikely(!req)) {
4325                 spin_unlock_irq(&mdev->req_lock);
4326                 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4327                 return FALSE;
4328         }
4329         __req_mod(req, what, &m);
4330         spin_unlock_irq(&mdev->req_lock);
4331
4332         if (m.bio)
4333                 complete_master_bio(mdev, &m);
4334         return TRUE;
4335 }
4336
4337 static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4338 {
4339         struct p_block_ack *p = (struct p_block_ack *)h;
4340         sector_t sector = be64_to_cpu(p->sector);
4341         int blksize = be32_to_cpu(p->blksize);
4342         enum drbd_req_event what;
4343
4344         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4345
4346         if (is_syncer_block_id(p->block_id)) {
4347                 drbd_set_in_sync(mdev, sector, blksize);
4348                 dec_rs_pending(mdev);
4349                 return TRUE;
4350         }
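        /* The ack type tells us which guarantee the peer gives:
         * P_RECV_ACK (protocol B): data received,
         * P_WRITE_ACK (protocol C): data written to stable storage,
         * P_RS_WRITE_ACK (protocol C): written during resync, also set in sync,
         * P_DISCARD_ACK (protocol C): conflicting write discarded by the peer. */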
4351         switch (be16_to_cpu(h->command)) {
4352         case P_RS_WRITE_ACK:
4353                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4354                 what = write_acked_by_peer_and_sis;
4355                 break;
4356         case P_WRITE_ACK:
4357                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4358                 what = write_acked_by_peer;
4359                 break;
4360         case P_RECV_ACK:
4361                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4362                 what = recv_acked_by_peer;
4363                 break;
4364         case P_DISCARD_ACK:
4365                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4366                 what = conflict_discarded_by_peer;
4367                 break;
4368         default:
4369                 D_ASSERT(0);
4370                 return FALSE;
4371         }
4372
4373         return validate_req_change_req_state(mdev, p->block_id, sector,
4374                 _ack_id_to_req, __func__ , what);
4375 }
4376
4377 static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4378 {
4379         struct p_block_ack *p = (struct p_block_ack *)h;
4380         sector_t sector = be64_to_cpu(p->sector);
4381
4382         if (__ratelimit(&drbd_ratelimit_state))
4383                 dev_warn(DEV, "Got NegAck packet. Peer is in troubles?\n");
4384
4385         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4386
4387         if (is_syncer_block_id(p->block_id)) {
4388                 int size = be32_to_cpu(p->blksize);
4389                 dec_rs_pending(mdev);
4390                 drbd_rs_failed_io(mdev, sector, size);
4391                 return TRUE;
4392         }
4393         return validate_req_change_req_state(mdev, p->block_id, sector,
4394                 _ack_id_to_req, __func__ , neg_acked);
4395 }
4396
4397 static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4398 {
4399         struct p_block_ack *p = (struct p_block_ack *)h;
4400         sector_t sector = be64_to_cpu(p->sector);
4401
4402         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4403         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4404             (unsigned long long)sector, be32_to_cpu(p->blksize));
4405
4406         return validate_req_change_req_state(mdev, p->block_id, sector,
4407                 _ar_id_to_req, __func__ , neg_acked);
4408 }
4409
4410 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4411 {
4412         sector_t sector;
4413         int size;
4414         struct p_block_ack *p = (struct p_block_ack *)h;
4415
4416         sector = be64_to_cpu(p->sector);
4417         size = be32_to_cpu(p->blksize);
4418
4419         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4420
4421         dec_rs_pending(mdev);
4422
4423         if (get_ldev_if_state(mdev, D_FAILED)) {
4424                 drbd_rs_complete_io(mdev, sector);
4425                 drbd_rs_failed_io(mdev, sector, size);
4426                 put_ldev(mdev);
4427         }
4428
4429         return TRUE;
4430 }
4431
4432 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4433 {
4434         struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4435
4436         tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4437
4438         return TRUE;
4439 }
4440
4441 static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4442 {
4443         struct p_block_ack *p = (struct p_block_ack *)h;
4444         struct drbd_work *w;
4445         sector_t sector;
4446         int size;
4447
4448         sector = be64_to_cpu(p->sector);
4449         size = be32_to_cpu(p->blksize);
4450
4451         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4452
4453         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4454                 drbd_ov_oos_found(mdev, sector, size);
4455         else
4456                 ov_oos_print(mdev);
4457
4458         drbd_rs_complete_io(mdev, sector);
4459         dec_rs_pending(mdev);
4460
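        /* if this was the last outstanding verify reply, queue w_ov_finished;
         * fall back to finishing synchronously if the allocation fails */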
4461         if (--mdev->ov_left == 0) {
4462                 w = kmalloc(sizeof(*w), GFP_NOIO);
4463                 if (w) {
4464                         w->cb = w_ov_finished;
4465                         drbd_queue_work_front(&mdev->data.work, w);
4466                 } else {
4467                         dev_err(DEV, "kmalloc(w) failed.");
4468                         ov_oos_print(mdev);
4469                         drbd_resync_finished(mdev);
4470                 }
4471         }
4472         return TRUE;
4473 }
4474
4475 static int got_delay_probe_m(struct drbd_conf *mdev, struct p_header *h)
4476 {
4477         struct p_delay_probe *p = (struct p_delay_probe *)h;
4478
4479         got_delay_probe(mdev, USE_META_SOCKET, p);
4480         return TRUE;
4481 }
4482
4483 struct asender_cmd {
4484         size_t pkt_size;
4485         int (*process)(struct drbd_conf *mdev, struct p_header *h);
4486 };
4487
4488 static struct asender_cmd *get_asender_cmd(int cmd)
4489 {
4490         static struct asender_cmd asender_tbl[] = {
4491                 /* anything missing from this table is in
4492                  * the drbd_cmd_handler (drbd_default_handler) table,
4493                  * see the beginning of drbdd() */
4494         [P_PING]            = { sizeof(struct p_header), got_Ping },
4495         [P_PING_ACK]        = { sizeof(struct p_header), got_PingAck },
4496         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4497         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4498         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4499         [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4500         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4501         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4502         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4503         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4504         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4505         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4506         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4507         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe), got_delay_probe_m },
4508         [P_MAX_CMD]         = { 0, NULL },
4509         };
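        /* commands above P_MAX_CMD, or without an entry in this table, are
         * not meant for the asender; the caller treats NULL as a protocol
         * error and disconnects */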
4510         if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4511                 return NULL;
4512         return &asender_tbl[cmd];
4513 }
4514
4515 int drbd_asender(struct drbd_thread *thi)
4516 {
4517         struct drbd_conf *mdev = thi->mdev;
4518         struct p_header *h = &mdev->meta.rbuf.header;
4519         struct asender_cmd *cmd = NULL;
4520
4521         int rv, len;
4522         void *buf    = h;
4523         int received = 0;
4524         int expect   = sizeof(struct p_header);
4525         int empty;
4526
4527         sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4528
4529         current->policy = SCHED_RR;  /* Make this a realtime task! */
4530         current->rt_priority = 2;    /* more important than all other tasks */
4531
4532         while (get_t_state(thi) == Running) {
4533                 drbd_thread_current_set_cpu(mdev);
4534                 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4535                         ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4536                         mdev->meta.socket->sk->sk_rcvtimeo =
4537                                 mdev->net_conf->ping_timeo*HZ/10;
4538                 }
4539
4540                 /* conditionally cork;
4541                  * it may hurt latency if we cork without much to send */
4542                 if (!mdev->net_conf->no_cork &&
4543                         3 < atomic_read(&mdev->unacked_cnt))
4544                         drbd_tcp_cork(mdev->meta.socket);
4545                 while (1) {
4546                         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4547                         flush_signals(current);
4548                         if (!drbd_process_done_ee(mdev)) {
4549                                 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4550                                 goto reconnect;
4551                         }
4552                         /* to avoid race with newly queued ACKs */
4553                         set_bit(SIGNAL_ASENDER, &mdev->flags);
4554                         spin_lock_irq(&mdev->req_lock);
4555                         empty = list_empty(&mdev->done_ee);
4556                         spin_unlock_irq(&mdev->req_lock);
4557                         /* new ack may have been queued right here,
4558                          * but then there is also a signal pending,
4559                          * and we start over... */
4560                         if (empty)
4561                                 break;
4562                 }
4563                 /* but unconditionally uncork unless disabled */
4564                 if (!mdev->net_conf->no_cork)
4565                         drbd_tcp_uncork(mdev->meta.socket);
4566
4567                 /* short circuit, recv_msg would return EINTR anyways. */
4568                 if (signal_pending(current))
4569                         continue;
4570
4571                 rv = drbd_recv_short(mdev, mdev->meta.socket,
4572                                      buf, expect-received, 0);
4573                 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4574
4575                 flush_signals(current);
4576
4577                 /* Note:
4578                  * -EINTR        (on meta) we got a signal
4579                  * -EAGAIN       (on meta) rcvtimeo expired
4580                  * -ECONNRESET   other side closed the connection
4581                  * -ERESTARTSYS  (on data) we got a signal
4582                  * rv <  0       other than above: unexpected error!
4583                  * rv == expected: full header or command
4584                  * rv <  expected: "woken" by signal during receive
4585                  * rv == 0       : "connection shut down by peer"
4586                  */
4587                 if (likely(rv > 0)) {
4588                         received += rv;
4589                         buf      += rv;
4590                 } else if (rv == 0) {
4591                         dev_err(DEV, "meta connection shut down by peer.\n");
4592                         goto reconnect;
4593                 } else if (rv == -EAGAIN) {
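                        /* receive timeout expired: if it is still the short
                         * ping_timeo, we were waiting for a PingAck that
                         * never came; otherwise the idle ping_int passed and
                         * we should send the next ping ourselves */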
4594                         if (mdev->meta.socket->sk->sk_rcvtimeo ==
4595                             mdev->net_conf->ping_timeo*HZ/10) {
4596                                 dev_err(DEV, "PingAck did not arrive in time.\n");
4597                                 goto reconnect;
4598                         }
4599                         set_bit(SEND_PING, &mdev->flags);
4600                         continue;
4601                 } else if (rv == -EINTR) {
4602                         continue;
4603                 } else {
4604                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4605                         goto reconnect;
4606                 }
4607
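                /* two-step parse: once a full p_header has arrived, look up
                 * the command to learn the full packet size, then keep
                 * receiving until the packet is complete and dispatch it */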
4608                 if (received == expect && cmd == NULL) {
4609                         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4610                                 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4611                                     (long)be32_to_cpu(h->magic),
4612                                     be16_to_cpu(h->command), be16_to_cpu(h->length));
4613                                 goto reconnect;
4614                         }
4615                         cmd = get_asender_cmd(be16_to_cpu(h->command));
4616                         len = be16_to_cpu(h->length);
4617                         if (unlikely(cmd == NULL)) {
4618                                 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4619                                     (long)be32_to_cpu(h->magic),
4620                                     be16_to_cpu(h->command), be16_to_cpu(h->length));
4621                                 goto disconnect;
4622                         }
4623                         expect = cmd->pkt_size;
4624                         ERR_IF(len != expect-sizeof(struct p_header))
4625                                 goto reconnect;
4626                 }
4627                 if (received == expect) {
4628                         D_ASSERT(cmd != NULL);
4629                         if (!cmd->process(mdev, h))
4630                                 goto reconnect;
4631
4632                         buf      = h;
4633                         received = 0;
4634                         expect   = sizeof(struct p_header);
4635                         cmd      = NULL;
4636                 }
4637         }
4638
4639         if (0) {
4640 reconnect:
4641                 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4642         }
4643         if (0) {
4644 disconnect:
4645                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4646         }
4647         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4648
4649         D_ASSERT(mdev->state.conn < C_CONNECTED);
4650         dev_info(DEV, "asender terminated\n");
4651
4652         return 0;
4653 }