1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/smp_lock.h>
40 #include <linux/pkt_sched.h>
41 #define __KERNEL_SYSCALLS__
42 #include <linux/unistd.h>
43 #include <linux/vmalloc.h>
44 #include <linux/random.h>
45 #include <linux/string.h>
46 #include <linux/scatterlist.h>
47 #include "drbd_int.h"
48 #include "drbd_req.h"
49
50 #include "drbd_vli.h"
51
52 struct flush_work {
53         struct drbd_work w;
54         struct drbd_epoch *epoch;
55 };
56
57 enum finish_epoch {
58         FE_STILL_LIVE,
59         FE_DESTROYED,
60         FE_RECYCLED,
61 };
62
63 static int drbd_do_handshake(struct drbd_conf *mdev);
64 static int drbd_do_auth(struct drbd_conf *mdev);
65
66 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
67 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
68
69 static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
70 {
71         struct drbd_epoch *prev;
72         spin_lock(&mdev->epoch_lock);
73         prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
74         if (prev == epoch || prev == mdev->current_epoch)
75                 prev = NULL;
76         spin_unlock(&mdev->epoch_lock);
77         return prev;
78 }
79
80 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
81
82 /*
83  * some helper functions to deal with singly linked page lists,
84  * page->private being our "next" pointer.
85  */
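
/*
 * For illustration only: page_chain_for_each() and page_chain_next() are
 * provided by the drbd headers included above; walking a chain just means
 * following page->private until it reads 0.  A hypothetical counter:
 *
 *         static int page_chain_count(struct page *page)
 *         {
 *                 int n = 0;
 *                 page_chain_for_each(page)
 *                         n++;
 *                 return n;
 *         }
 */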
86
87 /* If at least n pages are linked at head, get n pages off.
88  * Otherwise, don't modify head, and return NULL.
89  * Locking is the responsibility of the caller.
90  */
91 static struct page *page_chain_del(struct page **head, int n)
92 {
93         struct page *page;
94         struct page *tmp;
95
96         BUG_ON(!n);
97         BUG_ON(!head);
98
99         page = *head;
100
101         if (!page)
102                 return NULL;
103
104         while (page) {
105                 tmp = page_chain_next(page);
106                 if (--n == 0)
107                         break; /* found sufficient pages */
108                 if (tmp == NULL)
109                         /* insufficient pages, don't use any of them. */
110                         return NULL;
111                 page = tmp;
112         }
113
114         /* add end of list marker for the returned list */
115         set_page_private(page, 0);
116         /* actual return value, and adjustment of head */
117         page = *head;
118         *head = tmp;
119         return page;
120 }
121
122 /* may be used outside of locks to find the tail of a (usually short)
123  * "private" page chain, before adding it back to a global chain head
124  * with page_chain_add() under a spinlock. */
125 static struct page *page_chain_tail(struct page *page, int *len)
126 {
127         struct page *tmp;
128         int i = 1;
129         while ((tmp = page_chain_next(page)))
130                 ++i, page = tmp;
131         if (len)
132                 *len = i;
133         return page;
134 }
135
136 static int page_chain_free(struct page *page)
137 {
138         struct page *tmp;
139         int i = 0;
140         page_chain_for_each_safe(page, tmp) {
141                 put_page(page);
142                 ++i;
143         }
144         return i;
145 }
146
147 static void page_chain_add(struct page **head,
148                 struct page *chain_first, struct page *chain_last)
149 {
150 #if 1
151         struct page *tmp;
152         tmp = page_chain_tail(chain_first, NULL);
153         BUG_ON(tmp != chain_last);
154 #endif
155
156         /* add chain to head */
157         set_page_private(chain_last, (unsigned long)*head);
158         *head = chain_first;
159 }
160
161 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
162 {
163         struct page *page = NULL;
164         struct page *tmp = NULL;
165         int i = 0;
166
167         /* Yes, testing drbd_pp_vacant outside the lock is racy.
168          * So what. It saves a spin_lock. */
169         if (drbd_pp_vacant >= number) {
170                 spin_lock(&drbd_pp_lock);
171                 page = page_chain_del(&drbd_pp_pool, number);
172                 if (page)
173                         drbd_pp_vacant -= number;
174                 spin_unlock(&drbd_pp_lock);
175                 if (page)
176                         return page;
177         }
178
179         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
180          * "criss-cross" setup, that might cause write-out on some other DRBD,
181          * which in turn might block on the other node at this very place.  */
182         for (i = 0; i < number; i++) {
183                 tmp = alloc_page(GFP_TRY);
184                 if (!tmp)
185                         break;
186                 set_page_private(tmp, (unsigned long)page);
187                 page = tmp;
188         }
189
190         if (i == number)
191                 return page;
192
193         /* Not enough pages immediately available this time.
194          * No need to jump around here, drbd_pp_alloc will retry this
195          * function "soon". */
196         if (page) {
197                 tmp = page_chain_tail(page, NULL);
198                 spin_lock(&drbd_pp_lock);
199                 page_chain_add(&drbd_pp_pool, page, tmp);
200                 drbd_pp_vacant += i;
201                 spin_unlock(&drbd_pp_lock);
202         }
203         return NULL;
204 }
205
206 /* kick lower level device, if we have more than (arbitrary number)
207  * reference counts on it, which typically are locally submitted io
208  * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
209 static void maybe_kick_lo(struct drbd_conf *mdev)
210 {
211         if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
212                 drbd_kick_lo(mdev);
213 }
214
215 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
216 {
217         struct drbd_epoch_entry *e;
218         struct list_head *le, *tle;
219
220         /* The EEs are always appended to the end of the list. Since
221            they are sent in order over the wire, they have to finish
222            in order. As soon as we see the first unfinished one, we can
223            stop examining the list... */
224
225         list_for_each_safe(le, tle, &mdev->net_ee) {
226                 e = list_entry(le, struct drbd_epoch_entry, w.list);
227                 if (drbd_ee_has_active_page(e))
228                         break;
229                 list_move(le, to_be_freed);
230         }
231 }
232
233 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
234 {
235         LIST_HEAD(reclaimed);
236         struct drbd_epoch_entry *e, *t;
237
238         maybe_kick_lo(mdev);
239         spin_lock_irq(&mdev->req_lock);
240         reclaim_net_ee(mdev, &reclaimed);
241         spin_unlock_irq(&mdev->req_lock);
242
243         list_for_each_entry_safe(e, t, &reclaimed, w.list)
244                 drbd_free_ee(mdev, e);
245 }
246
247 /**
248  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
249  * @mdev:       DRBD device.
250  * @number:     number of pages requested
251  * @retry:      whether to retry, if not enough pages are available right now
252  *
253  * Tries to allocate number pages, first from our own page pool, then from
254  * the kernel, unless this allocation would exceed the max_buffers setting.
255  * Possibly retry until DRBD frees sufficient pages somewhere else.
256  *
257  * Returns a page chain linked via page->private.
258  */
259 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
260 {
261         struct page *page = NULL;
262         DEFINE_WAIT(wait);
263
264         /* Yes, we may run up to @number over max_buffers. If we
265          * follow it strictly, the admin will get it wrong anyways. */
266         if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
267                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
268
269         while (page == NULL) {
270                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
271
272                 drbd_kick_lo_and_reclaim_net(mdev);
273
274                 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
275                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
276                         if (page)
277                                 break;
278                 }
279
280                 if (!retry)
281                         break;
282
283                 if (signal_pending(current)) {
284                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
285                         break;
286                 }
287
288                 schedule();
289         }
290         finish_wait(&drbd_pp_wait, &wait);
291
292         if (page)
293                 atomic_add(number, &mdev->pp_in_use);
294         return page;
295 }
296
297 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
298  * It is also used from inside another spin_lock_irq(&mdev->req_lock);
299  * Either links the page chain back to the global pool,
300  * or returns all pages to the system. */
301 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
302 {
303         int i;
304         if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
305                 i = page_chain_free(page);
306         else {
307                 struct page *tmp;
308                 tmp = page_chain_tail(page, &i);
309                 spin_lock(&drbd_pp_lock);
310                 page_chain_add(&drbd_pp_pool, page, tmp);
311                 drbd_pp_vacant += i;
312                 spin_unlock(&drbd_pp_lock);
313         }
314         atomic_sub(i, &mdev->pp_in_use);
315         i = atomic_read(&mdev->pp_in_use);
316         if (i < 0)
317                 dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
318         wake_up(&drbd_pp_wait);
319 }
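
/*
 * Usage sketch (hypothetical caller, for illustration): every chain handed
 * out by drbd_pp_alloc() must eventually be returned through drbd_pp_free(),
 * which refills drbd_pp_pool or gives the pages back to the system and
 * wakes any waiters:
 *
 *         page = drbd_pp_alloc(mdev, nr_pages, 1);
 *         if (!page)
 *                 return NULL;    (with retry set, only a signal gets us here)
 *         ... use the chain ...
 *         drbd_pp_free(mdev, page);
 */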
320
321 /*
322 You need to hold the req_lock:
323  _drbd_wait_ee_list_empty()
324
325 You must not have the req_lock:
326  drbd_free_ee()
327  drbd_alloc_ee()
328  drbd_init_ee()
329  drbd_release_ee()
330  drbd_ee_fix_bhs()
331  drbd_process_done_ee()
332  drbd_clear_done_ee()
333  drbd_wait_ee_list_empty()
334 */
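
/*
 * In sketch form (illustration only): drbd_wait_ee_list_empty() takes the
 * req_lock itself, while _drbd_wait_ee_list_empty() expects the caller to
 * hold it already, i.e. the locked variant is used like
 *
 *         spin_lock_irq(&mdev->req_lock);
 *         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *         spin_unlock_irq(&mdev->req_lock);
 */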
335
336 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
337                                      u64 id,
338                                      sector_t sector,
339                                      unsigned int data_size,
340                                      gfp_t gfp_mask) __must_hold(local)
341 {
342         struct drbd_epoch_entry *e;
343         struct page *page;
344         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
345
346         if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
347                 return NULL;
348
349         e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
350         if (!e) {
351                 if (!(gfp_mask & __GFP_NOWARN))
352                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
353                 return NULL;
354         }
355
356         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
357         if (!page)
358                 goto fail;
359
360         INIT_HLIST_NODE(&e->colision);
361         e->epoch = NULL;
362         e->mdev = mdev;
363         e->pages = page;
364         atomic_set(&e->pending_bios, 0);
365         e->size = data_size;
366         e->flags = 0;
367         e->sector = sector;
369         e->block_id = id;
370
371         return e;
372
373  fail:
374         mempool_free(e, drbd_ee_mempool);
375         return NULL;
376 }
377
378 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
379 {
380         drbd_pp_free(mdev, e->pages);
381         D_ASSERT(atomic_read(&e->pending_bios) == 0);
382         D_ASSERT(hlist_unhashed(&e->colision));
383         mempool_free(e, drbd_ee_mempool);
384 }
385
386 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_epoch_entry *e, *t;
390         int count = 0;
391
392         spin_lock_irq(&mdev->req_lock);
393         list_splice_init(list, &work_list);
394         spin_unlock_irq(&mdev->req_lock);
395
396         list_for_each_entry_safe(e, t, &work_list, w.list) {
397                 drbd_free_ee(mdev, e);
398                 count++;
399         }
400         return count;
401 }
402
403
404 /*
405  * This function is called from _asender only_
406  * but see also comments in _req_mod(,barrier_acked)
407  * and receive_Barrier.
408  *
409  * Move entries from net_ee to done_ee, if ready.
410  * Grab done_ee, call all callbacks, free the entries.
411  * The callbacks typically send out ACKs.
412  */
413 static int drbd_process_done_ee(struct drbd_conf *mdev)
414 {
415         LIST_HEAD(work_list);
416         LIST_HEAD(reclaimed);
417         struct drbd_epoch_entry *e, *t;
418         int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
419
420         spin_lock_irq(&mdev->req_lock);
421         reclaim_net_ee(mdev, &reclaimed);
422         list_splice_init(&mdev->done_ee, &work_list);
423         spin_unlock_irq(&mdev->req_lock);
424
425         list_for_each_entry_safe(e, t, &reclaimed, w.list)
426                 drbd_free_ee(mdev, e);
427
428         /* possible callbacks here:
429          * e_end_block, e_end_resync_block, and e_send_discard_ack.
430          * All of them ignore the last argument.
431          */
432         list_for_each_entry_safe(e, t, &work_list, w.list) {
433                 /* list_del not necessary, next/prev members not touched */
434                 ok = e->w.cb(mdev, &e->w, !ok) && ok;
435                 drbd_free_ee(mdev, e);
436         }
437         wake_up(&mdev->ee_wait);
438
439         return ok;
440 }
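
/*
 * The callbacks invoked above all share the drbd_work callback signature
 * int (*cb)(struct drbd_conf *, struct drbd_work *, int).  A minimal,
 * purely hypothetical callback would look like this sketch; the real ones
 * are e_end_block(), e_end_resync_block() and e_send_discard_ack() (the
 * resync variant is defined further down in this file):
 *
 *         static int e_end_noop(struct drbd_conf *mdev, struct drbd_work *w, int unused)
 *         {
 *                 dec_unacked(mdev);
 *                 return 1;
 *         }
 */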
441
442 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
443 {
444         DEFINE_WAIT(wait);
445
446         /* avoids spin_lock/unlock
447          * and calling prepare_to_wait in the fast path */
448         while (!list_empty(head)) {
449                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
450                 spin_unlock_irq(&mdev->req_lock);
451                 drbd_kick_lo(mdev);
452                 schedule();
453                 finish_wait(&mdev->ee_wait, &wait);
454                 spin_lock_irq(&mdev->req_lock);
455         }
456 }
457
458 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
459 {
460         spin_lock_irq(&mdev->req_lock);
461         _drbd_wait_ee_list_empty(mdev, head);
462         spin_unlock_irq(&mdev->req_lock);
463 }
464
465 /* see also kernel_accept(), which is only present since 2.6.18.
466  * Also, we want to log exactly which part of it failed. */
467 static int drbd_accept(struct drbd_conf *mdev, const char **what,
468                 struct socket *sock, struct socket **newsock)
469 {
470         struct sock *sk = sock->sk;
471         int err = 0;
472
473         *what = "listen";
474         err = sock->ops->listen(sock, 5);
475         if (err < 0)
476                 goto out;
477
478         *what = "sock_create_lite";
479         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
480                                newsock);
481         if (err < 0)
482                 goto out;
483
484         *what = "accept";
485         err = sock->ops->accept(sock, *newsock, 0);
486         if (err < 0) {
487                 sock_release(*newsock);
488                 *newsock = NULL;
489                 goto out;
490         }
491         (*newsock)->ops  = sock->ops;
492
493 out:
494         return err;
495 }
496
497 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
498                     void *buf, size_t size, int flags)
499 {
500         mm_segment_t oldfs;
501         struct kvec iov = {
502                 .iov_base = buf,
503                 .iov_len = size,
504         };
505         struct msghdr msg = {
506                 .msg_iovlen = 1,
507                 .msg_iov = (struct iovec *)&iov,
508                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
509         };
510         int rv;
511
512         oldfs = get_fs();
513         set_fs(KERNEL_DS);
514         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
515         set_fs(oldfs);
516
517         return rv;
518 }
519
520 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
521 {
522         mm_segment_t oldfs;
523         struct kvec iov = {
524                 .iov_base = buf,
525                 .iov_len = size,
526         };
527         struct msghdr msg = {
528                 .msg_iovlen = 1,
529                 .msg_iov = (struct iovec *)&iov,
530                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
531         };
532         int rv;
533
534         oldfs = get_fs();
535         set_fs(KERNEL_DS);
536
537         for (;;) {
538                 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
539                 if (rv == size)
540                         break;
541
542                 /* Note:
543                  * ECONNRESET   other side closed the connection
544                  * ERESTARTSYS  (on  sock) we got a signal
545                  */
546
547                 if (rv < 0) {
548                         if (rv == -ECONNRESET)
549                                 dev_info(DEV, "sock was reset by peer\n");
550                         else if (rv != -ERESTARTSYS)
551                                 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
552                         break;
553                 } else if (rv == 0) {
554                         dev_info(DEV, "sock was shut down by peer\n");
555                         break;
556                 } else  {
557                         /* signal came in, or peer/link went down,
558                          * after we read a partial message
559                          */
560                         /* D_ASSERT(signal_pending(current)); */
561                         break;
562                 }
563         }
564
565         set_fs(oldfs);
566
567         if (rv != size)
568                 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
569
570         return rv;
571 }
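
/*
 * Callers treat anything other than rv == size as a failed receive; the
 * connection has then already been forced to C_BROKEN_PIPE above.  Typical
 * pattern (compare drbd_recv_header() further down):
 *
 *         if (drbd_recv(mdev, h, sizeof(*h)) != sizeof(*h))
 *                 return FALSE;
 */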
572
573 /* quoting tcp(7):
574  *   On individual connections, the socket buffer size must be set prior to the
575  *   listen(2) or connect(2) calls in order to have it take effect.
576  * This is our wrapper to do so.
577  */
578 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
579                 unsigned int rcv)
580 {
581         /* open coded SO_SNDBUF, SO_RCVBUF */
582         if (snd) {
583                 sock->sk->sk_sndbuf = snd;
584                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
585         }
586         if (rcv) {
587                 sock->sk->sk_rcvbuf = rcv;
588                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
589         }
590 }
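
/*
 * Per the tcp(7) quote above, both drbd_try_connect() and
 * drbd_wait_for_connect() below call this helper before connecting or
 * listening.  Condensed (illustration only), the required ordering is:
 *
 *         sock_create_kern(..., &sock);
 *         drbd_setbufsize(sock, snd, rcv);
 *         sock->ops->bind(...);
 *         sock->ops->connect(...);        or ->listen(...) plus accept
 */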
591
592 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
593 {
594         const char *what;
595         struct socket *sock;
596         struct sockaddr_in6 src_in6;
597         int err;
598         int disconnect_on_error = 1;
599
600         if (!get_net_conf(mdev))
601                 return NULL;
602
603         what = "sock_create_kern";
604         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
605                 SOCK_STREAM, IPPROTO_TCP, &sock);
606         if (err < 0) {
607                 sock = NULL;
608                 goto out;
609         }
610
611         sock->sk->sk_rcvtimeo =
612         sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
613         drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
614                         mdev->net_conf->rcvbuf_size);
615
616        /* explicitly bind to the configured IP as source IP
617         *  for the outgoing connections.
618         *  This is needed for multihomed hosts and to be
619         *  able to use lo: interfaces for drbd.
620         * Make sure to use 0 as port number, so linux selects
621         *  a free one dynamically.
622         */
623         memcpy(&src_in6, mdev->net_conf->my_addr,
624                min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
625         if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
626                 src_in6.sin6_port = 0;
627         else
628                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
629
630         what = "bind before connect";
631         err = sock->ops->bind(sock,
632                               (struct sockaddr *) &src_in6,
633                               mdev->net_conf->my_addr_len);
634         if (err < 0)
635                 goto out;
636
637         /* connect may fail, peer not yet available.
638          * stay C_WF_CONNECTION, don't go Disconnecting! */
639         disconnect_on_error = 0;
640         what = "connect";
641         err = sock->ops->connect(sock,
642                                  (struct sockaddr *)mdev->net_conf->peer_addr,
643                                  mdev->net_conf->peer_addr_len, 0);
644
645 out:
646         if (err < 0) {
647                 if (sock) {
648                         sock_release(sock);
649                         sock = NULL;
650                 }
651                 switch (-err) {
652                         /* timeout, busy, signal pending */
653                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
654                 case EINTR: case ERESTARTSYS:
655                         /* peer not (yet) available, network problem */
656                 case ECONNREFUSED: case ENETUNREACH:
657                 case EHOSTDOWN:    case EHOSTUNREACH:
658                         disconnect_on_error = 0;
659                         break;
660                 default:
661                         dev_err(DEV, "%s failed, err = %d\n", what, err);
662                 }
663                 if (disconnect_on_error)
664                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
665         }
666         put_net_conf(mdev);
667         return sock;
668 }
669
670 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
671 {
672         int timeo, err;
673         struct socket *s_estab = NULL, *s_listen;
674         const char *what;
675
676         if (!get_net_conf(mdev))
677                 return NULL;
678
679         what = "sock_create_kern";
680         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
681                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
682         if (err) {
683                 s_listen = NULL;
684                 goto out;
685         }
686
687         timeo = mdev->net_conf->try_connect_int * HZ;
688         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
689
690         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
691         s_listen->sk->sk_rcvtimeo = timeo;
692         s_listen->sk->sk_sndtimeo = timeo;
693         drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
694                         mdev->net_conf->rcvbuf_size);
695
696         what = "bind before listen";
697         err = s_listen->ops->bind(s_listen,
698                               (struct sockaddr *) mdev->net_conf->my_addr,
699                               mdev->net_conf->my_addr_len);
700         if (err < 0)
701                 goto out;
702
703         err = drbd_accept(mdev, &what, s_listen, &s_estab);
704
705 out:
706         if (s_listen)
707                 sock_release(s_listen);
708         if (err < 0) {
709                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
710                         dev_err(DEV, "%s failed, err = %d\n", what, err);
711                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
712                 }
713         }
714         put_net_conf(mdev);
715
716         return s_estab;
717 }
718
719 static int drbd_send_fp(struct drbd_conf *mdev,
720         struct socket *sock, enum drbd_packets cmd)
721 {
722         struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
723
724         return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
725 }
726
727 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
728 {
729         struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
730         int rr;
731
732         rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
733
734         if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
735                 return be16_to_cpu(h->command);
736
737         return 0xffff;
738 }
739
740 /**
741  * drbd_socket_okay() - Free the socket if its connection is not okay
742  * @mdev:       DRBD device.
743  * @sock:       pointer to the pointer to the socket.
744  */
745 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
746 {
747         int rr;
748         char tb[4];
749
750         if (!*sock)
751                 return FALSE;
752
753         rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
754
755         if (rr > 0 || rr == -EAGAIN) {
756                 return TRUE;
757         } else {
758                 sock_release(*sock);
759                 *sock = NULL;
760                 return FALSE;
761         }
762 }
763
764 /*
765  * return values:
766  *   1 yes, we have a valid connection
767  *   0 oops, did not work out, please try again
768  *  -1 peer talks different language,
769  *     no point in trying again, please go standalone.
770  *  -2 We do not have a network config...
771  */
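
/*
 * In sketch form, the receiver loop (elsewhere in this driver) is expected
 * to act on these codes roughly as follows (illustration, not the actual
 * caller):
 *
 *         h = drbd_connect(mdev);
 *         if (h == 0)
 *                 ... tear down and try again ...
 *         else if (h < 0)
 *                 ... -1: go standalone; -2: no net config ...
 *         else
 *                 ... connected, start receiving packets ...
 */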
772 static int drbd_connect(struct drbd_conf *mdev)
773 {
774         struct socket *s, *sock, *msock;
775         int try, h, ok;
776
777         D_ASSERT(!mdev->data.socket);
778
779         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
780                 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
781
782         if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
783                 return -2;
784
785         clear_bit(DISCARD_CONCURRENT, &mdev->flags);
786
787         sock  = NULL;
788         msock = NULL;
789
790         do {
791                 for (try = 0;;) {
792                         /* 3 tries, this should take less than a second! */
793                         s = drbd_try_connect(mdev);
794                         if (s || ++try >= 3)
795                                 break;
796                         /* give the other side time to call bind() & listen() */
797                         __set_current_state(TASK_INTERRUPTIBLE);
798                         schedule_timeout(HZ / 10);
799                 }
800
801                 if (s) {
802                         if (!sock) {
803                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
804                                 sock = s;
805                                 s = NULL;
806                         } else if (!msock) {
807                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
808                                 msock = s;
809                                 s = NULL;
810                         } else {
811                                 dev_err(DEV, "Logic error in drbd_connect()\n");
812                                 goto out_release_sockets;
813                         }
814                 }
815
816                 if (sock && msock) {
817                         __set_current_state(TASK_INTERRUPTIBLE);
818                         schedule_timeout(HZ / 10);
819                         ok = drbd_socket_okay(mdev, &sock);
820                         ok = drbd_socket_okay(mdev, &msock) && ok;
821                         if (ok)
822                                 break;
823                 }
824
825 retry:
826                 s = drbd_wait_for_connect(mdev);
827                 if (s) {
828                         try = drbd_recv_fp(mdev, s);
829                         drbd_socket_okay(mdev, &sock);
830                         drbd_socket_okay(mdev, &msock);
831                         switch (try) {
832                         case P_HAND_SHAKE_S:
833                                 if (sock) {
834                                         dev_warn(DEV, "initial packet S crossed\n");
835                                         sock_release(sock);
836                                 }
837                                 sock = s;
838                                 break;
839                         case P_HAND_SHAKE_M:
840                                 if (msock) {
841                                         dev_warn(DEV, "initial packet M crossed\n");
842                                         sock_release(msock);
843                                 }
844                                 msock = s;
845                                 set_bit(DISCARD_CONCURRENT, &mdev->flags);
846                                 break;
847                         default:
848                                 dev_warn(DEV, "Error receiving initial packet\n");
849                                 sock_release(s);
850                                 if (random32() & 1)
851                                         goto retry;
852                         }
853                 }
854
855                 if (mdev->state.conn <= C_DISCONNECTING)
856                         goto out_release_sockets;
857                 if (signal_pending(current)) {
858                         flush_signals(current);
859                         smp_rmb();
860                         if (get_t_state(&mdev->receiver) == Exiting)
861                                 goto out_release_sockets;
862                 }
863
864                 if (sock && msock) {
865                         ok = drbd_socket_okay(mdev, &sock);
866                         ok = drbd_socket_okay(mdev, &msock) && ok;
867                         if (ok)
868                                 break;
869                 }
870         } while (1);
871
872         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
873         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
874
875         sock->sk->sk_allocation = GFP_NOIO;
876         msock->sk->sk_allocation = GFP_NOIO;
877
878         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
879         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
880
881         /* NOT YET ...
882          * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
883          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
884          * first set it to the P_HAND_SHAKE timeout,
885          * which we set to 4x the configured ping_timeout. */
886         sock->sk->sk_sndtimeo =
887         sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
888
889         msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
890         msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
891
892         /* we don't want delays.
893          * we use TCP_CORK where appropriate, though */
894         drbd_tcp_nodelay(sock);
895         drbd_tcp_nodelay(msock);
896
897         mdev->data.socket = sock;
898         mdev->meta.socket = msock;
899         mdev->last_received = jiffies;
900
901         D_ASSERT(mdev->asender.task == NULL);
902
903         h = drbd_do_handshake(mdev);
904         if (h <= 0)
905                 return h;
906
907         if (mdev->cram_hmac_tfm) {
908                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
909                 switch (drbd_do_auth(mdev)) {
910                 case -1:
911                         dev_err(DEV, "Authentication of peer failed\n");
912                         return -1;
913                 case 0:
914                         dev_err(DEV, "Authentication of peer failed, trying again.\n");
915                         return 0;
916                 }
917         }
918
919         if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
920                 return 0;
921
922         sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
923         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
924
925         atomic_set(&mdev->packet_seq, 0);
926         mdev->peer_seq = 0;
927
928         drbd_thread_start(&mdev->asender);
929
930         if (!drbd_send_protocol(mdev))
931                 return -1;
932         drbd_send_sync_param(mdev, &mdev->sync_conf);
933         drbd_send_sizes(mdev, 0, 0);
934         drbd_send_uuids(mdev);
935         drbd_send_state(mdev);
936         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
937         clear_bit(RESIZE_PENDING, &mdev->flags);
938
939         return 1;
940
941 out_release_sockets:
942         if (sock)
943                 sock_release(sock);
944         if (msock)
945                 sock_release(msock);
946         return -1;
947 }
948
949 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
950 {
951         int r;
952
953         r = drbd_recv(mdev, h, sizeof(*h));
954
955         if (unlikely(r != sizeof(*h))) {
956                 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
957                 return FALSE;
958         }
959         h->command = be16_to_cpu(h->command);
960         h->length  = be16_to_cpu(h->length);
961         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
962                 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
963                     (long)be32_to_cpu(h->magic),
964                     h->command, h->length);
965                 return FALSE;
966         }
967         mdev->last_received = jiffies;
968
969         return TRUE;
970 }
971
972 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
973 {
974         int rv;
975
976         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
977                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
978                                         NULL, BLKDEV_IFL_WAIT);
979                 if (rv) {
980                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
981                         /* would rather check on EOPNOTSUPP, but that is not reliable.
982                          * don't try again for ANY return value != 0
983                          * if (rv == -EOPNOTSUPP) */
984                         drbd_bump_write_ordering(mdev, WO_drain_io);
985                 }
986                 put_ldev(mdev);
987         }
988
989         return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
990 }
991
992 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
993 {
994         struct flush_work *fw = (struct flush_work *)w;
995         struct drbd_epoch *epoch = fw->epoch;
996
997         kfree(w);
998
999         if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
1000                 drbd_flush_after_epoch(mdev, epoch);
1001
1002         drbd_may_finish_epoch(mdev, epoch, EV_PUT |
1003                               (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
1004
1005         return 1;
1006 }
1007
1008 /**
1009  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1010  * @mdev:       DRBD device.
1011  * @epoch:      Epoch object.
1012  * @ev:         Epoch event.
1013  */
1014 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1015                                                struct drbd_epoch *epoch,
1016                                                enum epoch_event ev)
1017 {
1018         int finish, epoch_size;
1019         struct drbd_epoch *next_epoch;
1020         int schedule_flush = 0;
1021         enum finish_epoch rv = FE_STILL_LIVE;
1022
1023         spin_lock(&mdev->epoch_lock);
1024         do {
1025                 next_epoch = NULL;
1026                 finish = 0;
1027
1028                 epoch_size = atomic_read(&epoch->epoch_size);
1029
1030                 switch (ev & ~EV_CLEANUP) {
1031                 case EV_PUT:
1032                         atomic_dec(&epoch->active);
1033                         break;
1034                 case EV_GOT_BARRIER_NR:
1035                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1036
1037                         /* Special case: If we just switched from WO_bio_barrier to
1038                            WO_bdev_flush we should not finish the current epoch */
1039                         if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1040                             mdev->write_ordering != WO_bio_barrier &&
1041                             epoch == mdev->current_epoch)
1042                                 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1043                         break;
1044                 case EV_BARRIER_DONE:
1045                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1046                         break;
1047                 case EV_BECAME_LAST:
1048                         /* nothing to do*/
1049                         break;
1050                 }
1051
1052                 if (epoch_size != 0 &&
1053                     atomic_read(&epoch->active) == 0 &&
1054                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1055                     epoch->list.prev == &mdev->current_epoch->list &&
1056                     !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1057                         /* Nearly all conditions are met to finish that epoch... */
1058                         if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1059                             mdev->write_ordering == WO_none ||
1060                             (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1061                             ev & EV_CLEANUP) {
1062                                 finish = 1;
1063                                 set_bit(DE_IS_FINISHING, &epoch->flags);
1064                         } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1065                                  mdev->write_ordering == WO_bio_barrier) {
1066                                 atomic_inc(&epoch->active);
1067                                 schedule_flush = 1;
1068                         }
1069                 }
1070                 if (finish) {
1071                         if (!(ev & EV_CLEANUP)) {
1072                                 spin_unlock(&mdev->epoch_lock);
1073                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1074                                 spin_lock(&mdev->epoch_lock);
1075                         }
1076                         dec_unacked(mdev);
1077
1078                         if (mdev->current_epoch != epoch) {
1079                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1080                                 list_del(&epoch->list);
1081                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1082                                 mdev->epochs--;
1083                                 kfree(epoch);
1084
1085                                 if (rv == FE_STILL_LIVE)
1086                                         rv = FE_DESTROYED;
1087                         } else {
1088                                 epoch->flags = 0;
1089                                 atomic_set(&epoch->epoch_size, 0);
1090                                 /* atomic_set(&epoch->active, 0); is already zero */
1091                                 if (rv == FE_STILL_LIVE)
1092                                         rv = FE_RECYCLED;
1093                         }
1094                 }
1095
1096                 if (!next_epoch)
1097                         break;
1098
1099                 epoch = next_epoch;
1100         } while (1);
1101
1102         spin_unlock(&mdev->epoch_lock);
1103
1104         if (schedule_flush) {
1105                 struct flush_work *fw;
1106                 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1107                 if (fw) {
1108                         fw->w.cb = w_flush;
1109                         fw->epoch = epoch;
1110                         drbd_queue_work(&mdev->data.work, &fw->w);
1111                 } else {
1112                         dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1113                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1114                         /* That is not a recursion, only one level */
1115                         drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1116                         drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1117                 }
1118         }
1119
1120         return rv;
1121 }
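
/*
 * Call-site summary, for orientation: receive_Barrier() feeds
 * EV_GOT_BARRIER_NR, drbd_flush_after_epoch() feeds EV_BARRIER_DONE, and
 * completion paths drop their reference the same way w_flush() above does:
 *
 *         drbd_may_finish_epoch(mdev, epoch, EV_PUT |
 *                       (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
 */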
1122
1123 /**
1124  * drbd_bump_write_ordering() - Fall back to another write ordering method
1125  * @mdev:       DRBD device.
1126  * @wo:         Write ordering method to try.
1127  */
1128 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1129 {
1130         enum write_ordering_e pwo;
1131         static char *write_ordering_str[] = {
1132                 [WO_none] = "none",
1133                 [WO_drain_io] = "drain",
1134                 [WO_bdev_flush] = "flush",
1135                 [WO_bio_barrier] = "barrier",
1136         };
1137
1138         pwo = mdev->write_ordering;
1139         wo = min(pwo, wo);
1140         if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1141                 wo = WO_bdev_flush;
1142         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1143                 wo = WO_drain_io;
1144         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1145                 wo = WO_none;
1146         mdev->write_ordering = wo;
1147         if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1148                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1149 }
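
/*
 * The effective fallback chain is barrier -> flush -> drain -> none, gated
 * by the ldev's no_disk_* settings.  drbd_flush_after_epoch() above falls
 * back exactly this way when a flush fails:
 *
 *         drbd_bump_write_ordering(mdev, WO_drain_io);
 */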
1150
1151 /**
1152  * drbd_submit_ee() - Submit an epoch entry's page chain as one or more bios
1153  * @mdev:       DRBD device.
1154  * @e:          epoch entry
1155  * @rw:         flag field, see bio->bi_rw
1156  */
1157 /* TODO allocate from our own bio_set. */
1158 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1159                 const unsigned rw, const int fault_type)
1160 {
1161         struct bio *bios = NULL;
1162         struct bio *bio;
1163         struct page *page = e->pages;
1164         sector_t sector = e->sector;
1165         unsigned ds = e->size;
1166         unsigned n_bios = 0;
1167         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1168
1169         /* In most cases, we will only need one bio.  But in case the lower
1170          * level restrictions happen to be different at this offset on this
1171          * side than those of the sending peer, we may need to submit the
1172          * request in more than one bio. */
1173 next_bio:
1174         bio = bio_alloc(GFP_NOIO, nr_pages);
1175         if (!bio) {
1176                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1177                 goto fail;
1178         }
1179         /* > e->sector, unless this is the first bio */
1180         bio->bi_sector = sector;
1181         bio->bi_bdev = mdev->ldev->backing_bdev;
1182         /* we special case some flags in the multi-bio case, see below
1183          * (REQ_UNPLUG, REQ_HARDBARRIER) */
1184         bio->bi_rw = rw;
1185         bio->bi_private = e;
1186         bio->bi_end_io = drbd_endio_sec;
1187
1188         bio->bi_next = bios;
1189         bios = bio;
1190         ++n_bios;
1191
1192         page_chain_for_each(page) {
1193                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1194                 if (!bio_add_page(bio, page, len, 0)) {
1195                         /* a single page must always be possible! */
1196                         BUG_ON(bio->bi_vcnt == 0);
1197                         goto next_bio;
1198                 }
1199                 ds -= len;
1200                 sector += len >> 9;
1201                 --nr_pages;
1202         }
1203         D_ASSERT(page == NULL);
1204         D_ASSERT(ds == 0);
1205
1206         atomic_set(&e->pending_bios, n_bios);
1207         do {
1208                 bio = bios;
1209                 bios = bios->bi_next;
1210                 bio->bi_next = NULL;
1211
1212                 /* strip off REQ_UNPLUG unless it is the last bio */
1213                 if (bios)
1214                         bio->bi_rw &= ~REQ_UNPLUG;
1215
1216                 drbd_generic_make_request(mdev, fault_type, bio);
1217
1218                 /* strip off REQ_HARDBARRIER,
1219                  * unless it is the first or last bio */
1220                 if (bios && bios->bi_next)
1221                         bios->bi_rw &= ~REQ_HARDBARRIER;
1222         } while (bios);
1223         maybe_kick_lo(mdev);
1224         return 0;
1225
1226 fail:
1227         while (bios) {
1228                 bio = bios;
1229                 bios = bios->bi_next;
1230                 bio_put(bio);
1231         }
1232         return -ENOMEM;
1233 }
1234
1235 /**
1236  * w_e_reissue() - Worker callback; Resubmit a bio, without REQ_HARDBARRIER set
1237  * @mdev:       DRBD device.
1238  * @w:          work object.
1239  * @cancel:     The connection will be closed anyways (unused in this callback)
1240  */
1241 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1242 {
1243         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1244         /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1245            (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1246            so that we can finish that epoch in drbd_may_finish_epoch().
1247            That is necessary if we already have a long chain of Epochs, before
1248            we realize that REQ_HARDBARRIER is actually not supported */
1249
1250         /* As long as the -ENOTSUPP on the barrier is reported immediately
1251            that will never trigger. If it is reported late, we will just
1252            print that warning and continue correctly for all future requests
1253            with WO_bdev_flush */
1254         if (previous_epoch(mdev, e->epoch))
1255                 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1256
1257         /* we still have a local reference,
1258          * get_ldev was done in receive_Data. */
1259
1260         e->w.cb = e_end_block;
1261         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1262                 /* drbd_submit_ee fails for one reason only:
1263                  * it was not able to allocate sufficient bios.
1264                  * requeue, try again later. */
1265                 e->w.cb = w_e_reissue;
1266                 drbd_queue_work(&mdev->data.work, &e->w);
1267         }
1268         return 1;
1269 }
1270
1271 static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1272 {
1273         int rv, issue_flush;
1274         struct p_barrier *p = (struct p_barrier *)h;
1275         struct drbd_epoch *epoch;
1276
1277         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1278
1279         rv = drbd_recv(mdev, h->payload, h->length);
1280         ERR_IF(rv != h->length) return FALSE;
1281
1282         inc_unacked(mdev);
1283
1284         if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1285                 drbd_kick_lo(mdev);
1286
1287         mdev->current_epoch->barrier_nr = p->barrier;
1288         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1289
1290         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1291          * the activity log, which means it would not be resynced in case the
1292          * R_PRIMARY crashes now.
1293          * Therefore we must send the barrier_ack after the barrier request was
1294          * completed. */
1295         switch (mdev->write_ordering) {
1296         case WO_bio_barrier:
1297         case WO_none:
1298                 if (rv == FE_RECYCLED)
1299                         return TRUE;
1300                 break;
1301
1302         case WO_bdev_flush:
1303         case WO_drain_io:
1304                 if (rv == FE_STILL_LIVE) {
1305                         set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1306                         drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1307                         rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1308                 }
1309                 if (rv == FE_RECYCLED)
1310                         return TRUE;
1311
1312                 /* The asender will send all the ACKs and barrier ACKs out, since
1313                    all EEs moved from the active_ee to the done_ee. We need to
1314                    provide a new epoch object for the EEs that come in soon */
1315                 break;
1316         }
1317
1318         /* receiver context, in the writeout path of the other node.
1319          * avoid potential distributed deadlock */
1320         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1321         if (!epoch) {
1322                 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1323                 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1324                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1325                 if (issue_flush) {
1326                         rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1327                         if (rv == FE_RECYCLED)
1328                                 return TRUE;
1329                 }
1330
1331                 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1332
1333                 return TRUE;
1334         }
1335
1336         epoch->flags = 0;
1337         atomic_set(&epoch->epoch_size, 0);
1338         atomic_set(&epoch->active, 0);
1339
1340         spin_lock(&mdev->epoch_lock);
1341         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1342                 list_add(&epoch->list, &mdev->current_epoch->list);
1343                 mdev->current_epoch = epoch;
1344                 mdev->epochs++;
1345         } else {
1346                 /* The current_epoch got recycled while we allocated this one... */
1347                 kfree(epoch);
1348         }
1349         spin_unlock(&mdev->epoch_lock);
1350
1351         return TRUE;
1352 }
1353
1354 /* used from receive_RSDataReply (recv_resync_read)
1355  * and from receive_Data */
1356 static struct drbd_epoch_entry *
1357 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1358 {
1359         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1360         struct drbd_epoch_entry *e;
1361         struct page *page;
1362         int dgs, ds, rr;
1363         void *dig_in = mdev->int_dig_in;
1364         void *dig_vv = mdev->int_dig_vv;
1365         unsigned long *data;
1366
1367         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1368                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1369
1370         if (dgs) {
1371                 rr = drbd_recv(mdev, dig_in, dgs);
1372                 if (rr != dgs) {
1373                         dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1374                              rr, dgs);
1375                         return NULL;
1376                 }
1377         }
1378
1379         data_size -= dgs;
1380
1381         ERR_IF(data_size &  0x1ff) return NULL;
1382         ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1383
1384         /* even though we trust our peer,
1385          * we sometimes have to double check. */
1386         if (sector + (data_size>>9) > capacity) {
1387                 dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1388                         (unsigned long long)capacity,
1389                         (unsigned long long)sector, data_size);
1390                 return NULL;
1391         }
1392
1393         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1394          * "criss-cross" setup, that might cause write-out on some other DRBD,
1395          * which in turn might block on the other node at this very place.  */
1396         e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1397         if (!e)
1398                 return NULL;
1399
1400         ds = data_size;
1401         page = e->pages;
1402         page_chain_for_each(page) {
1403                 unsigned len = min_t(int, ds, PAGE_SIZE);
1404                 data = kmap(page);
1405                 rr = drbd_recv(mdev, data, len);
1406                 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1407                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1408                         data[0] = data[0] ^ (unsigned long)-1;
1409                 }
1410                 kunmap(page);
1411                 if (rr != len) {
1412                         drbd_free_ee(mdev, e);
1413                         dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1414                              rr, len);
1415                         return NULL;
1416                 }
1417                 ds -= rr;
1418         }
1419
1420         if (dgs) {
1421                 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1422                 if (memcmp(dig_in, dig_vv, dgs)) {
1423                         dev_err(DEV, "Digest integrity check FAILED.\n");
1424                         drbd_bcast_ee(mdev, "digest failed",
1425                                         dgs, dig_in, dig_vv, e);
1426                         drbd_free_ee(mdev, e);
1427                         return NULL;
1428                 }
1429         }
1430         mdev->recv_cnt += data_size>>9;
1431         return e;
1432 }
1433
1434 /* drbd_drain_block() just takes a data block
1435  * out of the socket input buffer, and discards it.
1436  */
1437 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1438 {
1439         struct page *page;
1440         int rr, rv = 1;
1441         void *data;
1442
1443         if (!data_size)
1444                 return TRUE;
1445
1446         page = drbd_pp_alloc(mdev, 1, 1);
1447
1448         data = kmap(page);
1449         while (data_size) {
1450                 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1451                 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1452                         rv = 0;
1453                         dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1454                              rr, min_t(int, data_size, PAGE_SIZE));
1455                         break;
1456                 }
1457                 data_size -= rr;
1458         }
1459         kunmap(page);
1460         drbd_pp_free(mdev, page);
1461         return rv;
1462 }
1463
1464 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1465                            sector_t sector, int data_size)
1466 {
1467         struct bio_vec *bvec;
1468         struct bio *bio;
1469         int dgs, rr, i, expect;
1470         void *dig_in = mdev->int_dig_in;
1471         void *dig_vv = mdev->int_dig_vv;
1472
1473         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1474                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1475
1476         if (dgs) {
1477                 rr = drbd_recv(mdev, dig_in, dgs);
1478                 if (rr != dgs) {
1479                         dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1480                              rr, dgs);
1481                         return 0;
1482                 }
1483         }
1484
1485         data_size -= dgs;
1486
1487         /* optimistically update recv_cnt.  if receiving fails below,
1488          * we disconnect anyways, and counters will be reset. */
1489         mdev->recv_cnt += data_size>>9;
1490
1491         bio = req->master_bio;
1492         D_ASSERT(sector == bio->bi_sector);
1493
1494         bio_for_each_segment(bvec, bio, i) {
1495                 expect = min_t(int, data_size, bvec->bv_len);
1496                 rr = drbd_recv(mdev,
1497                              kmap(bvec->bv_page)+bvec->bv_offset,
1498                              expect);
1499                 kunmap(bvec->bv_page);
1500                 if (rr != expect) {
1501                         dev_warn(DEV, "short read receiving data reply: "
1502                              "read %d expected %d\n",
1503                              rr, expect);
1504                         return 0;
1505                 }
1506                 data_size -= rr;
1507         }
1508
1509         if (dgs) {
1510                 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1511                 if (memcmp(dig_in, dig_vv, dgs)) {
1512                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1513                         return 0;
1514                 }
1515         }
1516
1517         D_ASSERT(data_size == 0);
1518         return 1;
1519 }
1520
1521 /* e_end_resync_block() is called via
1522  * drbd_process_done_ee() by asender only */
1523 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1524 {
1525         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1526         sector_t sector = e->sector;
1527         int ok;
1528
1529         D_ASSERT(hlist_unhashed(&e->colision));
1530
1531         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1532                 drbd_set_in_sync(mdev, sector, e->size);
1533                 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1534         } else {
1535                 /* Record failure to sync */
1536                 drbd_rs_failed_io(mdev, sector, e->size);
1537
1538                 ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1539         }
1540         dec_unacked(mdev);
1541
1542         return ok;
1543 }
1544
1545 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1546 {
1547         struct drbd_epoch_entry *e;
1548
1549         e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1550         if (!e)
1551                 goto fail;
1552
1553         dec_rs_pending(mdev);
1554
1555         inc_unacked(mdev);
1556         /* corresponding dec_unacked() in e_end_resync_block()
1557          * respective _drbd_clear_done_ee */
1558
1559         e->w.cb = e_end_resync_block;
1560
1561         spin_lock_irq(&mdev->req_lock);
1562         list_add(&e->w.list, &mdev->sync_ee);
1563         spin_unlock_irq(&mdev->req_lock);
1564
1565         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1566                 return TRUE;
1567
1568         drbd_free_ee(mdev, e);
1569 fail:
1570         put_ldev(mdev);
1571         return FALSE;
1572 }
1573
1574 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1575 {
1576         struct drbd_request *req;
1577         sector_t sector;
1578         unsigned int header_size, data_size;
1579         int ok;
1580         struct p_data *p = (struct p_data *)h;
1581
1582         header_size = sizeof(*p) - sizeof(*h);
1583         data_size   = h->length  - header_size;
1584
1585         ERR_IF(data_size == 0) return FALSE;
1586
1587         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1588                 return FALSE;
1589
1590         sector = be64_to_cpu(p->sector);
1591
1592         spin_lock_irq(&mdev->req_lock);
1593         req = _ar_id_to_req(mdev, p->block_id, sector);
1594         spin_unlock_irq(&mdev->req_lock);
1595         if (unlikely(!req)) {
1596                 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1597                 return FALSE;
1598         }
1599
1600         /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1601          * special casing it there for the various failure cases.
1602          * still no race with drbd_fail_pending_reads */
1603         ok = recv_dless_read(mdev, req, sector, data_size);
1604
1605         if (ok)
1606                 req_mod(req, data_received);
1607         /* else: nothing. handled from drbd_disconnect...
1608          * I don't think we may complete this just yet
1609          * in case we are "on-disconnect: freeze" */
1610
1611         return ok;
1612 }
1613
1614 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1615 {
1616         sector_t sector;
1617         unsigned int header_size, data_size;
1618         int ok;
1619         struct p_data *p = (struct p_data *)h;
1620
1621         header_size = sizeof(*p) - sizeof(*h);
1622         data_size   = h->length  - header_size;
1623
1624         ERR_IF(data_size == 0) return FALSE;
1625
1626         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1627                 return FALSE;
1628
1629         sector = be64_to_cpu(p->sector);
1630         D_ASSERT(p->block_id == ID_SYNCER);
1631
1632         if (get_ldev(mdev)) {
1633                 /* data is submitted to disk within recv_resync_read.
1634                  * corresponding put_ldev done below on error,
1635                  * or in drbd_endio_write_sec. */
1636                 ok = recv_resync_read(mdev, sector, data_size);
1637         } else {
1638                 if (__ratelimit(&drbd_ratelimit_state))
1639                         dev_err(DEV, "Can not write resync data to local disk.\n");
1640
1641                 ok = drbd_drain_block(mdev, data_size);
1642
1643                 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1644         }
1645
1646         return ok;
1647 }
1648
1649 /* e_end_block() is called via drbd_process_done_ee().
1650  * this means this function only runs in the asender thread
1651  */
1652 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1653 {
1654         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1655         sector_t sector = e->sector;
1656         struct drbd_epoch *epoch;
1657         int ok = 1, pcmd;
1658
1659         if (e->flags & EE_IS_BARRIER) {
1660                 epoch = previous_epoch(mdev, e->epoch);
1661                 if (epoch)
1662                         drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1663         }
1664
1665         if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1666                 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1667                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1668                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1669                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1670                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1671                         ok &= drbd_send_ack(mdev, pcmd, e);
1672                         if (pcmd == P_RS_WRITE_ACK)
1673                                 drbd_set_in_sync(mdev, sector, e->size);
1674                 } else {
1675                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1676                         /* we expect it to be marked out of sync anyways...
1677                          * maybe assert this?  */
1678                 }
1679                 dec_unacked(mdev);
1680         }
1681         /* we delete from the conflict detection hash _after_ we sent out the
1682          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1683         if (mdev->net_conf->two_primaries) {
1684                 spin_lock_irq(&mdev->req_lock);
1685                 D_ASSERT(!hlist_unhashed(&e->colision));
1686                 hlist_del_init(&e->colision);
1687                 spin_unlock_irq(&mdev->req_lock);
1688         } else {
1689                 D_ASSERT(hlist_unhashed(&e->colision));
1690         }
1691
1692         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1693
1694         return ok;
1695 }
1696
1697 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1698 {
1699         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1700         int ok = 1;
1701
1702         D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1703         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1704
1705         spin_lock_irq(&mdev->req_lock);
1706         D_ASSERT(!hlist_unhashed(&e->colision));
1707         hlist_del_init(&e->colision);
1708         spin_unlock_irq(&mdev->req_lock);
1709
1710         dec_unacked(mdev);
1711
1712         return ok;
1713 }
1714
1715 /* Called from receive_Data.
1716  * Synchronize packets on sock with packets on msock.
1717  *
1718  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1719  * packet traveling on msock, they are still processed in the order they have
1720  * been sent.
1721  *
1722  * Note: we don't care for Ack packets overtaking P_DATA packets.
1723  *
1724  * In case packet_seq is larger than mdev->peer_seq, there are
1725  * outstanding packets on the msock. We wait for them to arrive.
1726  * In case this is logically the next packet, we update mdev->peer_seq
1727  * ourselves. Correctly handles the 32bit wrap around.
1728  *
1729  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1730  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1731  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1732  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1733  *
1734  * returns 0 if we may process the packet,
1735  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1736 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1737 {
1738         DEFINE_WAIT(wait);
1739         unsigned int p_seq;
1740         long timeout;
1741         int ret = 0;
1742         spin_lock(&mdev->peer_seq_lock);
1743         for (;;) {
1744                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1745                 if (seq_le(packet_seq, mdev->peer_seq+1))
1746                         break;
1747                 if (signal_pending(current)) {
1748                         ret = -ERESTARTSYS;
1749                         break;
1750                 }
1751                 p_seq = mdev->peer_seq;
1752                 spin_unlock(&mdev->peer_seq_lock);
1753                 timeout = schedule_timeout(30*HZ);
1754                 spin_lock(&mdev->peer_seq_lock);
1755                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1756                         ret = -ETIMEDOUT;
1757                         dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1758                         break;
1759                 }
1760         }
1761         finish_wait(&mdev->seq_wait, &wait);
1762         if (mdev->peer_seq+1 == packet_seq)
1763                 mdev->peer_seq++;
1764         spin_unlock(&mdev->peer_seq_lock);
1765         return ret;
1766 }
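/*
 * Illustrative sketch (not part of the original DRBD code): a wrap-safe
 * "less or equal" on 32bit sequence numbers, assuming the usual
 * signed-difference idiom.  The real seq_le() used in drbd_wait_peer_seq()
 * above may be defined differently; the name below is made up.
 */
static inline int example_seq_le(u32 a, u32 b)
{
	/* casting the difference to a signed type keeps the comparison
	 * correct across the 32bit wrap around, e.g. 0xfffffffe is
	 * considered "before" 0x00000001 */
	return (s32)(a - b) <= 0;
}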
1767
1768 /* mirrored write */
1769 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1770 {
1771         sector_t sector;
1772         struct drbd_epoch_entry *e;
1773         struct p_data *p = (struct p_data *)h;
1774         int header_size, data_size;
1775         int rw = WRITE;
1776         u32 dp_flags;
1777
1778         header_size = sizeof(*p) - sizeof(*h);
1779         data_size   = h->length  - header_size;
1780
1781         ERR_IF(data_size == 0) return FALSE;
1782
1783         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1784                 return FALSE;
1785
1786         if (!get_ldev(mdev)) {
1787                 if (__ratelimit(&drbd_ratelimit_state))
1788                         dev_err(DEV, "Can not write mirrored data block "
1789                             "to local disk.\n");
1790                 spin_lock(&mdev->peer_seq_lock);
1791                 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1792                         mdev->peer_seq++;
1793                 spin_unlock(&mdev->peer_seq_lock);
1794
1795                 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1796                 atomic_inc(&mdev->current_epoch->epoch_size);
1797                 return drbd_drain_block(mdev, data_size);
1798         }
1799
1800         /* get_ldev(mdev) successful.
1801          * Corresponding put_ldev done either below (on various errors),
1802          * or in drbd_endio_write_sec, if we successfully submit the data at
1803          * the end of this function. */
1804
1805         sector = be64_to_cpu(p->sector);
1806         e = read_in_block(mdev, p->block_id, sector, data_size);
1807         if (!e) {
1808                 put_ldev(mdev);
1809                 return FALSE;
1810         }
1811
1812         e->w.cb = e_end_block;
1813
1814         spin_lock(&mdev->epoch_lock);
1815         e->epoch = mdev->current_epoch;
1816         atomic_inc(&e->epoch->epoch_size);
1817         atomic_inc(&e->epoch->active);
1818
1819         if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1820                 struct drbd_epoch *epoch;
1821                 /* Issue a barrier if we start a new epoch, and the previous epoch
1822                    was not an epoch containing a single request which already was
1823                    a Barrier. */
1824                 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1825                 if (epoch == e->epoch) {
1826                         set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1827                         rw |= REQ_HARDBARRIER;
1828                         e->flags |= EE_IS_BARRIER;
1829                 } else {
1830                         if (atomic_read(&epoch->epoch_size) > 1 ||
1831                             !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1832                                 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1833                                 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1834                                 rw |= REQ_HARDBARRIER;
1835                                 e->flags |= EE_IS_BARRIER;
1836                         }
1837                 }
1838         }
1839         spin_unlock(&mdev->epoch_lock);
1840
1841         dp_flags = be32_to_cpu(p->dp_flags);
1842         if (dp_flags & DP_HARDBARRIER) {
1843                 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1844                 /* rw |= REQ_HARDBARRIER; */
1845         }
1846         if (dp_flags & DP_RW_SYNC)
1847                 rw |= REQ_SYNC | REQ_UNPLUG;
1848         if (dp_flags & DP_MAY_SET_IN_SYNC)
1849                 e->flags |= EE_MAY_SET_IN_SYNC;
1850
1851         /* I'm the receiver, I do hold a net_cnt reference. */
1852         if (!mdev->net_conf->two_primaries) {
1853                 spin_lock_irq(&mdev->req_lock);
1854         } else {
1855                 /* don't get the req_lock yet,
1856                  * we may sleep in drbd_wait_peer_seq */
1857                 const int size = e->size;
1858                 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1859                 DEFINE_WAIT(wait);
1860                 struct drbd_request *i;
1861                 struct hlist_node *n;
1862                 struct hlist_head *slot;
1863                 int first;
1864
1865                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1866                 BUG_ON(mdev->ee_hash == NULL);
1867                 BUG_ON(mdev->tl_hash == NULL);
1868
1869                 /* conflict detection and handling:
1870                  * 1. wait on the sequence number,
1871                  *    in case this data packet overtook ACK packets.
1872                  * 2. check our hash tables for conflicting requests.
1873                  *    we only need to walk the tl_hash, since an ee can not
1874                  *    we only need to walk the tl_hash, since an ee cannot
1875                  *    have a conflict with another ee: on the submitting
1876                  *    and a conflicting req is never sent.
1877                  *
1878                  * Note: for two_primaries, we are protocol C,
1879                  * so there cannot be any request that is DONE
1880                  * but still on the transfer log.
1881                  *
1882                  * unconditionally add to the ee_hash.
1883                  *
1884                  * if no conflicting request is found:
1885                  *    submit.
1886                  *
1887                  * if any conflicting request is found
1888                  * that has not yet been acked,
1889                  * AND I have the "discard concurrent writes" flag:
1890                  *       queue (via done_ee) the P_DISCARD_ACK; OUT.
1891                  *
1892                  * if any conflicting request is found:
1893                  *       block the receiver, waiting on misc_wait
1894                  *       until no more conflicting requests are there,
1895                  *       or we get interrupted (disconnect).
1896                  *
1897                  *       we do not just write after local io completion of those
1898                  *       requests, but only after req is done completely, i.e.
1899                  *       we wait for the P_DISCARD_ACK to arrive!
1900                  *
1901                  *       then proceed normally, i.e. submit.
1902                  */
1903                 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1904                         goto out_interrupted;
1905
1906                 spin_lock_irq(&mdev->req_lock);
1907
1908                 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1909
1910 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1911                 slot = tl_hash_slot(mdev, sector);
1912                 first = 1;
1913                 for (;;) {
1914                         int have_unacked = 0;
1915                         int have_conflict = 0;
1916                         prepare_to_wait(&mdev->misc_wait, &wait,
1917                                 TASK_INTERRUPTIBLE);
1918                         hlist_for_each_entry(i, n, slot, colision) {
1919                                 if (OVERLAPS) {
1920                                         /* only ALERT on first iteration,
1921                                          * we may be woken up early... */
1922                                         if (first)
1923                                                 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1924                                                       " new: %llus +%u; pending: %llus +%u\n",
1925                                                       current->comm, current->pid,
1926                                                       (unsigned long long)sector, size,
1927                                                       (unsigned long long)i->sector, i->size);
1928                                         if (i->rq_state & RQ_NET_PENDING)
1929                                                 ++have_unacked;
1930                                         ++have_conflict;
1931                                 }
1932                         }
1933 #undef OVERLAPS
1934                         if (!have_conflict)
1935                                 break;
1936
1937                         /* Discard Ack only for the _first_ iteration */
1938                         if (first && discard && have_unacked) {
1939                                 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1940                                      (unsigned long long)sector);
1941                                 inc_unacked(mdev);
1942                                 e->w.cb = e_send_discard_ack;
1943                                 list_add_tail(&e->w.list, &mdev->done_ee);
1944
1945                                 spin_unlock_irq(&mdev->req_lock);
1946
1947                                 /* we could probably send that P_DISCARD_ACK ourselves,
1948                                  * but I don't like the receiver using the msock */
1949
1950                                 put_ldev(mdev);
1951                                 wake_asender(mdev);
1952                                 finish_wait(&mdev->misc_wait, &wait);
1953                                 return TRUE;
1954                         }
1955
1956                         if (signal_pending(current)) {
1957                                 hlist_del_init(&e->colision);
1958
1959                                 spin_unlock_irq(&mdev->req_lock);
1960
1961                                 finish_wait(&mdev->misc_wait, &wait);
1962                                 goto out_interrupted;
1963                         }
1964
1965                         spin_unlock_irq(&mdev->req_lock);
1966                         if (first) {
1967                                 first = 0;
1968                                 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1969                                      "sec=%llus\n", (unsigned long long)sector);
1970                         } else if (discard) {
1971                                 /* we had none on the first iteration.
1972                                  * there must be none now. */
1973                                 D_ASSERT(have_unacked == 0);
1974                         }
1975                         schedule();
1976                         spin_lock_irq(&mdev->req_lock);
1977                 }
1978                 finish_wait(&mdev->misc_wait, &wait);
1979         }
1980
1981         list_add(&e->w.list, &mdev->active_ee);
1982         spin_unlock_irq(&mdev->req_lock);
1983
1984         switch (mdev->net_conf->wire_protocol) {
1985         case DRBD_PROT_C:
1986                 inc_unacked(mdev);
1987                 /* corresponding dec_unacked() in e_end_block()
1988                  * respective _drbd_clear_done_ee */
1989                 break;
1990         case DRBD_PROT_B:
1991                 /* I really don't like it that the receiver thread
1992                  * sends on the msock, but anyways */
1993                 drbd_send_ack(mdev, P_RECV_ACK, e);
1994                 break;
1995         case DRBD_PROT_A:
1996                 /* nothing to do */
1997                 break;
1998         }
1999
2000         if (mdev->state.pdsk == D_DISKLESS) {
2001                 /* In case we have the only disk of the cluster, mark the
2001                  * block out of sync and cover it by the activity log. */
2002                 drbd_set_out_of_sync(mdev, e->sector, e->size);
2003                 e->flags |= EE_CALL_AL_COMPLETE_IO;
2004                 drbd_al_begin_io(mdev, e->sector);
2005         }
2006
2007         if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
2008                 return TRUE;
2009
2010 out_interrupted:
2011         /* yes, the epoch_size now is imbalanced.
2012          * but we drop the connection anyways, so we don't have a chance to
2013          * receive a barrier... atomic_inc(&mdev->epoch_size); */
2014         put_ldev(mdev);
2015         drbd_free_ee(mdev, e);
2016         return FALSE;
2017 }
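/*
 * Illustrative sketch (not part of the original DRBD code): the kind of
 * interval test the OVERLAPS macro in receive_Data() relies on, assuming
 * sector-based start offsets and byte-based sizes as used above
 * (size >> 9 converts bytes to 512-byte sectors).  The helper name is
 * made up; the real overlaps() helper lives elsewhere in drbd.
 */
static inline int example_overlaps(sector_t s1, unsigned int size1,
				   sector_t s2, unsigned int size2)
{
	/* two write requests conflict iff their sector ranges intersect */
	return s1 + (size1 >> 9) > s2 && s2 + (size2 >> 9) > s1;
}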
2018
2019 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2020 {
2021         sector_t sector;
2022         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2023         struct drbd_epoch_entry *e;
2024         struct digest_info *di = NULL;
2025         int size, digest_size;
2026         unsigned int fault_type;
2027         struct p_block_req *p =
2028                 (struct p_block_req *)h;
2029         const int brps = sizeof(*p)-sizeof(*h);
2030
2031         if (drbd_recv(mdev, h->payload, brps) != brps)
2032                 return FALSE;
2033
2034         sector = be64_to_cpu(p->sector);
2035         size   = be32_to_cpu(p->blksize);
2036
2037         if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2038                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2039                                 (unsigned long long)sector, size);
2040                 return FALSE;
2041         }
2042         if (sector + (size>>9) > capacity) {
2043                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2044                                 (unsigned long long)sector, size);
2045                 return FALSE;
2046         }
2047
2048         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2049                 if (__ratelimit(&drbd_ratelimit_state))
2050                         dev_err(DEV, "Can not satisfy peer's read request, "
2051                             "no local data.\n");
2052                 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
2053                                  P_NEG_RS_DREPLY , p);
2054                 return drbd_drain_block(mdev, h->length - brps);
2055         }
2056
2057         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2058          * "criss-cross" setup, that might cause write-out on some other DRBD,
2059          * which in turn might block on the other node at this very place.  */
2060         e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2061         if (!e) {
2062                 put_ldev(mdev);
2063                 return FALSE;
2064         }
2065
2066         switch (h->command) {
2067         case P_DATA_REQUEST:
2068                 e->w.cb = w_e_end_data_req;
2069                 fault_type = DRBD_FAULT_DT_RD;
2070                 break;
2071         case P_RS_DATA_REQUEST:
2072                 e->w.cb = w_e_end_rsdata_req;
2073                 fault_type = DRBD_FAULT_RS_RD;
2074                 /* Eventually this should become asynchronous. Currently it
2075                  * blocks the whole receiver just to delay the reading of a
2076                  * resync data block.
2077                  * the drbd_work_queue mechanism is made for this...
2078                  */
2079                 if (!drbd_rs_begin_io(mdev, sector)) {
2080                         /* we have been interrupted,
2081                          * probably connection lost! */
2082                         D_ASSERT(signal_pending(current));
2083                         goto out_free_e;
2084                 }
2085                 break;
2086
2087         case P_OV_REPLY:
2088         case P_CSUM_RS_REQUEST:
2089                 fault_type = DRBD_FAULT_RS_RD;
2090                 digest_size = h->length - brps;
2091                 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2092                 if (!di)
2093                         goto out_free_e;
2094
2095                 di->digest_size = digest_size;
2096                 di->digest = (((char *)di)+sizeof(struct digest_info));
2097
2098                 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2099                         goto out_free_e;
2100
2101                 e->block_id = (u64)(unsigned long)di;
2102                 if (h->command == P_CSUM_RS_REQUEST) {
2103                         D_ASSERT(mdev->agreed_pro_version >= 89);
2104                         e->w.cb = w_e_end_csum_rs_req;
2105                 } else if (h->command == P_OV_REPLY) {
2106                         e->w.cb = w_e_end_ov_reply;
2107                         dec_rs_pending(mdev);
2108                         break;
2109                 }
2110
2111                 if (!drbd_rs_begin_io(mdev, sector)) {
2112                         /* we have been interrupted, probably connection lost! */
2113                         D_ASSERT(signal_pending(current));
2114                         goto out_free_e;
2115                 }
2116                 break;
2117
2118         case P_OV_REQUEST:
2119                 if (mdev->state.conn >= C_CONNECTED &&
2120                     mdev->state.conn != C_VERIFY_T)
2121                         dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2122                                 drbd_conn_str(mdev->state.conn));
2123                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2124                     mdev->agreed_pro_version >= 90) {
2125                         mdev->ov_start_sector = sector;
2126                         mdev->ov_position = sector;
2127                         mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2128                         dev_info(DEV, "Online Verify start sector: %llu\n",
2129                                         (unsigned long long)sector);
2130                 }
2131                 e->w.cb = w_e_end_ov_req;
2132                 fault_type = DRBD_FAULT_RS_RD;
2133                 /* Eventually this should become asynchronous. Currently it
2134                  * blocks the whole receiver just to delay the reading of a
2135                  * resync data block.
2136                  * the drbd_work_queue mechanism is made for this...
2137                  */
2138                 if (!drbd_rs_begin_io(mdev, sector)) {
2139                         /* we have been interrupted,
2140                          * probably connection lost! */
2141                         D_ASSERT(signal_pending(current));
2142                         goto out_free_e;
2143                 }
2144                 break;
2145
2146
2147         default:
2148                 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2149                     cmdname(h->command));
2150                 fault_type = DRBD_FAULT_MAX;
2151         }
2152
2153         spin_lock_irq(&mdev->req_lock);
2154         list_add(&e->w.list, &mdev->read_ee);
2155         spin_unlock_irq(&mdev->req_lock);
2156
2157         inc_unacked(mdev);
2158
2159         if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2160                 return TRUE;
2161
2162 out_free_e:
2163         kfree(di);
2164         put_ldev(mdev);
2165         drbd_free_ee(mdev, e);
2166         return FALSE;
2167 }
2168
2169 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2170 {
2171         int self, peer, rv = -100;
2172         unsigned long ch_self, ch_peer;
2173
2174         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2175         peer = mdev->p_uuid[UI_BITMAP] & 1;
2176
2177         ch_peer = mdev->p_uuid[UI_SIZE];
2178         ch_self = mdev->comm_bm_set;
2179
2180         switch (mdev->net_conf->after_sb_0p) {
2181         case ASB_CONSENSUS:
2182         case ASB_DISCARD_SECONDARY:
2183         case ASB_CALL_HELPER:
2184                 dev_err(DEV, "Configuration error.\n");
2185                 break;
2186         case ASB_DISCONNECT:
2187                 break;
2188         case ASB_DISCARD_YOUNGER_PRI:
2189                 if (self == 0 && peer == 1) {
2190                         rv = -1;
2191                         break;
2192                 }
2193                 if (self == 1 && peer == 0) {
2194                         rv =  1;
2195                         break;
2196                 }
2197                 /* Else fall through to one of the other strategies... */
2198         case ASB_DISCARD_OLDER_PRI:
2199                 if (self == 0 && peer == 1) {
2200                         rv = 1;
2201                         break;
2202                 }
2203                 if (self == 1 && peer == 0) {
2204                         rv = -1;
2205                         break;
2206                 }
2207                 /* Else fall through to one of the other strategies... */
2208                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2209                      "Using discard-least-changes instead\n");
2210         case ASB_DISCARD_ZERO_CHG:
2211                 if (ch_peer == 0 && ch_self == 0) {
2212                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2213                                 ? -1 : 1;
2214                         break;
2215                 } else {
2216                         if (ch_peer == 0) { rv =  1; break; }
2217                         if (ch_self == 0) { rv = -1; break; }
2218                 }
2219                 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2220                         break;
2221         case ASB_DISCARD_LEAST_CHG:
2222                 if      (ch_self < ch_peer)
2223                         rv = -1;
2224                 else if (ch_self > ch_peer)
2225                         rv =  1;
2226                 else /* ( ch_self == ch_peer ) */
2227                      /* Well, then use something else. */
2228                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2229                                 ? -1 : 1;
2230                 break;
2231         case ASB_DISCARD_LOCAL:
2232                 rv = -1;
2233                 break;
2234         case ASB_DISCARD_REMOTE:
2235                 rv =  1;
2236         }
2237
2238         return rv;
2239 }
2240
2241 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2242 {
2243         int self, peer, hg, rv = -100;
2244
2245         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2246         peer = mdev->p_uuid[UI_BITMAP] & 1;
2247
2248         switch (mdev->net_conf->after_sb_1p) {
2249         case ASB_DISCARD_YOUNGER_PRI:
2250         case ASB_DISCARD_OLDER_PRI:
2251         case ASB_DISCARD_LEAST_CHG:
2252         case ASB_DISCARD_LOCAL:
2253         case ASB_DISCARD_REMOTE:
2254                 dev_err(DEV, "Configuration error.\n");
2255                 break;
2256         case ASB_DISCONNECT:
2257                 break;
2258         case ASB_CONSENSUS:
2259                 hg = drbd_asb_recover_0p(mdev);
2260                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2261                         rv = hg;
2262                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2263                         rv = hg;
2264                 break;
2265         case ASB_VIOLENTLY:
2266                 rv = drbd_asb_recover_0p(mdev);
2267                 break;
2268         case ASB_DISCARD_SECONDARY:
2269                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2270         case ASB_CALL_HELPER:
2271                 hg = drbd_asb_recover_0p(mdev);
2272                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2273                         self = drbd_set_role(mdev, R_SECONDARY, 0);
2274                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2275                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2276                           * we do not need to wait for the after state change work either. */
2277                         self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2278                         if (self != SS_SUCCESS) {
2279                                 drbd_khelper(mdev, "pri-lost-after-sb");
2280                         } else {
2281                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2282                                 rv = hg;
2283                         }
2284                 } else
2285                         rv = hg;
2286         }
2287
2288         return rv;
2289 }
2290
2291 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2292 {
2293         int self, peer, hg, rv = -100;
2294
2295         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2296         peer = mdev->p_uuid[UI_BITMAP] & 1;
2297
2298         switch (mdev->net_conf->after_sb_2p) {
2299         case ASB_DISCARD_YOUNGER_PRI:
2300         case ASB_DISCARD_OLDER_PRI:
2301         case ASB_DISCARD_LEAST_CHG:
2302         case ASB_DISCARD_LOCAL:
2303         case ASB_DISCARD_REMOTE:
2304         case ASB_CONSENSUS:
2305         case ASB_DISCARD_SECONDARY:
2306                 dev_err(DEV, "Configuration error.\n");
2307                 break;
2308         case ASB_VIOLENTLY:
2309                 rv = drbd_asb_recover_0p(mdev);
2310                 break;
2311         case ASB_DISCONNECT:
2312                 break;
2313         case ASB_CALL_HELPER:
2314                 hg = drbd_asb_recover_0p(mdev);
2315                 if (hg == -1) {
2316                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2317                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2318                           * we do not need to wait for the after state change work either. */
2319                         self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2320                         if (self != SS_SUCCESS) {
2321                                 drbd_khelper(mdev, "pri-lost-after-sb");
2322                         } else {
2323                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2324                                 rv = hg;
2325                         }
2326                 } else
2327                         rv = hg;
2328         }
2329
2330         return rv;
2331 }
2332
2333 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2334                            u64 bits, u64 flags)
2335 {
2336         if (!uuid) {
2337                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2338                 return;
2339         }
2340         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2341              text,
2342              (unsigned long long)uuid[UI_CURRENT],
2343              (unsigned long long)uuid[UI_BITMAP],
2344              (unsigned long long)uuid[UI_HISTORY_START],
2345              (unsigned long long)uuid[UI_HISTORY_END],
2346              (unsigned long long)bits,
2347              (unsigned long long)flags);
2348 }
2349
2350 /*
2351   100   after split brain try auto recover
2352     2   C_SYNC_SOURCE set BitMap
2353     1   C_SYNC_SOURCE use BitMap
2354     0   no Sync
2355    -1   C_SYNC_TARGET use BitMap
2356    -2   C_SYNC_TARGET set BitMap
2357  -100   after split brain, disconnect
2358 -1000   unrelated data
2359  */
2360 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2361 {
2362         u64 self, peer;
2363         int i, j;
2364
2365         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2366         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2367
2368         *rule_nr = 10;
2369         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2370                 return 0;
2371
2372         *rule_nr = 20;
2373         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2374              peer != UUID_JUST_CREATED)
2375                 return -2;
2376
2377         *rule_nr = 30;
2378         if (self != UUID_JUST_CREATED &&
2379             (peer == UUID_JUST_CREATED || peer == (u64)0))
2380                 return 2;
2381
2382         if (self == peer) {
2383                 int rct, dc; /* roles at crash time */
2384
2385                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2386
2387                         if (mdev->agreed_pro_version < 91)
2388                                 return -1001;
2389
2390                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2391                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2392                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2393                                 drbd_uuid_set_bm(mdev, 0UL);
2394
2395                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2396                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2397                                 *rule_nr = 34;
2398                         } else {
2399                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2400                                 *rule_nr = 36;
2401                         }
2402
2403                         return 1;
2404                 }
2405
2406                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2407
2408                         if (mdev->agreed_pro_version < 91)
2409                                 return -1001;
2410
2411                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2412                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2413                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2414
2415                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2416                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2417                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2418
2419                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2420                                 *rule_nr = 35;
2421                         } else {
2422                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2423                                 *rule_nr = 37;
2424                         }
2425
2426                         return -1;
2427                 }
2428
2429                 /* Common power [off|failure] */
2430                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2431                         (mdev->p_uuid[UI_FLAGS] & 2);
2432                 /* lowest bit is set when we were primary,
2433                  * next bit (weight 2) is set when peer was primary */
2434                 *rule_nr = 40;
2435
2436                 switch (rct) {
2437                 case 0: /* !self_pri && !peer_pri */ return 0;
2438                 case 1: /*  self_pri && !peer_pri */ return 1;
2439                 case 2: /* !self_pri &&  peer_pri */ return -1;
2440                 case 3: /*  self_pri &&  peer_pri */
2441                         dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2442                         return dc ? -1 : 1;
2443                 }
2444         }
2445
2446         *rule_nr = 50;
2447         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2448         if (self == peer)
2449                 return -1;
2450
2451         *rule_nr = 51;
2452         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2453         if (self == peer) {
2454                 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2455                 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2456                 if (self == peer) {
2457                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2458                            modifications from its last start of a resync as sync source. */
2459
2460                         if (mdev->agreed_pro_version < 91)
2461                                 return -1001;
2462
2463                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2464                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2465                         return -1;
2466                 }
2467         }
2468
2469         *rule_nr = 60;
2470         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2471         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2472                 peer = mdev->p_uuid[i] & ~((u64)1);
2473                 if (self == peer)
2474                         return -2;
2475         }
2476
2477         *rule_nr = 70;
2478         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2479         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2480         if (self == peer)
2481                 return 1;
2482
2483         *rule_nr = 71;
2484         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2485         if (self == peer) {
2486                 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2487                 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2488                 if (self == peer) {
2489                         /* The last P_SYNC_UUID did not get through. Undo our own UUID
2490                            modifications from the last start of a resync as sync source. */
2491
2492                         if (mdev->agreed_pro_version < 91)
2493                                 return -1001;
2494
2495                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2496                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2497
2498                         dev_info(DEV, "Undid last start of resync:\n");
2499
2500                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2501                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2502
2503                         return 1;
2504                 }
2505         }
2506
2507
2508         *rule_nr = 80;
2509         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2510         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2511                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2512                 if (self == peer)
2513                         return 2;
2514         }
2515
2516         *rule_nr = 90;
2517         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2518         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2519         if (self == peer && self != ((u64)0))
2520                 return 100;
2521
2522         *rule_nr = 100;
2523         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2524                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2525                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2526                         peer = mdev->p_uuid[j] & ~((u64)1);
2527                         if (self == peer)
2528                                 return -100;
2529                 }
2530         }
2531
2532         return -1000;
2533 }
2534
2535 /* drbd_sync_handshake() returns the new conn state on success, or
2536    C_MASK on failure.
2537  */
2538 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2539                                            enum drbd_disk_state peer_disk) __must_hold(local)
2540 {
2541         int hg, rule_nr;
2542         enum drbd_conns rv = C_MASK;
2543         enum drbd_disk_state mydisk;
2544
2545         mydisk = mdev->state.disk;
2546         if (mydisk == D_NEGOTIATING)
2547                 mydisk = mdev->new_state_tmp.disk;
2548
2549         dev_info(DEV, "drbd_sync_handshake:\n");
2550         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2551         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2552                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2553
2554         hg = drbd_uuid_compare(mdev, &rule_nr);
2555
2556         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2557
2558         if (hg == -1000) {
2559                 dev_alert(DEV, "Unrelated data, aborting!\n");
2560                 return C_MASK;
2561         }
2562         if (hg == -1001) {
2563                 dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2564                 return C_MASK;
2565         }
2566
2567         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2568             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2569                 int f = (hg == -100) || abs(hg) == 2;
2570                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2571                 if (f)
2572                         hg = hg*2;
2573                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2574                      hg > 0 ? "source" : "target");
2575         }
2576
2577         if (abs(hg) == 100)
2578                 drbd_khelper(mdev, "initial-split-brain");
2579
2580         if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2581                 int pcount = (mdev->state.role == R_PRIMARY)
2582                            + (peer_role == R_PRIMARY);
2583                 int forced = (hg == -100);
2584
2585                 switch (pcount) {
2586                 case 0:
2587                         hg = drbd_asb_recover_0p(mdev);
2588                         break;
2589                 case 1:
2590                         hg = drbd_asb_recover_1p(mdev);
2591                         break;
2592                 case 2:
2593                         hg = drbd_asb_recover_2p(mdev);
2594                         break;
2595                 }
2596                 if (abs(hg) < 100) {
2597                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2598                              "automatically solved. Sync from %s node\n",
2599                              pcount, (hg < 0) ? "peer" : "this");
2600                         if (forced) {
2601                                 dev_warn(DEV, "Doing a full sync, since"
2602                                      " UUIDs were ambiguous.\n");
2603                                 hg = hg*2;
2604                         }
2605                 }
2606         }
2607
2608         if (hg == -100) {
2609                 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2610                         hg = -1;
2611                 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2612                         hg = 1;
2613
2614                 if (abs(hg) < 100)
2615                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2616                              "Sync from %s node\n",
2617                              (hg < 0) ? "peer" : "this");
2618         }
2619
2620         if (hg == -100) {
2621                 /* FIXME this log message is not correct if we end up here
2622                  * after an attempted attach on a diskless node.
2623                  * We just refuse to attach -- well, we drop the "connection"
2624                  * to that disk, in a way... */
2625                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2626                 drbd_khelper(mdev, "split-brain");
2627                 return C_MASK;
2628         }
2629
2630         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2631                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2632                 return C_MASK;
2633         }
2634
2635         if (hg < 0 && /* by intention we do not use mydisk here. */
2636             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2637                 switch (mdev->net_conf->rr_conflict) {
2638                 case ASB_CALL_HELPER:
2639                         drbd_khelper(mdev, "pri-lost");
2640                         /* fall through */
2641                 case ASB_DISCONNECT:
2642                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2643                         return C_MASK;
2644                 case ASB_VIOLENTLY:
2645                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2646                              " assumption\n");
2647                 }
2648         }
2649
2650         if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2651                 if (hg == 0)
2652                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2653                 else
2654                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2655                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2656                                  abs(hg) >= 2 ? "full" : "bit-map based");
2657                 return C_MASK;
2658         }
2659
2660         if (abs(hg) >= 2) {
2661                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2662                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2663                         return C_MASK;
2664         }
2665
2666         if (hg > 0) { /* become sync source. */
2667                 rv = C_WF_BITMAP_S;
2668         } else if (hg < 0) { /* become sync target */
2669                 rv = C_WF_BITMAP_T;
2670         } else {
2671                 rv = C_CONNECTED;
2672                 if (drbd_bm_total_weight(mdev)) {
2673                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2674                              drbd_bm_total_weight(mdev));
2675                 }
2676         }
2677
2678         return rv;
2679 }
2680
2681 /* returns 1 if invalid */
2682 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2683 {
2684         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2685         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2686             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2687                 return 0;
2688
2689         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2690         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2691             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2692                 return 1;
2693
2694         /* everything else is valid if they are equal on both sides. */
2695         if (peer == self)
2696                 return 0;
2697
2698         /* everything else is invalid. */
2699         return 1;
2700 }
2701
2702 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2703 {
2704         struct p_protocol *p = (struct p_protocol *)h;
2705         int header_size, data_size;
2706         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2707         int p_want_lose, p_two_primaries, cf;
2708         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2709
2710         header_size = sizeof(*p) - sizeof(*h);
2711         data_size   = h->length  - header_size;
2712
2713         if (drbd_recv(mdev, h->payload, header_size) != header_size)
2714                 return FALSE;
2715
2716         p_proto         = be32_to_cpu(p->protocol);
2717         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2718         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2719         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2720         p_two_primaries = be32_to_cpu(p->two_primaries);
2721         cf              = be32_to_cpu(p->conn_flags);
2722         p_want_lose = cf & CF_WANT_LOSE;
2723
2724         clear_bit(CONN_DRY_RUN, &mdev->flags);
2725
2726         if (cf & CF_DRY_RUN)
2727                 set_bit(CONN_DRY_RUN, &mdev->flags);
2728
2729         if (p_proto != mdev->net_conf->wire_protocol) {
2730                 dev_err(DEV, "incompatible communication protocols\n");
2731                 goto disconnect;
2732         }
2733
2734         if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2735                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2736                 goto disconnect;
2737         }
2738
2739         if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2740                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2741                 goto disconnect;
2742         }
2743
2744         if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2745                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2746                 goto disconnect;
2747         }
2748
2749         if (p_want_lose && mdev->net_conf->want_lose) {
2750                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2751                 goto disconnect;
2752         }
2753
2754         if (p_two_primaries != mdev->net_conf->two_primaries) {
2755                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2756                 goto disconnect;
2757         }
2758
2759         if (mdev->agreed_pro_version >= 87) {
2760                 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2761
2762                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2763                         return FALSE;
2764
2765                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2766                 if (strcmp(p_integrity_alg, my_alg)) {
2767                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2768                         goto disconnect;
2769                 }
2770                 dev_info(DEV, "data-integrity-alg: %s\n",
2771                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2772         }
2773
2774         return TRUE;
2775
2776 disconnect:
2777         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2778         return FALSE;
2779 }
2780
2781 /* helper function
2782  * input: alg name, feature name
2783  * return: NULL (alg name was "")
2784  *         ERR_PTR(error) if something goes wrong
2785  *         or the crypto hash ptr, if it worked out ok. */
2786 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2787                 const char *alg, const char *name)
2788 {
2789         struct crypto_hash *tfm;
2790
2791         if (!alg[0])
2792                 return NULL;
2793
2794         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2795         if (IS_ERR(tfm)) {
2796                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2797                         alg, name, PTR_ERR(tfm));
2798                 return tfm;
2799         }
2800         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2801                 crypto_free_hash(tfm);
2802                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2803                 return ERR_PTR(-EINVAL);
2804         }
2805         return tfm;
2806 }
2807
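/* receive_SyncParam() - adopt the peer's resync rate and, with protocol
 * version >= 88, the verify-alg (and from version 89 on the csums-alg).
 * Replacement digest transforms are allocated first and only swapped in
 * under peer_seq_lock, which also guards against drbd_nl_syncer_conf(). */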
2808 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2809 {
2810         int ok = TRUE;
2811         struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2812         unsigned int header_size, data_size, exp_max_sz;
2813         struct crypto_hash *verify_tfm = NULL;
2814         struct crypto_hash *csums_tfm = NULL;
2815         const int apv = mdev->agreed_pro_version;
2816
2817         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2818                     : apv == 88 ? sizeof(struct p_rs_param)
2819                                         + SHARED_SECRET_MAX
2820                     : /* 89 */    sizeof(struct p_rs_param_89);
2821
2822         if (h->length > exp_max_sz) {
2823                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2824                     h->length, exp_max_sz);
2825                 return FALSE;
2826         }
2827
2828         if (apv <= 88) {
2829                 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2830                 data_size   = h->length  - header_size;
2831         } else /* apv >= 89 */ {
2832                 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2833                 data_size   = h->length  - header_size;
2834                 D_ASSERT(data_size == 0);
2835         }
2836
2837         /* initialize verify_alg and csums_alg */
2838         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2839
2840         if (drbd_recv(mdev, h->payload, header_size) != header_size)
2841                 return FALSE;
2842
2843         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2844
2845         if (apv >= 88) {
2846                 if (apv == 88) {
2847                         if (data_size > SHARED_SECRET_MAX) {
2848                                 dev_err(DEV, "verify-alg too long, "
2849                                     "peer wants %u, accepting only %u byte\n",
2850                                                 data_size, SHARED_SECRET_MAX);
2851                                 return FALSE;
2852                         }
2853
2854                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2855                                 return FALSE;
2856
2857                         /* we expect NUL terminated string */
2858                         /* but just in case someone tries to be evil */
2859                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2860                         p->verify_alg[data_size-1] = 0;
2861
2862                 } else /* apv >= 89 */ {
2863                         /* we still expect NUL terminated strings */
2864                         /* but just in case someone tries to be evil */
2865                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2866                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2867                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2868                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2869                 }
2870
2871                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2872                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2873                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2874                                     mdev->sync_conf.verify_alg, p->verify_alg);
2875                                 goto disconnect;
2876                         }
2877                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2878                                         p->verify_alg, "verify-alg");
2879                         if (IS_ERR(verify_tfm)) {
2880                                 verify_tfm = NULL;
2881                                 goto disconnect;
2882                         }
2883                 }
2884
2885                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2886                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2887                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2888                                     mdev->sync_conf.csums_alg, p->csums_alg);
2889                                 goto disconnect;
2890                         }
2891                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2892                                         p->csums_alg, "csums-alg");
2893                         if (IS_ERR(csums_tfm)) {
2894                                 csums_tfm = NULL;
2895                                 goto disconnect;
2896                         }
2897                 }
2898
2899
2900                 spin_lock(&mdev->peer_seq_lock);
2901                 /* lock against drbd_nl_syncer_conf() */
2902                 if (verify_tfm) {
2903                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2904                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2905                         crypto_free_hash(mdev->verify_tfm);
2906                         mdev->verify_tfm = verify_tfm;
2907                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2908                 }
2909                 if (csums_tfm) {
2910                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2911                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2912                         crypto_free_hash(mdev->csums_tfm);
2913                         mdev->csums_tfm = csums_tfm;
2914                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2915                 }
2916                 spin_unlock(&mdev->peer_seq_lock);
2917         }
2918
2919         return ok;
2920 disconnect:
2921         /* just for completeness: actually not needed,
2922          * as this is not reached if csums_tfm was ok. */
2923         crypto_free_hash(csums_tfm);
2924         /* but free the verify_tfm again, if csums_tfm did not work out */
2925         crypto_free_hash(verify_tfm);
2926         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2927         return FALSE;
2928 }
2929
2930 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2931 {
2932         /* sorry, we currently have no working implementation
2933          * of distributed TCQ */
2934 }
2935
2936 /* warn if the arguments differ by more than 12.5% */
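/* d > (a>>3) means d > a/8, i.e. more than 12.5% of a; e.g. a = 1000 and
 * b = 800 sectors give d = 200 > 125, so we warn. */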
2937 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2938         const char *s, sector_t a, sector_t b)
2939 {
2940         sector_t d;
2941         if (a == 0 || b == 0)
2942                 return;
2943         d = (a > b) ? (a - b) : (b - a);
2944         if (d > (a>>3) || d > (b>>3))
2945                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2946                      (unsigned long long)a, (unsigned long long)b);
2947 }
2948
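/* receive_sizes() - handle the peer's P_SIZES packet: remember the peer's
 * disk size, possibly adopt its requested u_size, refuse to shrink a
 * device that still holds usable data while we are connecting, and, once
 * past C_WF_REPORT_PARAMS, re-announce our own size if the capacities
 * disagree.  A grown device may trigger a resync of the new area. */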
2949 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2950 {
2951         struct p_sizes *p = (struct p_sizes *)h;
2952         enum determine_dev_size dd = unchanged;
2953         unsigned int max_seg_s;
2954         sector_t p_size, p_usize, my_usize;
2955         int ldsc = 0; /* local disk size changed */
2956         enum dds_flags ddsf;
2957
2958         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2959         if (drbd_recv(mdev, h->payload, h->length) != h->length)
2960                 return FALSE;
2961
2962         p_size = be64_to_cpu(p->d_size);
2963         p_usize = be64_to_cpu(p->u_size);
2964
2965         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2966                 dev_err(DEV, "some backing storage is needed\n");
2967                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2968                 return FALSE;
2969         }
2970
2971         /* just store the peer's disk size for now.
2972          * we still need to figure out whether we accept that. */
2973         mdev->p_size = p_size;
2974
2975 #define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min(l, r)))
2976         if (get_ldev(mdev)) {
2977                 warn_if_differ_considerably(mdev, "lower level device sizes",
2978                            p_size, drbd_get_max_capacity(mdev->ldev));
2979                 warn_if_differ_considerably(mdev, "user requested size",
2980                                             p_usize, mdev->ldev->dc.disk_size);
2981
2982                 /* if this is the first connect, or an otherwise expected
2983                  * param exchange, choose the minimum */
2984                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2985                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2986                                              p_usize);
2987
2988                 my_usize = mdev->ldev->dc.disk_size;
2989
2990                 if (mdev->ldev->dc.disk_size != p_usize) {
2991                         mdev->ldev->dc.disk_size = p_usize;
2992                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2993                              (unsigned long)mdev->ldev->dc.disk_size);
2994                 }
2995
2996                 /* Never shrink a device with usable data during connect.
2997                    But allow online shrinking if we are connected. */
2998                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2999                    drbd_get_capacity(mdev->this_bdev) &&
3000                    mdev->state.disk >= D_OUTDATED &&
3001                    mdev->state.conn < C_CONNECTED) {
3002                         dev_err(DEV, "The peer's disk size is too small!\n");
3003                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3004                         mdev->ldev->dc.disk_size = my_usize;
3005                         put_ldev(mdev);
3006                         return FALSE;
3007                 }
3008                 put_ldev(mdev);
3009         }
3010 #undef min_not_zero
3011
3012         ddsf = be16_to_cpu(p->dds_flags);
3013         if (get_ldev(mdev)) {
3014                 dd = drbd_determin_dev_size(mdev, ddsf);
3015                 put_ldev(mdev);
3016                 if (dd == dev_size_error)
3017                         return FALSE;
3018                 drbd_md_sync(mdev);
3019         } else {
3020                 /* I am diskless, need to accept the peer's size. */
3021                 drbd_set_my_capacity(mdev, p_size);
3022         }
3023
3024         if (get_ldev(mdev)) {
3025                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3026                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3027                         ldsc = 1;
3028                 }
3029
3030                 if (mdev->agreed_pro_version < 94)
3031                         max_seg_s = be32_to_cpu(p->max_segment_size);
3032                 else /* drbd 8.3.8 onwards */
3033                         max_seg_s = DRBD_MAX_SEGMENT_SIZE;
3034
3035                 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3036                         drbd_setup_queue_param(mdev, max_seg_s);
3037
3038                 drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
3039                 put_ldev(mdev);
3040         }
3041
3042         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3043                 if (be64_to_cpu(p->c_size) !=
3044                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3045                         /* we have different sizes, probably peer
3046                          * needs to know my new size... */
3047                         drbd_send_sizes(mdev, 0, ddsf);
3048                 }
3049                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3050                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3051                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3052                             mdev->state.disk >= D_INCONSISTENT) {
3053                                 if (ddsf & DDSF_NO_RESYNC)
3054                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3055                                 else
3056                                         resync_after_online_grow(mdev);
3057                         } else
3058                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3059                 }
3060         }
3061
3062         return TRUE;
3063 }
3064
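/* receive_uuids() - store the peer's UUID set in mdev->p_uuid.  A primary
 * without usable local data refuses peers whose current UUID differs from
 * our exposed data UUID; a just-created device pair (protocol >= 90,
 * peer UI_FLAGS bit 3 set) skips the initial sync here instead. */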
3065 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
3066 {
3067         struct p_uuids *p = (struct p_uuids *)h;
3068         u64 *p_uuid;
3069         int i;
3070
3071         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3072         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3073                 return FALSE;
3074
3075         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3076
3077         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3078                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3079
3080         kfree(mdev->p_uuid);
3081         mdev->p_uuid = p_uuid;
3082
3083         if (mdev->state.conn < C_CONNECTED &&
3084             mdev->state.disk < D_INCONSISTENT &&
3085             mdev->state.role == R_PRIMARY &&
3086             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3087                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3088                     (unsigned long long)mdev->ed_uuid);
3089                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3090                 return FALSE;
3091         }
3092
3093         if (get_ldev(mdev)) {
3094                 int skip_initial_sync =
3095                         mdev->state.conn == C_CONNECTED &&
3096                         mdev->agreed_pro_version >= 90 &&
3097                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3098                         (p_uuid[UI_FLAGS] & 8);
3099                 if (skip_initial_sync) {
3100                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3101                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3102                                         "clear_n_write from receive_uuids");
3103                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3104                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3105                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3106                                         CS_VERBOSE, NULL);
3107                         drbd_md_sync(mdev);
3108                 }
3109                 put_ldev(mdev);
3110         }
3111
3112         /* Before we test the disk state, wait until a possibly ongoing
3113            cluster wide state change has finished. That is important if we
3114            are primary and are detaching from our disk: we need to see the
3115            new disk state. */
3116         wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3117         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3118                 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3119
3120         return TRUE;
3121 }
3122
3123 /**
3124  * convert_state() - Converts the peer's view of the cluster state to our point of view
3125  * @ps:         The state as seen by the peer.
3126  */
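/* In other words: the peer's role becomes our view of the peer, its disk
 * state becomes our pdsk, and vice versa; directional connection states
 * (starting-sync source/target, verify source) are mirrored via c_tab. */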
3127 static union drbd_state convert_state(union drbd_state ps)
3128 {
3129         union drbd_state ms;
3130
3131         static enum drbd_conns c_tab[] = {
3132                 [C_CONNECTED] = C_CONNECTED,
3133
3134                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3135                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3136                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3137                 [C_VERIFY_S]       = C_VERIFY_T,
3138                 [C_MASK]   = C_MASK,
3139         };
3140
3141         ms.i = ps.i;
3142
3143         ms.conn = c_tab[ps.conn];
3144         ms.peer = ps.role;
3145         ms.role = ps.peer;
3146         ms.pdsk = ps.disk;
3147         ms.disk = ps.pdsk;
3148         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3149
3150         return ms;
3151 }
3152
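/* receive_req_state() - the peer asks us to perform a state change on its
 * behalf: translate the mask/val pair into our point of view, apply it via
 * drbd_change_state() and return the result in a state-change reply.  A
 * concurrent local change is refused with SS_CONCURRENT_ST_CHG if we hold
 * the DISCARD_CONCURRENT token. */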
3153 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3154 {
3155         struct p_req_state *p = (struct p_req_state *)h;
3156         union drbd_state mask, val;
3157         int rv;
3158
3159         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3160         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3161                 return FALSE;
3162
3163         mask.i = be32_to_cpu(p->mask);
3164         val.i = be32_to_cpu(p->val);
3165
3166         if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3167             test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3168                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3169                 return TRUE;
3170         }
3171
3172         mask = convert_state(mask);
3173         val = convert_state(val);
3174
3175         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3176
3177         drbd_send_sr_reply(mdev, rv);
3178         drbd_md_sync(mdev);
3179
3180         return TRUE;
3181 }
3182
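/* receive_state() - process the peer's P_STATE packet: decide whether a
 * resync handshake is needed, fold the peer's role and disk state into our
 * own state, and drop the connection if the resulting state transition is
 * refused. */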
3183 static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3184 {
3185         struct p_state *p = (struct p_state *)h;
3186         enum drbd_conns nconn, oconn;
3187         union drbd_state ns, peer_state;
3188         enum drbd_disk_state real_peer_disk;
3189         int rv;
3190
3191         ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3192                 return FALSE;
3193
3194         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3195                 return FALSE;
3196
3197         peer_state.i = be32_to_cpu(p->state);
3198
3199         real_peer_disk = peer_state.disk;
3200         if (peer_state.disk == D_NEGOTIATING) {
3201                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3202                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3203         }
3204
3205         spin_lock_irq(&mdev->req_lock);
3206  retry:
3207         oconn = nconn = mdev->state.conn;
3208         spin_unlock_irq(&mdev->req_lock);
3209
3210         if (nconn == C_WF_REPORT_PARAMS)
3211                 nconn = C_CONNECTED;
3212
3213         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3214             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3215                 int cr; /* consider resync */
3216
3217                 /* if we established a new connection */
3218                 cr  = (oconn < C_CONNECTED);
3219                 /* if we had an established connection
3220                  * and one of the nodes newly attaches a disk */
3221                 cr |= (oconn == C_CONNECTED &&
3222                        (peer_state.disk == D_NEGOTIATING ||
3223                         mdev->state.disk == D_NEGOTIATING));
3224                 /* if we have both been inconsistent, and the peer has been
3225                  * forced to be UpToDate with --overwrite-data */
3226                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3227                 /* if we had been plain connected, and the admin requested to
3228                  * start a sync by "invalidate" or "invalidate-remote" */
3229                 cr |= (oconn == C_CONNECTED &&
3230                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3231                                  peer_state.conn <= C_WF_BITMAP_T));
3232
3233                 if (cr)
3234                         nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3235
3236                 put_ldev(mdev);
3237                 if (nconn == C_MASK) {
3238                         nconn = C_CONNECTED;
3239                         if (mdev->state.disk == D_NEGOTIATING) {
3240                                 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3241                         } else if (peer_state.disk == D_NEGOTIATING) {
3242                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3243                                 peer_state.disk = D_DISKLESS;
3244                                 real_peer_disk = D_DISKLESS;
3245                         } else {
3246                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3247                                         return FALSE;
3248                                 D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3249                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3250                                 return FALSE;
3251                         }
3252                 }
3253         }
3254
3255         spin_lock_irq(&mdev->req_lock);
3256         if (mdev->state.conn != oconn)
3257                 goto retry;
3258         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3259         ns.i = mdev->state.i;
3260         ns.conn = nconn;
3261         ns.peer = peer_state.role;
3262         ns.pdsk = real_peer_disk;
3263         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3264         if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3265                 ns.disk = mdev->new_state_tmp.disk;
3266
3267         rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3268         ns = mdev->state;
3269         spin_unlock_irq(&mdev->req_lock);
3270
3271         if (rv < SS_SUCCESS) {
3272                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3273                 return FALSE;
3274         }
3275
3276         if (oconn > C_WF_REPORT_PARAMS) {
3277                 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3278                     peer_state.disk != D_NEGOTIATING ) {
3279                         /* we want resync, peer has not yet decided to sync... */
3280                         /* Nowadays only used when forcing a node into primary role and
3281                            setting its disk to UpToDate with that */
3282                         drbd_send_uuids(mdev);
3283                         drbd_send_state(mdev);
3284                 }
3285         }
3286
3287         mdev->net_conf->want_lose = 0;
3288
3289         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3290
3291         return TRUE;
3292 }
3293
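/* receive_sync_uuid() - as sync target, adopt the sync UUID generated by
 * the sync source and start the resync.  The current UUID is overwritten
 * directly; it must not be rotated into the history here. */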
3294 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3295 {
3296         struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3297
3298         wait_event(mdev->misc_wait,
3299                    mdev->state.conn == C_WF_SYNC_UUID ||
3300                    mdev->state.conn < C_CONNECTED ||
3301                    mdev->state.disk < D_NEGOTIATING);
3302
3303         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3304
3305         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3306         if (drbd_recv(mdev, h->payload, h->length) != h->length)
3307                 return FALSE;
3308
3309         /* Here the _drbd_uuid_ functions are the right ones; the current
3310            UUID should _not_ be rotated into the history */
3311         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3312                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3313                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3314
3315                 drbd_start_resync(mdev, C_SYNC_TARGET);
3316
3317                 put_ldev(mdev);
3318         } else
3319                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3320
3321         return TRUE;
3322 }
3323
3324 enum receive_bitmap_ret { OK, DONE, FAILED };
3325
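/* receive_bitmap_plain() - merge one uncompressed chunk of at most
 * BM_PACKET_WORDS longs into the bitmap and advance the transfer context;
 * returns DONE once there is nothing left to transfer. */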
3326 static enum receive_bitmap_ret
3327 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3328         unsigned long *buffer, struct bm_xfer_ctx *c)
3329 {
3330         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3331         unsigned want = num_words * sizeof(long);
3332
3333         if (want != h->length) {
3334                 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3335                 return FAILED;
3336         }
3337         if (want == 0)
3338                 return DONE;
3339         if (drbd_recv(mdev, buffer, want) != want)
3340                 return FAILED;
3341
3342         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3343
3344         c->word_offset += num_words;
3345         c->bit_offset = c->word_offset * BITS_PER_LONG;
3346         if (c->bit_offset > c->bm_bits)
3347                 c->bit_offset = c->bm_bits;
3348
3349         return OK;
3350 }
3351
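/* recv_bm_rle_bits() - decode one VLI run-length encoded bitmap chunk.
 * The payload is a sequence of run lengths; runs alternate between
 * "bits clear" and "bits set", the first run's meaning given by
 * DCBP_get_start().  Only the "set" runs touch the bitmap.  Example:
 * with bit_offset 0, start = 1 and run lengths 3, 5, 2 we set bits 0..2,
 * skip bits 3..7 and set bits 8..9. */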
3352 static enum receive_bitmap_ret
3353 recv_bm_rle_bits(struct drbd_conf *mdev,
3354                 struct p_compressed_bm *p,
3355                 struct bm_xfer_ctx *c)
3356 {
3357         struct bitstream bs;
3358         u64 look_ahead;
3359         u64 rl;
3360         u64 tmp;
3361         unsigned long s = c->bit_offset;
3362         unsigned long e;
3363         int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3364         int toggle = DCBP_get_start(p);
3365         int have;
3366         int bits;
3367
3368         bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3369
3370         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3371         if (bits < 0)
3372                 return FAILED;
3373
3374         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3375                 bits = vli_decode_bits(&rl, look_ahead);
3376                 if (bits <= 0)
3377                         return FAILED;
3378
3379                 if (toggle) {
3380                         e = s + rl -1;
3381                         if (e >= c->bm_bits) {
3382                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3383                                 return FAILED;
3384                         }
3385                         _drbd_bm_set_bits(mdev, s, e);
3386                 }
3387
3388                 if (have < bits) {
3389                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3390                                 have, bits, look_ahead,
3391                                 (unsigned int)(bs.cur.b - p->code),
3392                                 (unsigned int)bs.buf_len);
3393                         return FAILED;
3394                 }
3395                 look_ahead >>= bits;
3396                 have -= bits;
3397
3398                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3399                 if (bits < 0)
3400                         return FAILED;
3401                 look_ahead |= tmp << have;
3402                 have += bits;
3403         }
3404
3405         c->bit_offset = s;
3406         bm_xfer_ctx_bit_to_word_offset(c);
3407
3408         return (s == c->bm_bits) ? DONE : OK;
3409 }
3410
3411 static enum receive_bitmap_ret
3412 decode_bitmap_c(struct drbd_conf *mdev,
3413                 struct p_compressed_bm *p,
3414                 struct bm_xfer_ctx *c)
3415 {
3416         if (DCBP_get_code(p) == RLE_VLI_Bits)
3417                 return recv_bm_rle_bits(mdev, p, c);
3418
3419         /* other variants had been implemented for evaluation,
3420          * but have been dropped as this one turned out to be "best"
3421          * during all our tests. */
3422
3423         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3424         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3425         return FAILED;
3426 }
3427
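/* INFO_bm_xfer_stats() - log how well the bitmap transfer compressed:
 * "plain" is what an uncompressed transfer would have cost (headers plus
 * bm_words longs), "total" is what actually went over the wire.  The
 * saving is printed in tenths of a percent, e.g. plain = 4000 and
 * total = 1000 bytes is reported as "compression: 75.0%". */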
3428 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3429                 const char *direction, struct bm_xfer_ctx *c)
3430 {
3431         /* what would it take to transfer it "plaintext" */
3432         unsigned plain = sizeof(struct p_header) *
3433                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3434                 + c->bm_words * sizeof(long);
3435         unsigned total = c->bytes[0] + c->bytes[1];
3436         unsigned r;
3437
3438         /* total cannot be zero, but just in case: */
3439         if (total == 0)
3440                 return;
3441
3442         /* don't report if not compressed */
3443         if (total >= plain)
3444                 return;
3445
3446         /* total < plain. check for overflow, still */
3447         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3448                                     : (1000 * total / plain);
3449
3450         if (r > 1000)
3451                 r = 1000;
3452
3453         r = 1000 - r;
3454         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3455              "total %u; compression: %u.%u%%\n",
3456                         direction,
3457                         c->bytes[1], c->packets[1],
3458                         c->bytes[0], c->packets[0],
3459                         total, r/10, r % 10);
3460 }
3461
3462 /* Since we process the bitfield from lower addresses to higher, it does
3463    not matter whether we process it in 32 bit or 64 bit chunks, as long as
3464    it is little endian. (Think of it as a byte stream, beginning with the
3465    lowest byte...) If we used big endian, we would have to process it from
3466    the highest address to the lowest in order to stay agnostic to the
3467    32 vs 64 bit issue.
3468
3469    returns 0 on failure, 1 if we successfully received it. */
3470 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3471 {
3472         struct bm_xfer_ctx c;
3473         void *buffer;
3474         enum receive_bitmap_ret ret;
3475         int ok = FALSE;
3476
3477         wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3478
3479         drbd_bm_lock(mdev, "receive bitmap");
3480
3481         /* maybe we should use some per thread scratch page,
3482          * and allocate that during initial device creation? */
3483         buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3484         if (!buffer) {
3485                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3486                 goto out;
3487         }
3488
3489         c = (struct bm_xfer_ctx) {
3490                 .bm_bits = drbd_bm_bits(mdev),
3491                 .bm_words = drbd_bm_words(mdev),
3492         };
3493
3494         do {
3495                 if (h->command == P_BITMAP) {
3496                         ret = receive_bitmap_plain(mdev, h, buffer, &c);
3497                 } else if (h->command == P_COMPRESSED_BITMAP) {
3498                         /* MAYBE: sanity check that we speak proto >= 90,
3499                          * and the feature is enabled! */
3500                         struct p_compressed_bm *p;
3501
3502                         if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3503                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3504                                 goto out;
3505                         }
3506                         /* use the page buff */
3507                         p = buffer;
3508                         memcpy(p, h, sizeof(*h));
3509                         if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3510                                 goto out;
3511                         if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3512                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3513                                 goto out;
3514                         }
3515                         ret = decode_bitmap_c(mdev, p, &c);
3516                 } else {
3517                         dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", h->command);
3518                         goto out;
3519                 }
3520
3521                 c.packets[h->command == P_BITMAP]++;
3522                 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3523
3524                 if (ret != OK)
3525                         break;
3526
3527                 if (!drbd_recv_header(mdev, h))
3528                         goto out;
3529         } while (ret == OK);
3530         if (ret == FAILED)
3531                 goto out;
3532
3533         INFO_bm_xfer_stats(mdev, "receive", &c);
3534
3535         if (mdev->state.conn == C_WF_BITMAP_T) {
3536                 ok = !drbd_send_bitmap(mdev);
3537                 if (!ok)
3538                         goto out;
3539                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3540                 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3541                 D_ASSERT(ok == SS_SUCCESS);
3542         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3543                 /* admin may have requested C_DISCONNECTING,
3544                  * other threads may have noticed network errors */
3545                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3546                     drbd_conn_str(mdev->state.conn));
3547         }
3548
3549         ok = TRUE;
3550  out:
3551         drbd_bm_unlock(mdev);
3552         if (ok && mdev->state.conn == C_WF_BITMAP_S)
3553                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3554         free_page((unsigned long) buffer);
3555         return ok;
3556 }
3557
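/* receive_skip_() - drain and discard the payload of a packet we do not
 * process, so that the data stream stays in sync. */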
3558 static int receive_skip_(struct drbd_conf *mdev, struct p_header *h, int silent)
3559 {
3560         /* TODO zero copy sink :) */
3561         static char sink[128];
3562         int size, want, r;
3563
3564         if (!silent)
3565                 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3566                      h->command, h->length);
3567
3568         size = h->length;
3569         while (size > 0) {
3570                 want = min_t(int, size, sizeof(sink));
3571                 r = drbd_recv(mdev, sink, want);
3572                 ERR_IF(r <= 0) break;
3573                 size -= r;
3574         }
3575         return size == 0;
3576 }
3577
3578 static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3579 {
3580         return receive_skip_(mdev, h, 0);
3581 }
3582
3583 static int receive_skip_silent(struct drbd_conf *mdev, struct p_header *h)
3584 {
3585         return receive_skip_(mdev, h, 1);
3586 }
3587
3588 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3589 {
3590         if (mdev->state.disk >= D_INCONSISTENT)
3591                 drbd_kick_lo(mdev);
3592
3593         /* Make sure we've acked all the TCP data associated
3594          * with the data requests being unplugged */
3595         drbd_tcp_quickack(mdev->data.socket);
3596
3597         return TRUE;
3598 }
3599
3600 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3601
3602 static drbd_cmd_handler_f drbd_default_handler[] = {
3603         [P_DATA]            = receive_Data,
3604         [P_DATA_REPLY]      = receive_DataReply,
3605         [P_RS_DATA_REPLY]   = receive_RSDataReply,
3606         [P_BARRIER]         = receive_Barrier,
3607         [P_BITMAP]          = receive_bitmap,
3608         [P_COMPRESSED_BITMAP]    = receive_bitmap,
3609         [P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3610         [P_DATA_REQUEST]    = receive_DataRequest,
3611         [P_RS_DATA_REQUEST] = receive_DataRequest,
3612         [P_SYNC_PARAM]      = receive_SyncParam,
3613         [P_SYNC_PARAM89]           = receive_SyncParam,
3614         [P_PROTOCOL]        = receive_protocol,
3615         [P_UUIDS]           = receive_uuids,
3616         [P_SIZES]           = receive_sizes,
3617         [P_STATE]           = receive_state,
3618         [P_STATE_CHG_REQ]   = receive_req_state,
3619         [P_SYNC_UUID]       = receive_sync_uuid,
3620         [P_OV_REQUEST]      = receive_DataRequest,
3621         [P_OV_REPLY]        = receive_DataRequest,
3622         [P_CSUM_RS_REQUEST]    = receive_DataRequest,
3623         [P_DELAY_PROBE]     = receive_skip_silent,
3624         /* anything missing from this table is in
3625          * the asender_tbl, see get_asender_cmd */
3626         [P_MAX_CMD]         = NULL,
3627 };
3628
3629 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3630 static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3631
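/* drbdd() - main receive loop: read a header and dispatch to the handler
 * registered for the command above.  Commands beyond P_MAX_OPT_CMD are
 * skipped; a missing handler or a failing handler forces the connection
 * into C_PROTOCOL_ERROR. */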
3632 static void drbdd(struct drbd_conf *mdev)
3633 {
3634         drbd_cmd_handler_f handler;
3635         struct p_header *header = &mdev->data.rbuf.header;
3636
3637         while (get_t_state(&mdev->receiver) == Running) {
3638                 drbd_thread_current_set_cpu(mdev);
3639                 if (!drbd_recv_header(mdev, header)) {
3640                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3641                         break;
3642                 }
3643
3644                 if (header->command < P_MAX_CMD)
3645                         handler = drbd_cmd_handler[header->command];
3646                 else if (P_MAY_IGNORE < header->command
3647                      && header->command < P_MAX_OPT_CMD)
3648                         handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3649                 else if (header->command > P_MAX_OPT_CMD)
3650                         handler = receive_skip;
3651                 else
3652                         handler = NULL;
3653
3654                 if (unlikely(!handler)) {
3655                         dev_err(DEV, "unknown packet type %d, l: %d!\n",
3656                             header->command, header->length);
3657                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3658                         break;
3659                 }
3660                 if (unlikely(!handler(mdev, header))) {
3661                         dev_err(DEV, "error receiving %s, l: %d!\n",
3662                             cmdname(header->command), header->length);
3663                         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3664                         break;
3665                 }
3666         }
3667 }
3668
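/* drbd_fail_pending_reads() - complete all application READ requests that
 * were still waiting for the peer with "connection lost while pending". */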
3669 static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3670 {
3671         struct hlist_head *slot;
3672         struct hlist_node *pos;
3673         struct hlist_node *tmp;
3674         struct drbd_request *req;
3675         int i;
3676
3677         /*
3678          * Application READ requests
3679          */
3680         spin_lock_irq(&mdev->req_lock);
3681         for (i = 0; i < APP_R_HSIZE; i++) {
3682                 slot = mdev->app_reads_hash+i;
3683                 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3684                         /* it may (but should not any longer!)
3685                          * be on the work queue; if that assert triggers,
3686                          * we need to also grab the
3687                          * spin_lock_irq(&mdev->data.work.q_lock);
3688                          * and list_del_init here. */
3689                         D_ASSERT(list_empty(&req->w.list));
3690                         /* It would be nice to complete outside of spinlock.
3691                          * But this is easier for now. */
3692                         _req_mod(req, connection_lost_while_pending);
3693                 }
3694         }
3695         for (i = 0; i < APP_R_HSIZE; i++)
3696                 if (!hlist_empty(mdev->app_reads_hash+i))
3697                         dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3698                                 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3699
3700         memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3701         spin_unlock_irq(&mdev->req_lock);
3702 }
3703
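/* drbd_flush_workqueue() - queue a barrier work item and wait for its
 * completion, so that everything queued to data.work before this call has
 * been processed by the worker. */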
3704 void drbd_flush_workqueue(struct drbd_conf *mdev)
3705 {
3706         struct drbd_wq_barrier barr;
3707
3708         barr.w.cb = w_prev_work_done;
3709         init_completion(&barr.done);
3710         drbd_queue_work(&mdev->data.work, &barr.w);
3711         wait_for_completion(&barr.done);
3712 }
3713
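/* drbd_disconnect() - tear down the network part of the connection: stop
 * the asender, flush pending ee's and resync bookkeeping, drain the worker
 * queue, possibly outdate the peer if we are primary with fencing enabled,
 * and either fall back to C_UNCONNECTED for a reconnect or, if the admin
 * requested C_DISCONNECTING, free the per-connection resources and go
 * C_STANDALONE. */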
3714 static void drbd_disconnect(struct drbd_conf *mdev)
3715 {
3716         enum drbd_fencing_p fp;
3717         union drbd_state os, ns;
3718         int rv = SS_UNKNOWN_ERROR;
3719         unsigned int i;
3720
3721         if (mdev->state.conn == C_STANDALONE)
3722                 return;
3723         if (mdev->state.conn >= C_WF_CONNECTION)
3724                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3725                                 drbd_conn_str(mdev->state.conn));
3726
3727         /* asender does not clean up anything. it must not interfere, either */
3728         drbd_thread_stop(&mdev->asender);
3729         drbd_free_sock(mdev);
3730
3731         spin_lock_irq(&mdev->req_lock);
3732         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3733         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3734         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3735         spin_unlock_irq(&mdev->req_lock);
3736
3737         /* We do not have data structures that would allow us to
3738          * get the rs_pending_cnt down to 0 again.
3739          *  * On C_SYNC_TARGET we do not have any data structures describing
3740          *    the pending RSDataRequest's we have sent.
3741          *  * On C_SYNC_SOURCE there is no data structure that tracks
3742          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3743          *  And no, it is not the sum of the reference counts in the
3744          *  resync_LRU. The resync_LRU tracks the whole operation including
3745          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3746          *  on the fly. */
3747         drbd_rs_cancel_all(mdev);
3748         mdev->rs_total = 0;
3749         mdev->rs_failed = 0;
3750         atomic_set(&mdev->rs_pending_cnt, 0);
3751         wake_up(&mdev->misc_wait);
3752
3753         /* make sure syncer is stopped and w_resume_next_sg queued */
3754         del_timer_sync(&mdev->resync_timer);
3755         set_bit(STOP_SYNC_TIMER, &mdev->flags);
3756         resync_timer_fn((unsigned long)mdev);
3757
3758         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3759          * w_make_resync_request etc. which may still be on the worker queue
3760          * to be "canceled" */
3761         drbd_flush_workqueue(mdev);
3762
3763         /* This also does reclaim_net_ee().  If we do this too early, we might
3764          * miss some resync ee and pages.*/
3765         drbd_process_done_ee(mdev);
3766
3767         kfree(mdev->p_uuid);
3768         mdev->p_uuid = NULL;
3769
3770         if (!mdev->state.susp)
3771                 tl_clear(mdev);
3772
3773         drbd_fail_pending_reads(mdev);
3774
3775         dev_info(DEV, "Connection closed\n");
3776
3777         drbd_md_sync(mdev);
3778
3779         fp = FP_DONT_CARE;
3780         if (get_ldev(mdev)) {
3781                 fp = mdev->ldev->dc.fencing;
3782                 put_ldev(mdev);
3783         }
3784
3785         if (mdev->state.role == R_PRIMARY) {
3786                 if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3787                         enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3788                         drbd_request_state(mdev, NS(pdsk, nps));
3789                 }
3790         }
3791
3792         spin_lock_irq(&mdev->req_lock);
3793         os = mdev->state;
3794         if (os.conn >= C_UNCONNECTED) {
3795                 /* Do not restart in case we are C_DISCONNECTING */
3796                 ns = os;
3797                 ns.conn = C_UNCONNECTED;
3798                 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3799         }
3800         spin_unlock_irq(&mdev->req_lock);
3801
3802         if (os.conn == C_DISCONNECTING) {
3803                 struct hlist_head *h;
3804                 wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3805
3806                 /* we must not free the tl_hash
3807                  * while application io is still on the fly */
3808                 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3809
3810                 spin_lock_irq(&mdev->req_lock);
3811                 /* paranoia code */
3812                 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3813                         if (h->first)
3814                                 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3815                                                 (int)(h - mdev->ee_hash), h->first);
3816                 kfree(mdev->ee_hash);
3817                 mdev->ee_hash = NULL;
3818                 mdev->ee_hash_s = 0;
3819
3820                 /* paranoia code */
3821                 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3822                         if (h->first)
3823                                 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3824                                                 (int)(h - mdev->tl_hash), h->first);
3825                 kfree(mdev->tl_hash);
3826                 mdev->tl_hash = NULL;
3827                 mdev->tl_hash_s = 0;
3828                 spin_unlock_irq(&mdev->req_lock);
3829
3830                 crypto_free_hash(mdev->cram_hmac_tfm);
3831                 mdev->cram_hmac_tfm = NULL;
3832
3833                 kfree(mdev->net_conf);
3834                 mdev->net_conf = NULL;
3835                 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3836         }
3837
3838         /* tcp_close and release of sendpage pages can be deferred.  I don't
3839          * want to use SO_LINGER, because apparently it can be deferred for
3840          * more than 20 seconds (longest time I checked).
3841          *
3842          * Actually we don't care exactly when the network stack does its
3843          * put_page(); we just release our reference on these pages right here.
3844          */
3845         i = drbd_release_ee(mdev, &mdev->net_ee);
3846         if (i)
3847                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3848         i = atomic_read(&mdev->pp_in_use);
3849         if (i)
3850                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3851
3852         D_ASSERT(list_empty(&mdev->read_ee));
3853         D_ASSERT(list_empty(&mdev->active_ee));
3854         D_ASSERT(list_empty(&mdev->sync_ee));
3855         D_ASSERT(list_empty(&mdev->done_ee));
3856
3857         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3858         atomic_set(&mdev->current_epoch->epoch_size, 0);
3859         D_ASSERT(list_empty(&mdev->current_epoch->list));
3860 }
3861
3862 /*
3863  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3864  * we can agree on is stored in agreed_pro_version.
3865  *
3866  * feature flags and the reserved array should be enough room for future
3867  * enhancements of the handshake protocol, and possible plugins...
3868  *
3869  * for now, they are expected to be zero, but ignored.
3870  */
3871 static int drbd_send_handshake(struct drbd_conf *mdev)
3872 {
3873         /* ASSERT current == mdev->receiver ... */
3874         struct p_handshake *p = &mdev->data.sbuf.handshake;
3875         int ok;
3876
3877         if (mutex_lock_interruptible(&mdev->data.mutex)) {
3878                 dev_err(DEV, "interrupted during initial handshake\n");
3879                 return 0; /* interrupted. not ok. */
3880         }
3881
3882         if (mdev->data.socket == NULL) {
3883                 mutex_unlock(&mdev->data.mutex);
3884                 return 0;
3885         }
3886
3887         memset(p, 0, sizeof(*p));
3888         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3889         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3890         ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3891                              (struct p_header *)p, sizeof(*p), 0 );
3892         mutex_unlock(&mdev->data.mutex);
3893         return ok;
3894 }
3895
3896 /*
3897  * return values:
3898  *   1 yes, we have a valid connection
3899  *   0 oops, did not work out, please try again
3900  *  -1 peer talks different language,
3901  *     no point in trying again, please go standalone.
3902  */
3903 static int drbd_do_handshake(struct drbd_conf *mdev)
3904 {
3905         /* ASSERT current == mdev->receiver ... */
3906         struct p_handshake *p = &mdev->data.rbuf.handshake;
3907         const int expect = sizeof(struct p_handshake)
3908                           -sizeof(struct p_header);
3909         int rv;
3910
3911         rv = drbd_send_handshake(mdev);
3912         if (!rv)
3913                 return 0;
3914
3915         rv = drbd_recv_header(mdev, &p->head);
3916         if (!rv)
3917                 return 0;
3918
3919         if (p->head.command != P_HAND_SHAKE) {
3920                 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3921                      cmdname(p->head.command), p->head.command);
3922                 return -1;
3923         }
3924
3925         if (p->head.length != expect) {
3926                 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3927                      expect, p->head.length);
3928                 return -1;
3929         }
3930
3931         rv = drbd_recv(mdev, &p->head.payload, expect);
3932
3933         if (rv != expect) {
3934                 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3935                 return 0;
3936         }
3937
3938         p->protocol_min = be32_to_cpu(p->protocol_min);
3939         p->protocol_max = be32_to_cpu(p->protocol_max);
3940         if (p->protocol_max == 0)
3941                 p->protocol_max = p->protocol_min;
3942
3943         if (PRO_VERSION_MAX < p->protocol_min ||
3944             PRO_VERSION_MIN > p->protocol_max)
3945                 goto incompat;
3946
3947         mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3948
3949         dev_info(DEV, "Handshake successful: "
3950              "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3951
3952         return 1;
3953
3954  incompat:
3955         dev_err(DEV, "incompatible DRBD dialects: "
3956             "I support %d-%d, peer supports %d-%d\n",
3957             PRO_VERSION_MIN, PRO_VERSION_MAX,
3958             p->protocol_min, p->protocol_max);
3959         return -1;
3960 }
3961
3962 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3963 static int drbd_do_auth(struct drbd_conf *mdev)
3964 {
3965         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
3966         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3967         return -1;
3968 }
3969 #else
3970 #define CHALLENGE_LEN 64
3971
3972 /* Return value:
3973         1 - auth succeeded,
3974         0 - failed, try again (network error),
3975         -1 - auth failed, don't try again.
3976 */
3977
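/* Challenge/response authentication: both sides send a random challenge,
 * HMAC the peer's challenge with the shared secret, exchange the results
 * and compare the peer's answer with the locally computed response to our
 * own challenge. */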
3978 static int drbd_do_auth(struct drbd_conf *mdev)
3979 {
3980         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
3981         struct scatterlist sg;
3982         char *response = NULL;
3983         char *right_response = NULL;
3984         char *peers_ch = NULL;
3985         struct p_header p;
3986         unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3987         unsigned int resp_size;
3988         struct hash_desc desc;
3989         int rv;
3990
3991         desc.tfm = mdev->cram_hmac_tfm;
3992         desc.flags = 0;
3993
3994         rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3995                                 (u8 *)mdev->net_conf->shared_secret, key_len);
3996         if (rv) {
3997                 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3998                 rv = -1;
3999                 goto fail;
4000         }
4001
4002         get_random_bytes(my_challenge, CHALLENGE_LEN);
4003
4004         rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4005         if (!rv)
4006                 goto fail;
4007
4008         rv = drbd_recv_header(mdev, &p);
4009         if (!rv)
4010                 goto fail;
4011
4012         if (p.command != P_AUTH_CHALLENGE) {
4013                 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4014                     cmdname(p.command), p.command);
4015                 rv = 0;
4016                 goto fail;
4017         }
4018
4019         if (p.length > CHALLENGE_LEN*2) {
4020                 dev_err(DEV, "expected AuthChallenge payload too big.\n");
4021                 rv = -1;
4022                 goto fail;
4023         }
4024
4025         peers_ch = kmalloc(p.length, GFP_NOIO);
4026         if (peers_ch == NULL) {
4027                 dev_err(DEV, "kmalloc of peers_ch failed\n");
4028                 rv = -1;
4029                 goto fail;
4030         }
4031
4032         rv = drbd_recv(mdev, peers_ch, p.length);
4033
4034         if (rv != p.length) {
4035                 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4036                 rv = 0;
4037                 goto fail;
4038         }
4039
4040         resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4041         response = kmalloc(resp_size, GFP_NOIO);
4042         if (response == NULL) {
4043                 dev_err(DEV, "kmalloc of response failed\n");
4044                 rv = -1;
4045                 goto fail;
4046         }
4047
4048         sg_init_table(&sg, 1);
4049         sg_set_buf(&sg, peers_ch, p.length);
4050
4051         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4052         if (rv) {
4053                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4054                 rv = -1;
4055                 goto fail;
4056         }
4057
4058         rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4059         if (!rv)
4060                 goto fail;
4061
4062         rv = drbd_recv_header(mdev, &p);
4063         if (!rv)
4064                 goto fail;
4065
4066         if (p.command != P_AUTH_RESPONSE) {
4067                 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4068                     cmdname(p.command), p.command);
4069                 rv = 0;
4070                 goto fail;
4071         }
4072
4073         if (p.length != resp_size) {
4074                 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4075                 rv = 0;
4076                 goto fail;
4077         }
4078
4079         rv = drbd_recv(mdev, response, resp_size);
4080
4081         if (rv != resp_size) {
4082                 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4083                 rv = 0;
4084                 goto fail;
4085         }
4086
4087         right_response = kmalloc(resp_size, GFP_NOIO);
4088         if (right_response == NULL) {
4089                 dev_err(DEV, "kmalloc of right_response failed\n");
4090                 rv = -1;
4091                 goto fail;
4092         }
4093
4094         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4095
4096         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4097         if (rv) {
4098                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4099                 rv = -1;
4100                 goto fail;
4101         }
4102
4103         rv = !memcmp(response, right_response, resp_size);
4104
4105         if (rv)
4106                 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4107                      resp_size, mdev->net_conf->cram_hmac_alg);
4108         else
4109                 rv = -1;
4110
4111  fail:
4112         kfree(peers_ch);
4113         kfree(response);
4114         kfree(right_response);
4115
4116         return rv;
4117 }
4118 #endif
4119
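/* Main function of the receiver thread: (re)establish the connection,
 * then hand off to drbdd() for the packet dispatch loop.  A return of 0
 * from drbd_connect() means "retry in a second", -1 means "give up and
 * discard the network configuration". */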
4120 int drbdd_init(struct drbd_thread *thi)
4121 {
4122         struct drbd_conf *mdev = thi->mdev;
4123         unsigned int minor = mdev_to_minor(mdev);
4124         int h;
4125
4126         sprintf(current->comm, "drbd%d_receiver", minor);
4127
4128         dev_info(DEV, "receiver (re)started\n");
4129
4130         do {
4131                 h = drbd_connect(mdev);
4132                 if (h == 0) {
4133                         drbd_disconnect(mdev);
4134                         __set_current_state(TASK_INTERRUPTIBLE);
4135                         schedule_timeout(HZ);
4136                 }
4137                 if (h == -1) {
4138                         dev_warn(DEV, "Discarding network configuration.\n");
4139                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4140                 }
4141         } while (h == 0);
4142
4143         if (h > 0) {
4144                 if (get_net_conf(mdev)) {
4145                         drbdd(mdev);
4146                         put_net_conf(mdev);
4147                 }
4148         }
4149
4150         drbd_disconnect(mdev);
4151
4152         dev_info(DEV, "receiver terminated\n");
4153         return 0;
4154 }
4155
4156 /* ********* acknowledge sender ******** */
4157
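/* Reply to a state change we requested from the peer: record success or
 * failure in mdev->flags and wake up whoever waits on state_wait. */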
4158 static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4159 {
4160         struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4161
4162         int retcode = be32_to_cpu(p->retcode);
4163
4164         if (retcode >= SS_SUCCESS) {
4165                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4166         } else {
4167                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4168                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4169                     drbd_set_st_err_str(retcode), retcode);
4170         }
4171         wake_up(&mdev->state_wait);
4172
4173         return TRUE;
4174 }
4175
4176 static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4177 {
4178         return drbd_send_ping_ack(mdev);
4179
4180 }
4181
4182 static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4183 {
4184         /* restore idle timeout */
4185         mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4186         if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4187                 wake_up(&mdev->misc_wait);
4188
4189         return TRUE;
4190 }
4191
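/* The peer found a resync request to be already in sync (protocol 89+,
 * checksum based resync): mark it in sync locally and account it. */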
4192 static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4193 {
4194         struct p_block_ack *p = (struct p_block_ack *)h;
4195         sector_t sector = be64_to_cpu(p->sector);
4196         int blksize = be32_to_cpu(p->blksize);
4197
4198         D_ASSERT(mdev->agreed_pro_version >= 89);
4199
4200         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4201
4202         drbd_rs_complete_io(mdev, sector);
4203         drbd_set_in_sync(mdev, sector, blksize);
4204         /* rs_same_csum is supposed to count in units of BM_BLOCK_SIZE */
4205         mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4206         dec_rs_pending(mdev);
4207
4208         return TRUE;
4209 }
4210
4211 /* when we receive the ACK for a write request,
4212  * verify that we actually know about it */
4213 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4214         u64 id, sector_t sector)
4215 {
4216         struct hlist_head *slot = tl_hash_slot(mdev, sector);
4217         struct hlist_node *n;
4218         struct drbd_request *req;
4219
4220         hlist_for_each_entry(req, n, slot, colision) {
4221                 if ((unsigned long)req == (unsigned long)id) {
4222                         if (req->sector != sector) {
4223                                 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4224                                     "wrong sector (%llus versus %llus)\n", req,
4225                                     (unsigned long long)req->sector,
4226                                     (unsigned long long)sector);
4227                                 break;
4228                         }
4229                         return req;
4230                 }
4231         }
4232         dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4233                 (void *)(unsigned long)id, (unsigned long long)sector);
4234         return NULL;
4235 }
4236
4237 typedef struct drbd_request *(req_validator_fn)
4238         (struct drbd_conf *mdev, u64 id, sector_t sector);
4239
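/* Look up the request an ACK refers to (under req_lock), apply the state
 * transition 'what' via __req_mod(), and complete the master bio if that
 * transition finished the request. */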
4240 static int validate_req_change_req_state(struct drbd_conf *mdev,
4241         u64 id, sector_t sector, req_validator_fn validator,
4242         const char *func, enum drbd_req_event what)
4243 {
4244         struct drbd_request *req;
4245         struct bio_and_error m;
4246
4247         spin_lock_irq(&mdev->req_lock);
4248         req = validator(mdev, id, sector);
4249         if (unlikely(!req)) {
4250                 spin_unlock_irq(&mdev->req_lock);
4251                 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4252                 return FALSE;
4253         }
4254         __req_mod(req, what, &m);
4255         spin_unlock_irq(&mdev->req_lock);
4256
4257         if (m.bio)
4258                 complete_master_bio(mdev, &m);
4259         return TRUE;
4260 }
4261
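/* Positive acknowledgement for a write.  Resync writes are accounted
 * directly; application writes are mapped back to their drbd_request and
 * advanced according to the ack type. */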
4262 static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4263 {
4264         struct p_block_ack *p = (struct p_block_ack *)h;
4265         sector_t sector = be64_to_cpu(p->sector);
4266         int blksize = be32_to_cpu(p->blksize);
4267         enum drbd_req_event what;
4268
4269         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4270
4271         if (is_syncer_block_id(p->block_id)) {
4272                 drbd_set_in_sync(mdev, sector, blksize);
4273                 dec_rs_pending(mdev);
4274                 return TRUE;
4275         }
4276         switch (be16_to_cpu(h->command)) {
4277         case P_RS_WRITE_ACK:
4278                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4279                 what = write_acked_by_peer_and_sis;
4280                 break;
4281         case P_WRITE_ACK:
4282                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4283                 what = write_acked_by_peer;
4284                 break;
4285         case P_RECV_ACK:
4286                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4287                 what = recv_acked_by_peer;
4288                 break;
4289         case P_DISCARD_ACK:
4290                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4291                 what = conflict_discarded_by_peer;
4292                 break;
4293         default:
4294                 D_ASSERT(0);
4295                 return FALSE;
4296         }
4297
4298         return validate_req_change_req_state(mdev, p->block_id, sector,
4299                 _ack_id_to_req, __func__, what);
4300 }
4301
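/* Negative acknowledgement: the peer could not satisfy the write.  Resync
 * requests are accounted as failed resync I/O; application writes fail
 * the original request. */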
4302 static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4303 {
4304         struct p_block_ack *p = (struct p_block_ack *)h;
4305         sector_t sector = be64_to_cpu(p->sector);
4306
4307         if (__ratelimit(&drbd_ratelimit_state))
4308                 dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4309
4310         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4311
4312         if (is_syncer_block_id(p->block_id)) {
4313                 int size = be32_to_cpu(p->blksize);
4314                 dec_rs_pending(mdev);
4315                 drbd_rs_failed_io(mdev, sector, size);
4316                 return TRUE;
4317         }
4318         return validate_req_change_req_state(mdev, p->block_id, sector,
4319                 _ack_id_to_req, __func__, neg_acked);
4320 }
4321
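/* Negative reply to an application data (read) request. */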
4322 static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4323 {
4324         struct p_block_ack *p = (struct p_block_ack *)h;
4325         sector_t sector = be64_to_cpu(p->sector);
4326
4327         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4328         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4329             (unsigned long long)sector, be32_to_cpu(p->blksize));
4330
4331         return validate_req_change_req_state(mdev, p->block_id, sector,
4332                 _ar_id_to_req, __func__, neg_acked);
4333 }
4334
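/* Negative reply to a resync data request: account the range as failed
 * resync I/O, provided we still have a local disk to record it on. */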
4335 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4336 {
4337         sector_t sector;
4338         int size;
4339         struct p_block_ack *p = (struct p_block_ack *)h;
4340
4341         sector = be64_to_cpu(p->sector);
4342         size = be32_to_cpu(p->blksize);
4343
4344         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4345
4346         dec_rs_pending(mdev);
4347
4348         if (get_ldev_if_state(mdev, D_FAILED)) {
4349                 drbd_rs_complete_io(mdev, sector);
4350                 drbd_rs_failed_io(mdev, sector, size);
4351                 put_ldev(mdev);
4352         }
4353
4354         return TRUE;
4355 }
4356
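/* The peer has processed everything up to the given barrier; release the
 * corresponding section of the transfer log. */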
4357 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4358 {
4359         struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4360
4361         tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4362
4363         return TRUE;
4364 }
4365
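/* Result of an online verify request: record out-of-sync blocks, and
 * queue the finish work once the last verify reply has come in. */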
4366 static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4367 {
4368         struct p_block_ack *p = (struct p_block_ack *)h;
4369         struct drbd_work *w;
4370         sector_t sector;
4371         int size;
4372
4373         sector = be64_to_cpu(p->sector);
4374         size = be32_to_cpu(p->blksize);
4375
4376         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4377
4378         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4379                 drbd_ov_oos_found(mdev, sector, size);
4380         else
4381                 ov_oos_print(mdev);
4382
4383         drbd_rs_complete_io(mdev, sector);
4384         dec_rs_pending(mdev);
4385
4386         if (--mdev->ov_left == 0) {
4387                 w = kmalloc(sizeof(*w), GFP_NOIO);
4388                 if (w) {
4389                         w->cb = w_ov_finished;
4390                         drbd_queue_work_front(&mdev->data.work, w);
4391                 } else {
4392                         dev_err(DEV, "kmalloc(w) failed.\n");
4393                         ov_oos_print(mdev);
4394                         drbd_resync_finished(mdev);
4395                 }
4396         }
4397         return TRUE;
4398 }
4399
4400 static int got_something_to_ignore_m(struct drbd_conf *mdev, struct p_header *h)
4401 {
4402         /* IGNORE */
4403         return TRUE;
4404 }
4405
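/* Dispatch table for packets handled by the asender thread: the expected
 * on-the-wire packet size (header included) and the handler to call once
 * the full packet has been received. */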
4406 struct asender_cmd {
4407         size_t pkt_size;
4408         int (*process)(struct drbd_conf *mdev, struct p_header *h);
4409 };
4410
4411 static struct asender_cmd *get_asender_cmd(int cmd)
4412 {
4413         static struct asender_cmd asender_tbl[] = {
4414                 /* anything missing from this table is in
4415                  * the drbd_cmd_handler (drbd_default_handler) table,
4416                  * see the beginning of drbdd() */
4417         [P_PING]            = { sizeof(struct p_header), got_Ping },
4418         [P_PING_ACK]        = { sizeof(struct p_header), got_PingAck },
4419         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4420         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4421         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4422         [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4423         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4424         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4425         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4426         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4427         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4428         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4429         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4430         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe), got_something_to_ignore_m },
4431         [P_MAX_CMD]         = { 0, NULL },
4432         };
4433         if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4434                 return NULL;
4435         return &asender_tbl[cmd];
4436 }
4437
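/* Main function of the asender ("acknowledge sender") thread: send pings
 * and the ACKs for completed epoch entries over the meta socket, and
 * receive and dispatch the peer's ACK class packets via get_asender_cmd(). */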
4438 int drbd_asender(struct drbd_thread *thi)
4439 {
4440         struct drbd_conf *mdev = thi->mdev;
4441         struct p_header *h = &mdev->meta.rbuf.header;
4442         struct asender_cmd *cmd = NULL;
4443
4444         int rv, len;
4445         void *buf    = h;
4446         int received = 0;
4447         int expect   = sizeof(struct p_header);
4448         int empty;
4449
4450         sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4451
4452         current->policy = SCHED_RR;  /* Make this a realtime task! */
4453         current->rt_priority = 2;    /* more important than all other tasks */
4454
4455         while (get_t_state(thi) == Running) {
4456                 drbd_thread_current_set_cpu(mdev);
4457                 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4458                         ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4459                         mdev->meta.socket->sk->sk_rcvtimeo =
4460                                 mdev->net_conf->ping_timeo*HZ/10;
4461                 }
4462
4463                 /* conditionally cork;
4464                  * it may hurt latency if we cork without much to send */
4465                 if (!mdev->net_conf->no_cork &&
4466                         3 < atomic_read(&mdev->unacked_cnt))
4467                         drbd_tcp_cork(mdev->meta.socket);
4468                 while (1) {
4469                         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4470                         flush_signals(current);
4471                         if (!drbd_process_done_ee(mdev)) {
4472                                 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4473                                 goto reconnect;
4474                         }
4475                         /* to avoid race with newly queued ACKs */
4476                         set_bit(SIGNAL_ASENDER, &mdev->flags);
4477                         spin_lock_irq(&mdev->req_lock);
4478                         empty = list_empty(&mdev->done_ee);
4479                         spin_unlock_irq(&mdev->req_lock);
4480                         /* new ack may have been queued right here,
4481                          * but then there is also a signal pending,
4482                          * and we start over... */
4483                         if (empty)
4484                                 break;
4485                 }
4486                 /* but unconditionally uncork unless disabled */
4487                 if (!mdev->net_conf->no_cork)
4488                         drbd_tcp_uncork(mdev->meta.socket);
4489
4490                 /* short circuit, recv_msg would return EINTR anyways. */
4491                 if (signal_pending(current))
4492                         continue;
4493
4494                 rv = drbd_recv_short(mdev, mdev->meta.socket,
4495                                      buf, expect-received, 0);
4496                 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4497
4498                 flush_signals(current);
4499
4500                 /* Note:
4501                  * -EINTR        (on meta) we got a signal
4502                  * -EAGAIN       (on meta) rcvtimeo expired
4503                  * -ECONNRESET   other side closed the connection
4504                  * -ERESTARTSYS  (on data) we got a signal
4505                  * rv <  0       other than above: unexpected error!
4506                  * rv == expected: full header or command
4507                  * rv <  expected: "woken" by signal during receive
4508                  * rv == 0       : "connection shut down by peer"
4509                  */
4510                 if (likely(rv > 0)) {
4511                         received += rv;
4512                         buf      += rv;
4513                 } else if (rv == 0) {
4514                         dev_err(DEV, "meta connection shut down by peer.\n");
4515                         goto reconnect;
4516                 } else if (rv == -EAGAIN) {
4517                         if (mdev->meta.socket->sk->sk_rcvtimeo ==
4518                             mdev->net_conf->ping_timeo*HZ/10) {
4519                                 dev_err(DEV, "PingAck did not arrive in time.\n");
4520                                 goto reconnect;
4521                         }
4522                         set_bit(SEND_PING, &mdev->flags);
4523                         continue;
4524                 } else if (rv == -EINTR) {
4525                         continue;
4526                 } else {
4527                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4528                         goto reconnect;
4529                 }
4530
4531                 if (received == expect && cmd == NULL) {
4532                         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4533                                 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4534                                     (long)be32_to_cpu(h->magic),
4535                                     be16_to_cpu(h->command), be16_to_cpu(h->length));
4536                                 goto reconnect;
4537                         }
4538                         cmd = get_asender_cmd(be16_to_cpu(h->command));
4539                         len = be16_to_cpu(h->length);
4540                         if (unlikely(cmd == NULL)) {
4541                                 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4542                                     (long)be32_to_cpu(h->magic),
4543                                     be16_to_cpu(h->command), be16_to_cpu(h->length));
4544                                 goto disconnect;
4545                         }
4546                         expect = cmd->pkt_size;
4547                         ERR_IF(len != expect-sizeof(struct p_header))
4548                                 goto reconnect;
4549                 }
4550                 if (received == expect) {
4551                         D_ASSERT(cmd != NULL);
4552                         if (!cmd->process(mdev, h))
4553                                 goto reconnect;
4554
4555                         buf      = h;
4556                         received = 0;
4557                         expect   = sizeof(struct p_header);
4558                         cmd      = NULL;
4559                 }
4560         }
4561
4562         if (0) {
4563 reconnect:
4564                 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4565         }
4566         if (0) {
4567 disconnect:
4568                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4569         }
4570         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4571
4572         D_ASSERT(mdev->state.conn < C_CONNECTED);
4573         dev_info(DEV, "asender terminated\n");
4574
4575         return 0;
4576 }