This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

drbd is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.

drbd is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with drbd; see the file COPYING.  If not, write to
the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
#include <linux/module.h>

#include <asm/uaccess.h>
#include <linux/drbd.h>
#include <linux/file.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
static int drbd_do_handshake(struct drbd_conf *mdev);
static int drbd_do_auth(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_conf *, struct drbd_work *, int);

#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
/*
 * some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
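/*
 * Illustration only (a hedged sketch, not part of the driver): with
 * page->private used as the link, traversal helpers of this kind are
 * typically defined along the lines of
 *
 *	#define page_chain_next(page) \
 *		((struct page *)page_private(page))
 *	#define page_chain_for_each(page) \
 *		for (; page; page = page_chain_next(page))
 *
 * so a chain of three pages a -> b -> c has page_private(a) == b,
 * page_private(b) == c, and page_private(c) == 0 as the end marker.
 */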
/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller. */
static struct page *page_chain_del(struct page **head, int n)

		tmp = page_chain_next(page);
			break; /* found sufficient pages */

	/* insufficient pages, don't use any of them. */

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)

	while ((tmp = page_chain_next(page)))

static int page_chain_free(struct page *page)

	page_chain_for_each_safe(page, tmp) {

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)

	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)

	struct page *page = NULL;
	struct page *tmp = NULL;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		set_page_private(tmp, (unsigned long)page);

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_pp_alloc will retry this
	 * function "soon". */

	tmp = page_chain_tail(page, NULL);
	spin_lock(&drbd_pp_lock);
	page_chain_add(&drbd_pp_pool, page, tmp);
	spin_unlock(&drbd_pp_lock);
static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)

	struct drbd_epoch_entry *e;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first unfinished one, we
	   can stop examining the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		e = list_entry(le, struct drbd_epoch_entry, w.list);
		if (drbd_ee_has_active_page(e))
		list_move(le, to_be_freed);

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)

	LIST_HEAD(reclaimed);
	struct drbd_epoch_entry *e, *t;

	spin_lock_irq(&mdev->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	spin_unlock_irq(&mdev->req_lock);

	list_for_each_entry_safe(e, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, e);
/**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)

	struct page *page = NULL;

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
		page = drbd_pp_first_pages_or_try_alloc(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
			page = drbd_pp_first_pages_or_try_alloc(mdev, number);

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");

	finish_wait(&drbd_pp_wait, &wait);

	atomic_add(number, &mdev->pp_in_use);
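/*
 * Illustration only (a hedged sketch): a typical consumer walks the
 * returned chain with the page_chain_for_each() helper and fills each
 * page, e.g. when receiving a data block from the socket:
 *
 *	struct page *chain = drbd_pp_alloc(mdev, nr_pages, true);
 *	struct page *p = chain;
 *	page_chain_for_each(p) {
 *		void *data = kmap(p);
 *		... receive up to PAGE_SIZE bytes into data ...
 *		kunmap(p);
 *	}
 *	... and eventually: drbd_pp_free(mdev, chain, 0);
 *
 * This mirrors what read_in_block() below does with drbd_recv().
 */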
/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * Is also used from inside another spin_lock_irq(&mdev->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)

	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
		i = page_chain_free(page);

		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		spin_unlock(&drbd_pp_lock);

	i = atomic_sub_return(i, a);
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_process_done_ee()
 drbd_wait_ee_list_empty()
*/
struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
				       unsigned int data_size,
				       gfp_t gfp_mask) __must_hold(local)

	struct drbd_epoch_entry *e;
	struct page *page = NULL;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))

	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");

	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));

	INIT_HLIST_NODE(&e->collision);
	atomic_set(&e->pending_bios, 0);

	mempool_free(e, drbd_ee_mempool);

void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)

	if (e->flags & EE_HAS_DIGEST)
	drbd_pp_free(mdev, e->pages, is_net);
	D_ASSERT(atomic_read(&e->pending_bios) == 0);
	D_ASSERT(hlist_unhashed(&e->collision));
	mempool_free(e, drbd_ee_mempool);
int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)

	LIST_HEAD(work_list);
	struct drbd_epoch_entry *e, *t;
	int is_net = list == &mdev->net_ee;

	spin_lock_irq(&mdev->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->req_lock);

	list_for_each_entry_safe(e, t, &work_list, w.list) {
		drbd_free_some_ee(mdev, e, is_net);
/*
 * This function is called from _asender only_
 * but see also comments in _req_mod(,barrier_acked)
 * and receive_Barrier.
 *
 * Move entries from net_ee to done_ee, if ready.
 * Grab done_ee, call all callbacks, free the entries.
 * The callbacks typically send out ACKs.
 */
static int drbd_process_done_ee(struct drbd_conf *mdev)

	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_epoch_entry *e, *t;
	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);

	spin_lock_irq(&mdev->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->req_lock);

	list_for_each_entry_safe(e, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, e);

	/* possible callbacks here:
	 * e_end_block, e_end_resync_block, e_send_discard_ack;
	 * all ignore the last argument. */
	list_for_each_entry_safe(e, t, &work_list, w.list) {
		/* list_del not necessary, next/prev members not touched */
		ok = e->w.cb(mdev, &e->w, !ok) && ok;
		drbd_free_ee(mdev, e);

	wake_up(&mdev->ee_wait);
void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->req_lock);
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->req_lock);

void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)

	spin_lock_irq(&mdev->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->req_lock);
/* see also kernel_accept, which is only present since 2.6.18.
 * We also want to log exactly which part of it failed. */
static int drbd_accept(struct drbd_conf *mdev, const char **what,
		       struct socket *sock, struct socket **newsock)

	struct sock *sk = sock->sk;

	err = sock->ops->listen(sock, 5);

	*what = "sock_create_lite";
	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,

	err = sock->ops->accept(sock, *newsock, 0);
		sock_release(*newsock);

	(*newsock)->ops = sock->ops;
	__module_get((*newsock)->ops->owner);
static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
			   void *buf, size_t size, int flags)

	struct msghdr msg = {
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)

	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);

static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)

	struct msghdr msg = {
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL

		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);

			/*
			 * ECONNRESET	other side closed the connection
			 * ERESTARTSYS	(on sock) we got a signal
			 */
			if (rv == -ECONNRESET)
				dev_info(DEV, "sock was reset by peer\n");
			else if (rv != -ERESTARTSYS)
				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
		} else if (rv == 0) {
			dev_info(DEV, "sock was shut down by peer\n");

			/* signal came in, or peer/link went down,
			 * after we read a partial message
			 */
			/* D_ASSERT(signal_pending(current)); */

	drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
/*
 * On individual connections, the socket buffer size must be set prior to the
 * listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,

	/* open coded SO_SNDBUF, SO_RCVBUF */
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
static struct socket *drbd_try_connect(struct drbd_conf *mdev)

	struct sockaddr_in6 src_in6;
	int disconnect_on_error = 1;

	if (!get_net_conf(mdev))

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
		SOCK_STREAM, IPPROTO_TCP, &sock);

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
			mdev->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so Linux selects
	 * a free one dynamically.
	 */
	memcpy(&src_in6, mdev->net_conf->my_addr,
	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      mdev->net_conf->my_addr_len);

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	err = sock->ops->connect(sock,
				 (struct sockaddr *)mdev->net_conf->peer_addr,
				 mdev->net_conf->peer_addr_len, 0);

		/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
		/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;

		dev_err(DEV, "%s failed, err = %d\n", what, err);

	if (disconnect_on_error)
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)

	struct socket *s_estab = NULL, *s_listen;

	if (!get_net_conf(mdev))

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
		SOCK_STREAM, IPPROTO_TCP, &s_listen);

	timeo = mdev->net_conf->try_connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

	s_listen->sk->sk_reuse    = SK_CAN_REUSE; /* SO_REUSEADDR */
	s_listen->sk->sk_rcvtimeo = timeo;
	s_listen->sk->sk_sndtimeo = timeo;
	drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
			mdev->net_conf->rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen,
				  (struct sockaddr *) mdev->net_conf->my_addr,
				  mdev->net_conf->my_addr_len);

	err = drbd_accept(mdev, &what, s_listen, &s_estab);

		sock_release(s_listen);
	if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
		dev_err(DEV, "%s failed, err = %d\n", what, err);
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
static int drbd_send_fp(struct drbd_conf *mdev,
			struct socket *sock, enum drbd_packets cmd)

	struct p_header80 *h = &mdev->data.sbuf.header.h80;

	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);

static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)

	struct p_header80 *h = &mdev->data.rbuf.header.h80;

	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);

	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
		return be16_to_cpu(h->command);

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @mdev:	DRBD device.
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)

	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {

/*
 *  1 yes, we have a valid connection
 *  0 oops, did not work out, please try again
 * -1 peer talks different language,
 *    no point in trying again, please go standalone.
 * -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_conf *mdev)

	struct socket *s, *sock, *msock;
	enum drbd_state_rv rv;

	D_ASSERT(!mdev->data.socket);

	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)

	clear_bit(DISCARD_CONCURRENT, &mdev->flags);

		/* 3 tries, this should take less than a second! */
		s = drbd_try_connect(mdev);
			/* give the other side time to call bind() & listen() */
			schedule_timeout_interruptible(HZ / 10);

				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
				dev_err(DEV, "Logic error in drbd_connect()\n");
				goto out_release_sockets;

			schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
			ok = drbd_socket_okay(mdev, &sock);
			ok = drbd_socket_okay(mdev, &msock) && ok;

		s = drbd_wait_for_connect(mdev);
			try = drbd_recv_fp(mdev, s);
			drbd_socket_okay(mdev, &sock);
			drbd_socket_okay(mdev, &msock);

				dev_warn(DEV, "initial packet S crossed\n");
				dev_warn(DEV, "initial packet M crossed\n");
				set_bit(DISCARD_CONCURRENT, &mdev->flags);
				dev_warn(DEV, "Error receiving initial packet\n");

		if (mdev->state.conn <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(&mdev->receiver) == Exiting)
				goto out_release_sockets;

			ok = drbd_socket_okay(mdev, &sock);
			ok = drbd_socket_okay(mdev, &msock) && ok;

	msock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	sock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

	sock->sk->sk_allocation = GFP_NOIO;
	msock->sk->sk_allocation = GFP_NOIO;

	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/*
	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_HAND_SHAKE timeout,
	 * which we set to 4x the configured ping_timeout. */
	sock->sk->sk_sndtimeo =
	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;

	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock);
	drbd_tcp_nodelay(msock);
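	/*
	 * Illustration only (a hedged sketch): drbd_tcp_nodelay() is
	 * defined elsewhere; disabling Nagle on a kernel socket of this
	 * era is typically done along these lines:
	 *
	 *	static inline void drbd_tcp_nodelay(struct socket *sock)
	 *	{
	 *		int val = 1;
	 *		(void) kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
	 *					 (char *)&val, sizeof(val));
	 *	}
	 */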
	mdev->data.socket = sock;
	mdev->meta.socket = msock;
	mdev->last_received = jiffies;

	D_ASSERT(mdev->asender.task == NULL);

	h = drbd_do_handshake(mdev);

	if (mdev->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(mdev)) {
			dev_err(DEV, "Authentication of peer failed\n");
			dev_err(DEV, "Authentication of peer failed, trying again.\n");

	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	atomic_set(&mdev->packet_seq, 0);

	if (drbd_send_protocol(mdev) == -1)
	set_bit(STATE_SENT, &mdev->flags);
	drbd_send_sync_param(mdev, &mdev->sync_conf);
	drbd_send_sizes(mdev, 0, 0);
	drbd_send_uuids(mdev);
	drbd_send_current_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);

	spin_lock_irq(&mdev->req_lock);
	rv = _drbd_set_state(_NS(mdev, conn, C_WF_REPORT_PARAMS), CS_VERBOSE, NULL);
	if (mdev->state.conn != C_WF_REPORT_PARAMS)
		clear_bit(STATE_SENT, &mdev->flags);
	spin_unlock_irq(&mdev->req_lock);

	drbd_thread_start(&mdev->asender);
	mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)

	union p_header *h = &mdev->data.rbuf.header;

	r = drbd_recv(mdev, h, sizeof(*h));
	if (unlikely(r != sizeof(*h))) {
		if (!signal_pending(current))
			dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);

	if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
		*cmd = be16_to_cpu(h->h80.command);
		*packet_size = be16_to_cpu(h->h80.length);
	} else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
		*cmd = be16_to_cpu(h->h95.command);
		*packet_size = be32_to_cpu(h->h95.length);
		dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
		    be32_to_cpu(h->h80.magic),
		    be16_to_cpu(h->h80.command),
		    be16_to_cpu(h->h80.length));

	mdev->last_received = jiffies;
static void drbd_flush(struct drbd_conf *mdev)

	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
			dev_info(DEV, "local disk flush failed with status %d\n", rv);
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			drbd_bump_write_ordering(mdev, WO_drain_io);
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,

	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
			atomic_dec(&epoch->active);
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
		case EV_BECAME_LAST:

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))

			if (mdev->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				if (rv == FE_STILL_LIVE)

				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
				wake_up(&mdev->ee_wait);

	spin_unlock(&mdev->epoch_lock);
/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:	DRBD device.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)

	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",

	pwo = mdev->write_ordering;
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
/**
 * @mdev:	DRBD device.
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 * single page to an empty bio (which should never happen and likely indicates
 * that the lower level IO stack is in some way broken). This has been observed
 * on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
		   const unsigned rw, const int fault_type)

	struct bio *bios = NULL;
	struct page *page = e->pages;
	sector_t sector = e->sector;
	unsigned ds = e->size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
	bio = bio_alloc(GFP_NOIO, nr_pages);
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
	/* > e->sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	bio->bi_private = e;
	bio->bi_end_io = drbd_endio_sec;
	bio->bi_next = bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);

	D_ASSERT(page == NULL);

	atomic_set(&e->pending_bios, n_bios);
		bios = bios->bi_next;
		bio->bi_next = NULL;
		drbd_generic_make_request(mdev, fault_type, bio);

		bios = bios->bi_next;
static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)

	struct p_barrier *p = &mdev->data.rbuf.barrier;
	struct drbd_epoch *epoch;

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
		if (rv == FE_RECYCLED)

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");

		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);

		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			epoch = mdev->current_epoch;
			wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

			D_ASSERT(atomic_read(&epoch->active) == 0);
			D_ASSERT(epoch->flags == 0);

		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);

	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
		/* The current_epoch got recycled while we allocated this one... */
	spin_unlock(&mdev->epoch_lock);
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_epoch_entry *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)

	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_epoch_entry *e;
	void *dig_in = mdev->int_dig_in;
	void *dig_vv = mdev->int_dig_vv;
	unsigned long *data;

	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;

		rr = drbd_recv(mdev, dig_in, dgs);
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data digest: read %d expected %d\n",

	ERR_IF(data_size &  0x1ff) return NULL;
	ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);

	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		rr = drbd_recv(mdev, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
			drbd_free_ee(mdev, e);
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data: read %d expected %d\n",

		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_bcast_ee(mdev, "digest failed",
					dgs, dig_in, dig_vv, e);
			drbd_free_ee(mdev, e);

	mdev->recv_cnt += data_size>>9;
/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)

	page = drbd_pp_alloc(mdev, 1, 1);

		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
		if (rr != min_t(int, data_size, PAGE_SIZE)) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data: read %d expected %d\n",
					rr, min_t(int, data_size, PAGE_SIZE));

	drbd_pp_free(mdev, page, 0);
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)

	struct bio_vec *bvec;
	int dgs, rr, i, expect;
	void *dig_in = mdev->int_dig_in;
	void *dig_vv = mdev->int_dig_vv;

	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;

		rr = drbd_recv(mdev, dig_in, dgs);
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data reply digest: read %d expected %d\n",

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		expect = min_t(int, data_size, bvec->bv_len);
		rr = drbd_recv(mdev,
			       kmap(bvec->bv_page)+bvec->bv_offset,
		kunmap(bvec->bv_page);
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data reply: "
					"read %d expected %d\n",

		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");

	D_ASSERT(data_size == 0);
/* e_end_resync_block() is called via
 * drbd_process_done_ee() by asender only */
static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)

	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
	sector_t sector = e->sector;

	D_ASSERT(hlist_unhashed(&e->collision));

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, e->size);
		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, e->size);

		ok = drbd_send_ack(mdev, P_NEG_ACK, e);

static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)

	struct drbd_epoch_entry *e;

	e = read_in_block(mdev, ID_SYNCER, sector, data_size);

	dec_rs_pending(mdev);

	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */
	e->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->req_lock);
	list_del(&e->w.list);
	spin_unlock_irq(&mdev->req_lock);

	drbd_free_ee(mdev, e);
static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)

	struct drbd_request *req;
	struct p_data *p = &mdev->data.rbuf.data;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->req_lock);
	req = _ar_id_to_req(mdev, p->block_id, sector);
	spin_unlock_irq(&mdev->req_lock);
	if (unlikely(!req)) {
		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	ok = recv_dless_read(mdev, req, sector, data_size);

		req_mod(req, data_received);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)

	struct p_data *p = &mdev->data.rbuf.data;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_endio_write_sec. */
		ok = recv_resync_read(mdev, sector, data_size);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		ok = drbd_drain_block(mdev, data_size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);

	atomic_add(data_size >> 9, &mdev->rs_sect_in);
/* e_end_block() is called via drbd_process_done_ee().
 * this means this function only runs in the asender thread
 */
static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)

	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
	sector_t sector = e->sector;

	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
				mdev->state.conn <= C_PAUSED_SYNC_T &&
				e->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			ok &= drbd_send_ack(mdev, pcmd, e);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(mdev, sector, e->size);
			ok = drbd_send_ack(mdev, P_NEG_ACK, e);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */

	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (mdev->net_conf->two_primaries) {
		spin_lock_irq(&mdev->req_lock);
		D_ASSERT(!hlist_unhashed(&e->collision));
		hlist_del_init(&e->collision);
		spin_unlock_irq(&mdev->req_lock);
		D_ASSERT(hlist_unhashed(&e->collision));

	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)

	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;

	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);

	spin_lock_irq(&mdev->req_lock);
	D_ASSERT(!hlist_unhashed(&e->collision));
	hlist_del_init(&e->collision);
	spin_unlock_irq(&mdev->req_lock);
static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_epoch_entry *data_e)

	struct drbd_epoch_entry *rs_e;

	spin_lock_irq(&mdev->req_lock);
	list_for_each_entry(rs_e, &mdev->sync_ee, w.list) {
		if (overlaps(data_e->sector, data_e->size, rs_e->sector, rs_e->size)) {

	spin_unlock_irq(&mdev->req_lock);
/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than mdev->peer_seq, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update mdev->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)

	spin_lock(&mdev->peer_seq_lock);
		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
		if (seq_le(packet_seq, mdev->peer_seq+1))
		if (signal_pending(current)) {
		p_seq = mdev->peer_seq;
		spin_unlock(&mdev->peer_seq_lock);
		timeout = schedule_timeout(30*HZ);
		spin_lock(&mdev->peer_seq_lock);
		if (timeout == 0 && p_seq == mdev->peer_seq) {
			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
	finish_wait(&mdev->seq_wait, &wait);
	if (mdev->peer_seq+1 == packet_seq)
	spin_unlock(&mdev->peer_seq_lock);
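/*
 * Illustration only (a hedged sketch): seq_le() is defined elsewhere;
 * a wrap-safe "less or equal" on 32bit sequence numbers is typically
 * written as
 *
 *	static inline int seq_le(u32 a, u32 b)
 *	{
 *		return (s32)(a - b) <= 0;
 *	}
 *
 * which stays correct across the 32bit wrap as long as the two values
 * are less than 1<<31 apart, matching the "ages until wrap" estimate
 * in the comment above.
 */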
/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)

	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
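/*
 * Illustration only (a hedged sketch): the inverse mapping, the
 * bio_flags_to_wire() mentioned above (it lives on the sending side),
 * presumably mirrors this function flag for flag:
 *
 *	static u32 bio_flags_to_wire(struct drbd_conf *mdev,
 *				     unsigned long bi_rw)
 *	{
 *		return (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
 *		       (bi_rw & REQ_FUA ? DP_FUA : 0) |
 *		       (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
 *		       (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
 *	}
 *
 * possibly gated on the agreed protocol version for the newer flags.
 */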
/* mirrored write */
static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)

	struct drbd_epoch_entry *e;
	struct p_data *p = &mdev->data.rbuf.data;

	if (!get_ldev(mdev)) {
		spin_lock(&mdev->peer_seq_lock);
		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
		spin_unlock(&mdev->peer_seq_lock);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
		atomic_inc(&mdev->current_epoch->epoch_size);
		return drbd_drain_block(mdev, data_size);

	/* get_ldev(mdev) successful.
	 * Corresponding put_ldev done either below (on various errors),
	 * or in drbd_endio_write_sec, if we successfully submit the data at
	 * the end of this function. */

	sector = be64_to_cpu(p->sector);
	e = read_in_block(mdev, p->block_id, sector, data_size);

	e->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(mdev, dp_flags);
	if (e->pages == NULL) {
		D_ASSERT(e->size == 0);
		D_ASSERT(dp_flags & DP_FLUSH);

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		e->flags |= EE_MAY_SET_IN_SYNC;

	spin_lock(&mdev->epoch_lock);
	e->epoch = mdev->current_epoch;
	atomic_inc(&e->epoch->epoch_size);
	atomic_inc(&e->epoch->active);
	spin_unlock(&mdev->epoch_lock);

	/* I'm the receiver, I do hold a net_cnt reference. */
	if (!mdev->net_conf->two_primaries) {
		spin_lock_irq(&mdev->req_lock);
		/* don't get the req_lock yet,
		 * we may sleep in drbd_wait_peer_seq */
		const int size = e->size;
		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
		struct drbd_request *i;
		struct hlist_node *n;
		struct hlist_head *slot;

		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
		BUG_ON(mdev->ee_hash == NULL);
		BUG_ON(mdev->tl_hash == NULL);

		/* conflict detection and handling:
		 * 1. wait on the sequence number,
		 *    in case this data packet overtook ACK packets.
		 * 2. check our hash tables for conflicting requests.
		 *    we only need to walk the tl_hash, since an ee can not
		 *    have a conflict with another ee: on the submitting
		 *    node, the corresponding req had already been conflicting,
		 *    and a conflicting req is never sent.
		 *
		 * Note: for two_primaries, we are protocol C,
		 * so there cannot be any request that is DONE
		 * but still on the transfer log.
		 *
		 * unconditionally add to the ee_hash.
		 *
		 * if no conflicting request is found:
		 *	submit.
		 *
		 * if any conflicting request is found
		 * that has not yet been acked,
		 * AND I have the "discard concurrent writes" flag:
		 *	queue (via done_ee) the P_DISCARD_ACK; OUT.
		 *
		 * if any conflicting request is found:
		 *	block the receiver, waiting on misc_wait
		 *	until no more conflicting requests are there,
		 *	or we get interrupted (disconnect).
		 *
		 *	we do not just write after local io completion of those
		 *	requests, but only after req is done completely, i.e.
		 *	we wait for the P_DISCARD_ACK to arrive!
		 *
		 *	then proceed normally, i.e. submit.
		 */
		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
			goto out_interrupted;

		spin_lock_irq(&mdev->req_lock);

		hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));

#define OVERLAPS overlaps(i->sector, i->size, sector, size)
		slot = tl_hash_slot(mdev, sector);
			int have_unacked = 0;
			int have_conflict = 0;
			prepare_to_wait(&mdev->misc_wait, &wait,
				TASK_INTERRUPTIBLE);
			hlist_for_each_entry(i, n, slot, collision) {
					/* only ALERT on first iteration,
					 * we may be woken up early... */
						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
						      " new: %llus +%u; pending: %llus +%u\n",
						      current->comm, current->pid,
						      (unsigned long long)sector, size,
						      (unsigned long long)i->sector, i->size);
					if (i->rq_state & RQ_NET_PENDING)

			/* Discard Ack only for the _first_ iteration */
			if (first && discard && have_unacked) {
				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
				     (unsigned long long)sector);
				e->w.cb = e_send_discard_ack;
				list_add_tail(&e->w.list, &mdev->done_ee);

				spin_unlock_irq(&mdev->req_lock);

				/* we could probably send that P_DISCARD_ACK ourselves,
				 * but I don't like the receiver using the msock */

				finish_wait(&mdev->misc_wait, &wait);

			if (signal_pending(current)) {
				hlist_del_init(&e->collision);
				spin_unlock_irq(&mdev->req_lock);
				finish_wait(&mdev->misc_wait, &wait);
				goto out_interrupted;

			spin_unlock_irq(&mdev->req_lock);
				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
				     "sec=%llus\n", (unsigned long long)sector);
			} else if (discard) {
				/* we had none on the first iteration.
				 * there must be none now. */
				D_ASSERT(have_unacked == 0);

			spin_lock_irq(&mdev->req_lock);
		finish_wait(&mdev->misc_wait, &wait);

	list_add(&e->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->req_lock);

	if (mdev->state.conn == C_SYNC_TARGET)
		wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, e));

	switch (mdev->net_conf->wire_protocol) {
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(mdev, P_RECV_ACK, e);

	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(mdev, e->sector, e->size);
		e->flags |= EE_CALL_AL_COMPLETE_IO;
		e->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(mdev, e->sector);

	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->req_lock);
	list_del(&e->w.list);
	hlist_del_init(&e->collision);
	spin_unlock_irq(&mdev->req_lock);
	if (e->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(mdev, e->sector);

	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
	drbd_free_ee(mdev, e);
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * activity (more than 64 sectors) we cannot account for with our own resync
 * activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)

	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	struct lc_element *tmp;

	/* feature disabled? */
	if (mdev->sync_conf.c_min_rate == 0)

	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
		/* Do not slow down if app IO is already waiting for this extent */
	spin_unlock_irq(&mdev->al_lock);

	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
			atomic_read(&mdev->rs_sect_ev);

	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP */
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
		db = mdev->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		if (dbdt > mdev->sync_conf.c_min_rate)
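/*
 * Worked example (hedged; this assumes Bit2KB() converts 4 KiB bitmap
 * bits to KiB, i.e. multiplies by 4): if the chosen step mark is
 * dt = 2 seconds old and db = 1024 bits (4 MiB) were cleared since
 * then, db/dt = 512 and dbdt = Bit2KB(512) = 2048 KiB/s. With
 * c_min_rate configured at, say, 4096 KiB/s, the comparison above does
 * not fire: the resync is still below its guaranteed minimum rate, so
 * it is not throttled even on a busy lower device.
 */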
static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)

	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_epoch_entry *e;
	struct digest_info *di = NULL;
	unsigned int fault_type;
	struct p_block_req *p = &mdev->data.rbuf.block_req;

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);

	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
			dec_rs_pending(mdev);
			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",

		if (verb && __ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possible payload */
		return drbd_drain_block(mdev, digest_size);

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);

	case P_DATA_REQUEST:
		e->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */

	case P_RS_DATA_REQUEST:
		e->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);

	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);

		di->digest_size = digest_size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		e->flags |= EE_HAS_DIGEST;

		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)

		if (cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(mdev->agreed_pro_version >= 89);
			e->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		} else if (cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &mdev->rs_sect_in);
			e->w.cb = w_e_end_ov_reply;
			dec_rs_pending(mdev);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		if (mdev->ov_start_sector == ~(sector_t)0 &&
		    mdev->agreed_pro_version >= 90) {
			unsigned long now = jiffies;

			mdev->ov_start_sector = sector;
			mdev->ov_position = sector;
			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
			mdev->rs_total = mdev->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				mdev->rs_mark_left[i] = mdev->ov_left;
				mdev->rs_mark_time[i] = now;
			dev_info(DEV, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		e->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;

		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
		fault_type = DRBD_FAULT_MAX;

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time. For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * one request through. The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */
	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
		schedule_timeout_uninterruptible(HZ/10);
	if (drbd_rs_begin_io(mdev, sector))

	atomic_add(size >> 9, &mdev->rs_sect_ev);

	spin_lock_irq(&mdev->req_lock);
	list_add_tail(&e->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->req_lock);

	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->req_lock);
	list_del(&e->w.list);
	spin_unlock_irq(&mdev->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

	drbd_free_ee(mdev, e);
static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)

	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;

	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
	peer = mdev->p_uuid[UI_BITMAP] & 1;

	ch_peer = mdev->p_uuid[UI_SIZE];
	ch_self = mdev->comm_bm_set;

	switch (mdev->net_conf->after_sb_0p) {
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
		dev_err(DEV, "Configuration error.\n");
	case ASB_DISCONNECT:
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
		if (self == 1 && peer == 0) {
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
		if (self == 1 && peer == 0) {
		/* Else fall through to one of the other strategies... */
		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
			 "Using discard-least-changes instead\n");
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
		if (ch_peer == 0) { rv =  1; break; }
		if (ch_self == 0) { rv = -1; break; }
		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
	case ASB_DISCARD_LEAST_CHG:
		if	(ch_self < ch_peer)
		else if (ch_self > ch_peer)
		else /* ( ch_self == ch_peer ) */
		     /* Well, then use something else. */
			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
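/*
 * Reading the return value (hedged summary, inferred from the visible
 * cases above: ch_peer == 0 yields rv = 1, ch_self == 0 yields rv = -1,
 * matching the 1/-1 sync-source/sync-target convention of the table
 * further below): rv > 0 means our data survives and the peer gets
 * resynced, rv < 0 means our modifications are discarded and we become
 * sync target, rv == -100 means no automatic decision was possible.
 * Example: with after-sb-0pri set to discard-least-changes, ch_self <
 * ch_peer yields rv = -1, so the node that changed fewer blocks is the
 * one that gets overwritten.
 */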
static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)

	switch (mdev->net_conf->after_sb_1p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
		dev_err(DEV, "Configuration error.\n");
	case ASB_DISCONNECT:
		hg = drbd_asb_recover_0p(mdev);
		if (hg == -1 && mdev->state.role == R_SECONDARY)
		if (hg == 1  && mdev->state.role == R_PRIMARY)
		rv = drbd_asb_recover_0p(mdev);
	case ASB_DISCARD_SECONDARY:
		return mdev->state.role == R_PRIMARY ? 1 : -1;
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(mdev);
		if (hg == -1 && mdev->state.role == R_PRIMARY) {
			enum drbd_state_rv rv2;

			drbd_set_role(mdev, R_SECONDARY, 0);
			/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			 * we might be here in C_WF_REPORT_PARAMS which is transient.
			 * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(mdev, "pri-lost-after-sb");
				dev_warn(DEV, "Successfully gave up primary role.\n");
static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)

	switch (mdev->net_conf->after_sb_2p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
	case ASB_DISCARD_SECONDARY:
		dev_err(DEV, "Configuration error.\n");
		rv = drbd_asb_recover_0p(mdev);
	case ASB_DISCONNECT:
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(mdev);
			enum drbd_state_rv rv2;

			/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			 * we might be here in C_WF_REPORT_PARAMS which is transient.
			 * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(mdev, "pri-lost-after-sb");
				dev_warn(DEV, "Successfully gave up primary role.\n");
static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
			   u64 bits, u64 flags)

		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);

	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
		 text,
		 (unsigned long long)uuid[UI_CURRENT],
		 (unsigned long long)uuid[UI_BITMAP],
		 (unsigned long long)uuid[UI_HISTORY_START],
		 (unsigned long long)uuid[UI_HISTORY_END],
		 (unsigned long long)bits,
		 (unsigned long long)flags);
/*
  100	after split brain try auto recover
    2	C_SYNC_SOURCE set BitMap
    1	C_SYNC_SOURCE use BitMap
   -1	C_SYNC_TARGET use BitMap
   -2	C_SYNC_TARGET set BitMap
 -100	after split brain, disconnect
-1000	unrelated data
-1091	requires proto 91
-1096	requires proto 96
 */
static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)

	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);

	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)

	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)

	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))

		int rct, dc; /* roles at crash time */

		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {

			if (mdev->agreed_pro_version < 91)

			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
				drbd_uuid_set_bm(mdev, 0UL);

				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");

		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {

			if (mdev->agreed_pro_version < 91)

			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
				mdev->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
			(mdev->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */

		case 0: /* !self_pri && !peer_pri */ return 0;
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2449 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2454 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2456 if (mdev->agreed_pro_version < 96 ?
2457 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2458 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2459 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2460 /* The last P_SYNC_UUID did not get through. Undo the last
2461 start-of-resync-as-sync-source modifications of the peer's UUIDs. */
2463 if (mdev->agreed_pro_version < 91)
2466 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2467 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2469 dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2470 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2477 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2478 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2479 peer = mdev->p_uuid[i] & ~((u64)1);
2485 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2486 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2491 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2493 if (mdev->agreed_pro_version < 96 ?
2494 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2495 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2496 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2497 /* The last P_SYNC_UUID did not get through. Undo the last
2498 start-of-resync-as-sync-source modifications of our UUIDs. */
2500 if (mdev->agreed_pro_version < 91)
2503 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2504 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2506 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2507 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2508 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2516 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2517 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2518 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2524 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2525 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2526 if (self == peer && self != ((u64)0))
2530 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2531 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2532 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2533 peer = mdev->p_uuid[j] & ~((u64)1);
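/*
 * Editorial sketch (not driver code): how the "roles at crash time"
 * value rct computed above packs both crash flags into two bits, and
 * what the switch on rct decides. The helper name is hypothetical.
 */
#if 0
static int example_crashed_roles(int self_was_primary, int peer_was_primary)
{
	/* bit 0 (weight 1): we were primary when we crashed,
	 * bit 1 (weight 2): the peer was primary when it crashed */
	int rct = (self_was_primary ? 1 : 0) + (peer_was_primary ? 2 : 0);

	switch (rct) {
	case 0: return  0;	/* neither was primary: nothing to resync */
	case 1: return  1;	/* only we were primary: become sync source */
	case 2: return -1;	/* only the peer was primary: become sync target */
	case 3:			/* both were primary: the tie is broken with
				 * the DISCARD_CONCURRENT flag, see above */
	default: return 0;
	}
}
#endif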
2542 /* drbd_sync_handshake() returns the new conn state on success, or
2543 C_MASK (-1) on failure.
2545 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2546 enum drbd_disk_state peer_disk) __must_hold(local)
2549 enum drbd_conns rv = C_MASK;
2550 enum drbd_disk_state mydisk;
2552 mydisk = mdev->state.disk;
2553 if (mydisk == D_NEGOTIATING)
2554 mydisk = mdev->new_state_tmp.disk;
2556 dev_info(DEV, "drbd_sync_handshake:\n");
2557 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2558 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2559 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2561 hg = drbd_uuid_compare(mdev, &rule_nr);
2563 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2566 dev_alert(DEV, "Unrelated data, aborting!\n");
2570 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2574 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2575 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2576 int f = (hg == -100) || abs(hg) == 2;
2577 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2580 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2581 hg > 0 ? "source" : "target");
2585 drbd_khelper(mdev, "initial-split-brain");
2587 if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2588 int pcount = (mdev->state.role == R_PRIMARY)
2589 + (peer_role == R_PRIMARY);
2590 int forced = (hg == -100);
2594 hg = drbd_asb_recover_0p(mdev);
2597 hg = drbd_asb_recover_1p(mdev);
2600 hg = drbd_asb_recover_2p(mdev);
2603 if (abs(hg) < 100) {
2604 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2605 "automatically solved. Sync from %s node\n",
2606 pcount, (hg < 0) ? "peer" : "this");
2608 dev_warn(DEV, "Doing a full sync, since"
2609 " UUIDs where ambiguous.\n");
2616 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2618 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2622 dev_warn(DEV, "Split-Brain detected, manually solved. "
2623 "Sync from %s node\n",
2624 (hg < 0) ? "peer" : "this");
2628 /* FIXME this log message is not correct if we end up here
2629 * after an attempted attach on a diskless node.
2630 * We just refuse to attach -- well, we drop the "connection"
2631 * to that disk, in a way... */
2632 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2633 drbd_khelper(mdev, "split-brain");
2637 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2638 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2642 if (hg < 0 && /* by intention we do not use mydisk here. */
2643 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2644 switch (mdev->net_conf->rr_conflict) {
2645 case ASB_CALL_HELPER:
2646 drbd_khelper(mdev, "pri-lost");
2648 case ASB_DISCONNECT:
2649 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2652 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2657 if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2659 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2661 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2662 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2663 abs(hg) >= 2 ? "full" : "bit-map based");
2668 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2669 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2670 BM_LOCKED_SET_ALLOWED))
2674 if (hg > 0) { /* become sync source. */
2676 } else if (hg < 0) { /* become sync target */
2680 if (drbd_bm_total_weight(mdev)) {
2681 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2682 drbd_bm_total_weight(mdev));
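/*
 * Editorial sketch of the handshake result convention used above: the
 * sign of hg selects the resync direction, the magnitude selects
 * bitmap-based vs. full resync. Hypothetical helper, for illustration.
 */
#if 0
static const char *example_describe_hg(int hg)
{
	if (hg == 0)
		return "no resync";
	if (abs(hg) >= 100)
		return "split brain";
	if (hg > 0)
		return abs(hg) >= 2 ? "sync source, full sync"
				    : "sync source, bitmap based";
	return abs(hg) >= 2 ? "sync target, full sync"
			    : "sync target, bitmap based";
}
#endif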
2689 /* returns 1 if invalid */
2690 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2692 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2693 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2694 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2697 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2698 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2699 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2702 /* everything else is valid if they are equal on both sides. */
2706 /* everything else is invalid. */
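/*
 * Editorial sketch: the only asymmetric combination cmp_after_sb()
 * accepts is discard-remote on one side paired with discard-local on
 * the other; everything else must match exactly. Hypothetical checks:
 */
#if 0
	/* returns 1 if invalid, 0 if valid: */
	cmp_after_sb(ASB_DISCARD_REMOTE, ASB_DISCARD_LOCAL);	/* 0: valid pair */
	cmp_after_sb(ASB_DISCARD_LOCAL, ASB_DISCARD_REMOTE);	/* 0: valid pair */
	cmp_after_sb(ASB_DISCARD_REMOTE, ASB_DISCONNECT);	/* 1: mixed, invalid */
	cmp_after_sb(ASB_DISCONNECT, ASB_DISCONNECT);		/* 0: equal, valid */
	cmp_after_sb(ASB_DISCONNECT, ASB_CALL_HELPER);		/* 1: unequal, invalid */
#endif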
2710 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2712 struct p_protocol *p = &mdev->data.rbuf.protocol;
2713 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2714 int p_want_lose, p_two_primaries, cf;
2715 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2717 p_proto = be32_to_cpu(p->protocol);
2718 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2719 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2720 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2721 p_two_primaries = be32_to_cpu(p->two_primaries);
2722 cf = be32_to_cpu(p->conn_flags);
2723 p_want_lose = cf & CF_WANT_LOSE;
2725 clear_bit(CONN_DRY_RUN, &mdev->flags);
2727 if (cf & CF_DRY_RUN)
2728 set_bit(CONN_DRY_RUN, &mdev->flags);
2730 if (p_proto != mdev->net_conf->wire_protocol) {
2731 dev_err(DEV, "incompatible communication protocols\n");
2735 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2736 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2740 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2741 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2745 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2746 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2750 if (p_want_lose && mdev->net_conf->want_lose) {
2751 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2755 if (p_two_primaries != mdev->net_conf->two_primaries) {
2756 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2760 if (mdev->agreed_pro_version >= 87) {
2761 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2763 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2766 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2767 if (strcmp(p_integrity_alg, my_alg)) {
2768 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2771 dev_info(DEV, "data-integrity-alg: %s\n",
2772 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2778 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2783 * input: alg name, feature name
2784 * return: NULL (alg name was "")
2785 * ERR_PTR(error) if something goes wrong
2786 * or the crypto hash ptr, if it worked out ok. */
2787 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2788 const char *alg, const char *name)
2790 struct crypto_hash *tfm;
2795 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2797 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2798 alg, name, PTR_ERR(tfm));
2801 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2802 crypto_free_hash(tfm);
2803 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2804 return ERR_PTR(-EINVAL);
2809 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2812 struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2813 unsigned int header_size, data_size, exp_max_sz;
2814 struct crypto_hash *verify_tfm = NULL;
2815 struct crypto_hash *csums_tfm = NULL;
2816 const int apv = mdev->agreed_pro_version;
2817 int *rs_plan_s = NULL;
2820 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2821 : apv == 88 ? sizeof(struct p_rs_param)
2823 : apv <= 94 ? sizeof(struct p_rs_param_89)
2824 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2826 if (packet_size > exp_max_sz) {
2827 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2828 packet_size, exp_max_sz);
2833 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2834 data_size = packet_size - header_size;
2835 } else if (apv <= 94) {
2836 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2837 data_size = packet_size - header_size;
2838 D_ASSERT(data_size == 0);
2840 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2841 data_size = packet_size - header_size;
2842 D_ASSERT(data_size == 0);
2845 /* initialize verify_alg and csums_alg */
2846 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2848 if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2851 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2855 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
2856 dev_err(DEV, "verify-alg of wrong size, "
2857 "peer wants %u, accepting only up to %u byte\n",
2858 data_size, SHARED_SECRET_MAX);
2862 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2865 /* we expect a NUL-terminated string */
2866 /* but just in case someone tries to be evil */
2867 D_ASSERT(p->verify_alg[data_size-1] == 0);
2868 p->verify_alg[data_size-1] = 0;
2870 } else /* apv >= 89 */ {
2871 /* we still expect NUL-terminated strings */
2872 /* but just in case someone tries to be evil */
2873 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2874 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2875 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2876 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2879 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2880 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2881 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2882 mdev->sync_conf.verify_alg, p->verify_alg);
2885 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2886 p->verify_alg, "verify-alg");
2887 if (IS_ERR(verify_tfm)) {
2893 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2894 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2895 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2896 mdev->sync_conf.csums_alg, p->csums_alg);
2899 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2900 p->csums_alg, "csums-alg");
2901 if (IS_ERR(csums_tfm)) {
2908 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2909 mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2910 mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2911 mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2912 mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2914 fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2915 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2916 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2918 dev_err(DEV, "kmalloc of fifo_buffer failed");
2924 spin_lock(&mdev->peer_seq_lock);
2925 /* lock against drbd_nl_syncer_conf() */
2927 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2928 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2929 crypto_free_hash(mdev->verify_tfm);
2930 mdev->verify_tfm = verify_tfm;
2931 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2934 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2935 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2936 crypto_free_hash(mdev->csums_tfm);
2937 mdev->csums_tfm = csums_tfm;
2938 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2940 if (fifo_size != mdev->rs_plan_s.size) {
2941 kfree(mdev->rs_plan_s.values);
2942 mdev->rs_plan_s.values = rs_plan_s;
2943 mdev->rs_plan_s.size = fifo_size;
2944 mdev->rs_planed = 0;
2946 spin_unlock(&mdev->peer_seq_lock);
2951 /* just for completeness: actually not needed,
2952 * as this is not reached if csums_tfm was ok. */
2953 crypto_free_hash(csums_tfm);
2954 /* but free the verify_tfm again, if csums_tfm did not work out */
2955 crypto_free_hash(verify_tfm);
2956 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
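/*
 * Editorial sketch of the apv-dependent SyncParam layout handled above.
 * Only apv 88 carries a trailing string payload; from apv 89 on the
 * algorithm names live in the fixed part. Hypothetical walk-through,
 * sizes taken from the expressions in receive_SyncParam():
 */
#if 0
	/* apv <= 87: fixed part = p_rs_param    (rate only), no payload
	 * apv == 88: fixed part = p_rs_param,   payload = verify-alg string
	 *            (NUL terminated, at most SHARED_SECRET_MAX bytes)
	 * apv 89-94: fixed part = p_rs_param_89 (adds verify/csums algs), payload 0
	 * apv >= 95: fixed part = p_rs_param_95 (adds the c_* pacing knobs), payload 0 */
#endif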
2960 /* warn if the arguments differ by more than 12.5% */
2961 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2962 const char *s, sector_t a, sector_t b)
2965 if (a == 0 || b == 0)
2967 d = (a > b) ? (a - b) : (b - a);
2968 if (d > (a>>3) || d > (b>>3))
2969 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2970 (unsigned long long)a, (unsigned long long)b);
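/*
 * Editorial sketch with hypothetical numbers: d > (x>>3) means the
 * difference exceeds one eighth, i.e. 12.5%, of x.
 */
#if 0
	/* a = 1000, b = 900: d = 100, a>>3 = 125, b>>3 = 112 -> no warning
	 * a = 1000, b = 800: d = 200, a>>3 = 125              -> warning */
#endif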
2973 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2975 struct p_sizes *p = &mdev->data.rbuf.sizes;
2976 enum determine_dev_size dd = unchanged;
2977 sector_t p_size, p_usize, my_usize;
2978 int ldsc = 0; /* local disk size changed */
2979 enum dds_flags ddsf;
2981 p_size = be64_to_cpu(p->d_size);
2982 p_usize = be64_to_cpu(p->u_size);
2984 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2985 dev_err(DEV, "some backing storage is needed\n");
2986 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2990 /* just store the peer's disk size for now.
2991 * we still need to figure out whether we accept that. */
2992 mdev->p_size = p_size;
2994 if (get_ldev(mdev)) {
2995 warn_if_differ_considerably(mdev, "lower level device sizes",
2996 p_size, drbd_get_max_capacity(mdev->ldev));
2997 warn_if_differ_considerably(mdev, "user requested size",
2998 p_usize, mdev->ldev->dc.disk_size);
3000 /* if this is the first connect, or an otherwise expected
3001 * param exchange, choose the minimum */
3002 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3003 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3006 my_usize = mdev->ldev->dc.disk_size;
3008 if (mdev->ldev->dc.disk_size != p_usize) {
3009 mdev->ldev->dc.disk_size = p_usize;
3010 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3011 (unsigned long)mdev->ldev->dc.disk_size);
3014 /* Never shrink a device with usable data during connect.
3015 But allow online shrinking if we are connected. */
3016 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3017 drbd_get_capacity(mdev->this_bdev) &&
3018 mdev->state.disk >= D_OUTDATED &&
3019 mdev->state.conn < C_CONNECTED) {
3020 dev_err(DEV, "The peer's disk size is too small!\n");
3021 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3022 mdev->ldev->dc.disk_size = my_usize;
3029 ddsf = be16_to_cpu(p->dds_flags);
3030 if (get_ldev(mdev)) {
3031 dd = drbd_determine_dev_size(mdev, ddsf);
3033 if (dd == dev_size_error)
3037 /* I am diskless, need to accept the peer's size. */
3038 drbd_set_my_capacity(mdev, p_size);
3041 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3042 drbd_reconsider_max_bio_size(mdev);
3044 if (get_ldev(mdev)) {
3045 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3046 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3053 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3054 if (be64_to_cpu(p->c_size) !=
3055 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3056 /* we have different sizes, probably peer
3057 * needs to know my new size... */
3058 drbd_send_sizes(mdev, 0, ddsf);
3060 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3061 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3062 if (mdev->state.pdsk >= D_INCONSISTENT &&
3063 mdev->state.disk >= D_INCONSISTENT) {
3064 if (ddsf & DDSF_NO_RESYNC)
3065 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3067 resync_after_online_grow(mdev);
3069 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
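/*
 * Editorial sketch of the size negotiation above: on the initial
 * parameter exchange the effective user-requested size is the minimum
 * of both sides' settings, where 0 means "no explicit limit".
 * Hypothetical helper mirroring min_not_zero():
 */
#if 0
static sector_t example_negotiated_usize(sector_t mine, sector_t peers)
{
	/* e.g. mine = 0, peers = 4096    -> 4096;
	 *      mine = 2048, peers = 4096 -> 2048 */
	if (mine == 0)
		return peers;			/* only the peer set a limit, or none */
	if (peers == 0)
		return mine;			/* only we set a limit */
	return mine < peers ? mine : peers;	/* both set one: take the smaller */
}
#endif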
3076 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3078 struct p_uuids *p = &mdev->data.rbuf.uuids;
3080 int i, updated_uuids = 0;
3082 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3084 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3085 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3087 kfree(mdev->p_uuid);
3088 mdev->p_uuid = p_uuid;
3090 if (mdev->state.conn < C_CONNECTED &&
3091 mdev->state.disk < D_INCONSISTENT &&
3092 mdev->state.role == R_PRIMARY &&
3093 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3094 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3095 (unsigned long long)mdev->ed_uuid);
3096 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3100 if (get_ldev(mdev)) {
3101 int skip_initial_sync =
3102 mdev->state.conn == C_CONNECTED &&
3103 mdev->agreed_pro_version >= 90 &&
3104 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3105 (p_uuid[UI_FLAGS] & 8);
3106 if (skip_initial_sync) {
3107 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3108 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3109 "clear_n_write from receive_uuids",
3110 BM_LOCKED_TEST_ALLOWED);
3111 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3112 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3113 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3119 } else if (mdev->state.disk < D_INCONSISTENT &&
3120 mdev->state.role == R_PRIMARY) {
3121 /* I am a diskless primary, the peer just created a new current UUID
3123 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3126 /* Before we test the disk state, we should wait until a possibly
3127 ongoing cluster wide state change has finished. That is important if
3128 we are primary and are detaching from our disk. We need to see the
3129 new disk state... */
3130 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3131 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3132 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3135 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3141 * convert_state() - Converts the peer's view of the cluster state to our point of view
3142 * @ps: The state as seen by the peer.
3144 static union drbd_state convert_state(union drbd_state ps)
3146 union drbd_state ms;
3148 static enum drbd_conns c_tab[] = {
3149 [C_CONNECTED] = C_CONNECTED,
3151 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3152 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3153 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3154 [C_VERIFY_S] = C_VERIFY_T,
3160 ms.conn = c_tab[ps.conn];
3165 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
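/*
 * Editorial sketch: direction-dependent connection states are mirrored
 * when adopting the peer's view, e.g. what the peer calls StartingSyncS
 * is StartingSyncT from our side. Only the mappings shown in c_tab
 * above are illustrated; the helper name is hypothetical.
 */
#if 0
static enum drbd_conns example_mirror_conn(enum drbd_conns peers_view)
{
	switch (peers_view) {
	case C_STARTING_SYNC_S:	return C_STARTING_SYNC_T;
	case C_STARTING_SYNC_T:	return C_STARTING_SYNC_S;
	case C_VERIFY_S:	return C_VERIFY_T;
	case C_DISCONNECTING:	return C_TEAR_DOWN;
	case C_CONNECTED:	return C_CONNECTED;	/* symmetric */
	default:		return peers_view;
	}
}
#endif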
3170 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3172 struct p_req_state *p = &mdev->data.rbuf.req_state;
3173 union drbd_state mask, val;
3174 enum drbd_state_rv rv;
3176 mask.i = be32_to_cpu(p->mask);
3177 val.i = be32_to_cpu(p->val);
3179 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3180 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3181 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3185 mask = convert_state(mask);
3186 val = convert_state(val);
3188 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3190 drbd_send_sr_reply(mdev, rv);
3196 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3198 struct p_state *p = &mdev->data.rbuf.state;
3199 union drbd_state os, ns, peer_state;
3200 enum drbd_disk_state real_peer_disk;
3201 enum chg_state_flags cs_flags;
3204 peer_state.i = be32_to_cpu(p->state);
3206 real_peer_disk = peer_state.disk;
3207 if (peer_state.disk == D_NEGOTIATING) {
3208 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3209 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3212 spin_lock_irq(&mdev->req_lock);
3214 os = ns = mdev->state;
3215 spin_unlock_irq(&mdev->req_lock);
3217 /* If some other part of the code (asender thread, timeout)
3218 * already decided to close the connection again,
3219 * we must not "re-establish" it here. */
3220 if (os.conn <= C_TEAR_DOWN)
3223 /* If this is the "end of sync" confirmation, usually the peer disk
3224 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3225 * set) resync started in PausedSyncT, or if the timing of pause-/
3226 * unpause-sync events has been "just right", the peer disk may
3227 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3229 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3230 real_peer_disk == D_UP_TO_DATE &&
3231 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3232 /* If we are (becoming) SyncSource, but peer is still in sync
3233 * preparation, ignore its uptodate-ness to avoid flapping, it
3234 * will change to inconsistent once the peer reaches active
3236 * It may have changed syncer-paused flags, however, so we
3237 * cannot ignore this completely. */
3238 if (peer_state.conn > C_CONNECTED &&
3239 peer_state.conn < C_SYNC_SOURCE)
3240 real_peer_disk = D_INCONSISTENT;
3242 /* if peer_state changes to connected at the same time,
3243 * it explicitly notifies us that it finished resync.
3244 * Maybe we should finish it up, too? */
3245 else if (os.conn >= C_SYNC_SOURCE &&
3246 peer_state.conn == C_CONNECTED) {
3247 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3248 drbd_resync_finished(mdev);
3253 /* peer says his disk is inconsistent, while we think it is uptodate,
3254 * and this happens while the peer still thinks we have a sync going on,
3255 * but we think we are already done with the sync.
3256 * We ignore this to avoid flapping pdsk.
3257 * This should not happen, if the peer is a recent version of drbd. */
3258 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3259 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3260 real_peer_disk = D_UP_TO_DATE;
3262 if (ns.conn == C_WF_REPORT_PARAMS)
3263 ns.conn = C_CONNECTED;
3265 if (peer_state.conn == C_AHEAD)
3268 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3269 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3270 int cr; /* consider resync */
3272 /* if we established a new connection */
3273 cr = (os.conn < C_CONNECTED);
3274 /* if we had an established connection
3275 * and one of the nodes newly attaches a disk */
3276 cr |= (os.conn == C_CONNECTED &&
3277 (peer_state.disk == D_NEGOTIATING ||
3278 os.disk == D_NEGOTIATING));
3279 /* if we have both been inconsistent, and the peer has been
3280 * forced to be UpToDate with --overwrite-data */
3281 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3282 /* if we had been plain connected, and the admin requested to
3283 * start a sync by "invalidate" or "invalidate-remote" */
3284 cr |= (os.conn == C_CONNECTED &&
3285 (peer_state.conn >= C_STARTING_SYNC_S &&
3286 peer_state.conn <= C_WF_BITMAP_T));
3289 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3292 if (ns.conn == C_MASK) {
3293 ns.conn = C_CONNECTED;
3294 if (mdev->state.disk == D_NEGOTIATING) {
3295 drbd_force_state(mdev, NS(disk, D_FAILED));
3296 } else if (peer_state.disk == D_NEGOTIATING) {
3297 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3298 peer_state.disk = D_DISKLESS;
3299 real_peer_disk = D_DISKLESS;
3301 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3303 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3304 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3310 spin_lock_irq(&mdev->req_lock);
3311 if (mdev->state.i != os.i)
3313 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3314 ns.peer = peer_state.role;
3315 ns.pdsk = real_peer_disk;
3316 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3317 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3318 ns.disk = mdev->new_state_tmp.disk;
3319 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3320 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3321 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3322 /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3323 for temporary network outages! */
3324 spin_unlock_irq(&mdev->req_lock);
3325 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3327 drbd_uuid_new_current(mdev);
3328 clear_bit(NEW_CUR_UUID, &mdev->flags);
3329 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3332 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3334 spin_unlock_irq(&mdev->req_lock);
3336 if (rv < SS_SUCCESS) {
3337 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3341 if (os.conn > C_WF_REPORT_PARAMS) {
3342 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3343 peer_state.disk != D_NEGOTIATING ) {
3344 /* we want resync, peer has not yet decided to sync... */
3345 /* Nowadays only used when forcing a node into primary role and
3346 setting its disk to UpToDate with that */
3347 drbd_send_uuids(mdev);
3348 drbd_send_current_state(mdev);
3352 mdev->net_conf->want_lose = 0;
3354 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3359 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3361 struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3363 wait_event(mdev->misc_wait,
3364 mdev->state.conn == C_WF_SYNC_UUID ||
3365 mdev->state.conn == C_BEHIND ||
3366 mdev->state.conn < C_CONNECTED ||
3367 mdev->state.disk < D_NEGOTIATING);
3369 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3371 /* Here the _drbd_uuid_ functions are right, current should
3372 _not_ be rotated into the history */
3373 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3374 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3375 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3377 drbd_print_uuids(mdev, "updated sync uuid");
3378 drbd_start_resync(mdev, C_SYNC_TARGET);
3382 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3388 * receive_bitmap_plain
3390 * Return 0 when done, 1 when another iteration is needed, and a negative error
3391 * code upon failure.
3394 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3395 unsigned long *buffer, struct bm_xfer_ctx *c)
3397 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3398 unsigned want = num_words * sizeof(long);
3401 if (want != data_size) {
3402 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3407 err = drbd_recv(mdev, buffer, want);
3414 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3416 c->word_offset += num_words;
3417 c->bit_offset = c->word_offset * BITS_PER_LONG;
3418 if (c->bit_offset > c->bm_bits)
3419 c->bit_offset = c->bm_bits;
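/*
 * Editorial sketch of the transfer window arithmetic above, with
 * hypothetical numbers: sizeof(long) == 8, bm_words == 1000,
 * BM_PACKET_WORDS == 384.
 */
#if 0
	/* 1st call: num_words = 384, want = 3072 bytes, word_offset -> 384
	 * 2nd call: num_words = 384, want = 3072 bytes, word_offset -> 768
	 * 3rd call: num_words = 232 (= 1000 - 768), and we are done;
	 * bit_offset advances as word_offset * BITS_PER_LONG,
	 * clamped to bm_bits for the last, partial word. */
#endif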
3427 * Return 0 when done, 1 when another iteration is needed, and a negative error
3428 * code upon failure.
3431 recv_bm_rle_bits(struct drbd_conf *mdev,
3432 struct p_compressed_bm *p,
3433 struct bm_xfer_ctx *c)
3435 struct bitstream bs;
3439 unsigned long s = c->bit_offset;
3441 int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3442 int toggle = DCBP_get_start(p);
3446 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3448 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3452 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3453 bits = vli_decode_bits(&rl, look_ahead);
3459 if (e >= c->bm_bits) {
3460 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3463 _drbd_bm_set_bits(mdev, s, e);
3467 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3468 have, bits, look_ahead,
3469 (unsigned int)(bs.cur.b - p->code),
3470 (unsigned int)bs.buf_len);
3473 look_ahead >>= bits;
3476 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3479 look_ahead |= tmp << have;
3484 bm_xfer_ctx_bit_to_word_offset(c);
3486 return (s != c->bm_bits);
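/*
 * Editorial sketch (not driver code) of the decoding loop above: the
 * payload is a sequence of VLI-encoded run lengths whose meaning
 * alternates between "clear run" and "set run", starting with the
 * toggle bit carried in the packet header.
 */
#if 0
static unsigned long example_rle_decode(const unsigned long *runs,
					int n_runs, int toggle)
{
	unsigned long s = 0;		/* current bit offset */
	unsigned long set_bits = 0;
	int i;

	for (i = 0; i < n_runs; i++, toggle = !toggle) {
		if (toggle)		/* run of set (out-of-sync) bits; the real
					 * code calls _drbd_bm_set_bits(mdev, s,
					 * s + runs[i] - 1) here */
			set_bits += runs[i];
		s += runs[i];		/* runs of clear bits are simply skipped */
	}
	return set_bits;
}
#endif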
3492 * Return 0 when done, 1 when another iteration is needed, and a negative error
3493 * code upon failure.
3496 decode_bitmap_c(struct drbd_conf *mdev,
3497 struct p_compressed_bm *p,
3498 struct bm_xfer_ctx *c)
3500 if (DCBP_get_code(p) == RLE_VLI_Bits)
3501 return recv_bm_rle_bits(mdev, p, c);
3503 /* other variants had been implemented for evaluation,
3504 * but have been dropped as this one turned out to be "best"
3505 * during all our tests. */
3507 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3508 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3512 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3513 const char *direction, struct bm_xfer_ctx *c)
3515 /* what would it take to transfer it "plaintext" */
3516 unsigned plain = sizeof(struct p_header80) *
3517 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3518 + c->bm_words * sizeof(long);
3519 unsigned total = c->bytes[0] + c->bytes[1];
3522 /* total cannot be zero, but just in case: */
3526 /* don't report if not compressed */
3530 /* total < plain. check for overflow, still */
3531 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3532 : (1000 * total / plain);
3538 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3539 "total %u; compression: %u.%u%%\n",
3541 c->bytes[1], c->packets[1],
3542 c->bytes[0], c->packets[0],
3543 total, r/10, r % 10);
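/*
 * Editorial sketch of the ratio above, with hypothetical numbers:
 * plain = 131072 bytes worth of plaintext bitmap, total = 4096 bytes
 * actually transferred.
 */
#if 0
	/* r = 1000 * 4096 / 131072 = 31, i.e. the RLE stream is 3.1%
	 * of the plaintext size; the overflow branch divides plain by
	 * 1000 instead, trading a little precision for safety. */
#endif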
3546 /* Since we process the bitfield from lower addresses to higher,
3547 it does not matter whether we process it in 32 bit chunks or 64 bit
3548 chunks, as long as it is little endian. (Understand it as a byte stream,
3549 beginning with the lowest byte...) If we used big endian,
3550 we would need to process it from the highest address to the lowest,
3551 in order to be agnostic to the 32 vs 64 bit issue.
3553 returns 0 on failure, 1 if we successfully received it. */
3554 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3556 struct bm_xfer_ctx c;
3560 struct p_header80 *h = &mdev->data.rbuf.header.h80;
3562 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3563 /* you are supposed to send additional out-of-sync information
3564 * if you actually set bits during this phase */
3566 /* maybe we should use some per thread scratch page,
3567 * and allocate that during initial device creation? */
3568 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3570 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3574 c = (struct bm_xfer_ctx) {
3575 .bm_bits = drbd_bm_bits(mdev),
3576 .bm_words = drbd_bm_words(mdev),
3580 if (cmd == P_BITMAP) {
3581 err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3582 } else if (cmd == P_COMPRESSED_BITMAP) {
3583 /* MAYBE: sanity check that we speak proto >= 90,
3584 * and the feature is enabled! */
3585 struct p_compressed_bm *p;
3587 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3588 dev_err(DEV, "ReportCBitmap packet too large\n");
3591 /* use the page buffer */
3593 memcpy(p, h, sizeof(*h));
3594 if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3596 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3597 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3600 err = decode_bitmap_c(mdev, p, &c);
3602 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3606 c.packets[cmd == P_BITMAP]++;
3607 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3614 if (!drbd_recv_header(mdev, &cmd, &data_size))
3618 INFO_bm_xfer_stats(mdev, "receive", &c);
3620 if (mdev->state.conn == C_WF_BITMAP_T) {
3621 enum drbd_state_rv rv;
3623 ok = !drbd_send_bitmap(mdev);
3626 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3627 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3628 D_ASSERT(rv == SS_SUCCESS);
3629 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3630 /* admin may have requested C_DISCONNECTING,
3631 * other threads may have noticed network errors */
3632 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3633 drbd_conn_str(mdev->state.conn));
3638 drbd_bm_unlock(mdev);
3639 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3640 drbd_start_resync(mdev, C_SYNC_SOURCE);
3641 free_page((unsigned long) buffer);
3645 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3647 /* TODO zero copy sink :) */
3648 static char sink[128];
3651 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3656 want = min_t(int, size, sizeof(sink));
3657 r = drbd_recv(mdev, sink, want);
3658 ERR_IF(r <= 0) break;
3664 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3666 /* Make sure we've acked all the TCP data associated
3667 * with the data requests being unplugged */
3668 drbd_tcp_quickack(mdev->data.socket);
3673 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3675 struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3677 switch (mdev->state.conn) {
3678 case C_WF_SYNC_UUID:
3683 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3684 drbd_conn_str(mdev->state.conn));
3687 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3692 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3697 drbd_cmd_handler_f function;
3700 static struct data_cmd drbd_cmd_handler[] = {
3701 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3702 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3703 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3704 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3705 [P_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3706 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3707 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3708 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3709 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3710 [P_SYNC_PARAM] = { 1, sizeof(struct p_header80), receive_SyncParam },
3711 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header80), receive_SyncParam },
3712 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3713 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3714 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3715 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
3716 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3717 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3718 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3719 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3720 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3721 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
3722 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3723 /* anything missing from this table is in
3724 * the asender_tbl, see get_asender_cmd */
3725 [P_MAX_CMD] = { 0, 0, NULL },
3728 /* All handler functions that expect a sub-header get that sub-header in
3729 mdev->data.rbuf.header.head.payload.
3731 Usually the callback can find the usual p_header in
3732 mdev->data.rbuf.header.head, but it may not rely on that, since there is also p_header95!
3735 static void drbdd(struct drbd_conf *mdev)
3737 union p_header *header = &mdev->data.rbuf.header;
3738 unsigned int packet_size;
3739 enum drbd_packets cmd;
3740 size_t shs; /* sub header size */
3743 while (get_t_state(&mdev->receiver) == Running) {
3744 drbd_thread_current_set_cpu(mdev);
3745 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3748 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3749 dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3753 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3754 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3755 dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3760 rv = drbd_recv(mdev, &header->h80.payload, shs);
3761 if (unlikely(rv != shs)) {
3762 if (!signal_pending(current))
3763 dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3768 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3770 if (unlikely(!rv)) {
3771 dev_err(DEV, "error receiving %s, l: %d!\n",
3772 cmdname(cmd), packet_size);
3779 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3781 /* If we leave here, we probably want to update at least the
3782 * "Connected" indicator on stable storage. Do so explicitly here. */
3786 void drbd_flush_workqueue(struct drbd_conf *mdev)
3788 struct drbd_wq_barrier barr;
3790 barr.w.cb = w_prev_work_done;
3791 init_completion(&barr.done);
3792 drbd_queue_work(&mdev->data.work, &barr.w);
3793 wait_for_completion(&barr.done);
3796 void drbd_free_tl_hash(struct drbd_conf *mdev)
3798 struct hlist_head *h;
3800 spin_lock_irq(&mdev->req_lock);
3802 if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3803 spin_unlock_irq(&mdev->req_lock);
3807 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3809 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3810 (int)(h - mdev->ee_hash), h->first);
3811 kfree(mdev->ee_hash);
3812 mdev->ee_hash = NULL;
3813 mdev->ee_hash_s = 0;
3815 /* We may not have had the chance to wait for all locally pending
3816 * application requests. The hlist_add_fake() prevents access after
3817 * free on master bio completion. */
3818 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++) {
3819 struct drbd_request *req;
3820 struct hlist_node *pos, *n;
3821 hlist_for_each_entry_safe(req, pos, n, h, collision) {
3822 hlist_del_init(&req->collision);
3823 hlist_add_fake(&req->collision);
3827 kfree(mdev->tl_hash);
3828 mdev->tl_hash = NULL;
3829 mdev->tl_hash_s = 0;
3830 spin_unlock_irq(&mdev->req_lock);
3833 static void drbd_disconnect(struct drbd_conf *mdev)
3835 enum drbd_fencing_p fp;
3836 union drbd_state os, ns;
3837 int rv = SS_UNKNOWN_ERROR;
3840 if (mdev->state.conn == C_STANDALONE)
3843 /* We are about to start the cleanup after connection loss.
3844 * Make sure drbd_make_request knows about that.
3845 * Usually we should be in some network failure state already,
3846 * but just in case we are not, we fix it up here.
3848 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
3850 /* asender does not clean up anything. it must not interfere, either */
3851 drbd_thread_stop(&mdev->asender);
3852 drbd_free_sock(mdev);
3854 /* wait for current activity to cease. */
3855 spin_lock_irq(&mdev->req_lock);
3856 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3857 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3858 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3859 spin_unlock_irq(&mdev->req_lock);
3861 /* We do not have data structures that would allow us to
3862 * get the rs_pending_cnt down to 0 again.
3863 * * On C_SYNC_TARGET we do not have any data structures describing
3864 * the pending RSDataRequest's we have sent.
3865 * * On C_SYNC_SOURCE there is no data structure that tracks
3866 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3867 * And no, it is not the sum of the reference counts in the
3868 * resync_LRU. The resync_LRU tracks the whole operation including
3869 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3871 drbd_rs_cancel_all(mdev);
3873 mdev->rs_failed = 0;
3874 atomic_set(&mdev->rs_pending_cnt, 0);
3875 wake_up(&mdev->misc_wait);
3877 /* make sure syncer is stopped and w_resume_next_sg queued */
3878 del_timer_sync(&mdev->resync_timer);
3879 resync_timer_fn((unsigned long)mdev);
3881 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3882 * w_make_resync_request etc. which may still be on the worker queue
3883 * to be "canceled" */
3884 drbd_flush_workqueue(mdev);
3886 /* This also does reclaim_net_ee(). If we do this too early, we might
3887 * miss some resync ee and pages.*/
3888 drbd_process_done_ee(mdev);
3890 kfree(mdev->p_uuid);
3891 mdev->p_uuid = NULL;
3893 if (!is_susp(mdev->state))
3896 dev_info(DEV, "Connection closed\n");
3901 if (get_ldev(mdev)) {
3902 fp = mdev->ldev->dc.fencing;
3906 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3907 drbd_try_outdate_peer_async(mdev);
3909 spin_lock_irq(&mdev->req_lock);
3911 if (os.conn >= C_UNCONNECTED) {
3912 /* Do not restart in case we are C_DISCONNECTING */
3914 ns.conn = C_UNCONNECTED;
3915 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3917 spin_unlock_irq(&mdev->req_lock);
3919 if (os.conn == C_DISCONNECTING) {
3920 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3922 crypto_free_hash(mdev->cram_hmac_tfm);
3923 mdev->cram_hmac_tfm = NULL;
3925 kfree(mdev->net_conf);
3926 mdev->net_conf = NULL;
3927 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3930 /* serialize with bitmap writeout triggered by the state change,
3932 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3934 /* tcp_close and release of sendpage pages can be deferred. I don't
3935 * want to use SO_LINGER, because apparently it can be deferred for
3936 * more than 20 seconds (longest time I checked).
3938 * Actually we don't care exactly when the network stack does its
3939 * put_page(); we just release our reference on these pages right here.
3941 i = drbd_release_ee(mdev, &mdev->net_ee);
3943 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3944 i = atomic_read(&mdev->pp_in_use_by_net);
3946 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3947 i = atomic_read(&mdev->pp_in_use);
3949 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3951 D_ASSERT(list_empty(&mdev->read_ee));
3952 D_ASSERT(list_empty(&mdev->active_ee));
3953 D_ASSERT(list_empty(&mdev->sync_ee));
3954 D_ASSERT(list_empty(&mdev->done_ee));
3956 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3957 atomic_set(&mdev->current_epoch->epoch_size, 0);
3958 D_ASSERT(list_empty(&mdev->current_epoch->list));
3962 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3963 * we can agree on is stored in agreed_pro_version.
3965 * feature flags and the reserved array should be enough room for future
3966 * enhancements of the handshake protocol, and possible plugins...
3968 * for now, they are expected to be zero, but we ignore them in any case.
3970 static int drbd_send_handshake(struct drbd_conf *mdev)
3972 /* ASSERT current == mdev->receiver ... */
3973 struct p_handshake *p = &mdev->data.sbuf.handshake;
3976 if (mutex_lock_interruptible(&mdev->data.mutex)) {
3977 dev_err(DEV, "interrupted during initial handshake\n");
3978 return 0; /* interrupted. not ok. */
3981 if (mdev->data.socket == NULL) {
3982 mutex_unlock(&mdev->data.mutex);
3986 memset(p, 0, sizeof(*p));
3987 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3988 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3989 ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3990 (struct p_header80 *)p, sizeof(*p), 0 );
3991 mutex_unlock(&mdev->data.mutex);
3997 * 1 yes, we have a valid connection
3998 * 0 oops, did not work out, please try again
3999 * -1 peer talks a different language,
4000 * no point in trying again, please go standalone.
4002 static int drbd_do_handshake(struct drbd_conf *mdev)
4004 /* ASSERT current == mdev->receiver ... */
4005 struct p_handshake *p = &mdev->data.rbuf.handshake;
4006 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
4007 unsigned int length;
4008 enum drbd_packets cmd;
4011 rv = drbd_send_handshake(mdev);
4015 rv = drbd_recv_header(mdev, &cmd, &length);
4019 if (cmd != P_HAND_SHAKE) {
4020 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
4025 if (length != expect) {
4026 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
4031 rv = drbd_recv(mdev, &p->head.payload, expect);
4034 if (!signal_pending(current))
4035 dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
4039 p->protocol_min = be32_to_cpu(p->protocol_min);
4040 p->protocol_max = be32_to_cpu(p->protocol_max);
4041 if (p->protocol_max == 0)
4042 p->protocol_max = p->protocol_min;
4044 if (PRO_VERSION_MAX < p->protocol_min ||
4045 PRO_VERSION_MIN > p->protocol_max)
4048 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4050 dev_info(DEV, "Handshake successful: "
4051 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4056 dev_err(DEV, "incompatible DRBD dialects: "
4057 "I support %d-%d, peer supports %d-%d\n",
4058 PRO_VERSION_MIN, PRO_VERSION_MAX,
4059 p->protocol_min, p->protocol_max);
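/*
 * Editorial sketch: the two version windows must overlap, and the
 * agreed version is the highest protocol both sides support.
 * Hypothetical helper; the real code stores the result in
 * mdev->agreed_pro_version.
 */
#if 0
static int example_agree_version(int my_min, int my_max,
				 int peer_min, int peer_max)
{
	if (my_max < peer_min || my_min > peer_max)
		return -1;	/* incompatible dialects, go standalone */
	/* e.g. we support 86-96, peer supports 90-100 -> agree on 96 */
	return my_max < peer_max ? my_max : peer_max;
}
#endif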
4063 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4064 static int drbd_do_auth(struct drbd_conf *mdev)
4066 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4067 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4071 #define CHALLENGE_LEN 64
4075 0 - failed, try again (network error),
4076 -1 - auth failed, don't try again.
4079 static int drbd_do_auth(struct drbd_conf *mdev)
4081 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4082 struct scatterlist sg;
4083 char *response = NULL;
4084 char *right_response = NULL;
4085 char *peers_ch = NULL;
4086 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4087 unsigned int resp_size;
4088 struct hash_desc desc;
4089 enum drbd_packets cmd;
4090 unsigned int length;
4093 desc.tfm = mdev->cram_hmac_tfm;
4096 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4097 (u8 *)mdev->net_conf->shared_secret, key_len);
4099 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4104 get_random_bytes(my_challenge, CHALLENGE_LEN);
4106 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4110 rv = drbd_recv_header(mdev, &cmd, &length);
4114 if (cmd != P_AUTH_CHALLENGE) {
4115 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4121 if (length > CHALLENGE_LEN * 2) {
4122 dev_err(DEV, "expected AuthChallenge payload too big.\n");
4127 peers_ch = kmalloc(length, GFP_NOIO);
4128 if (peers_ch == NULL) {
4129 dev_err(DEV, "kmalloc of peers_ch failed\n");
4134 rv = drbd_recv(mdev, peers_ch, length);
4137 if (!signal_pending(current))
4138 dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4143 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4144 response = kmalloc(resp_size, GFP_NOIO);
4145 if (response == NULL) {
4146 dev_err(DEV, "kmalloc of response failed\n");
4151 sg_init_table(&sg, 1);
4152 sg_set_buf(&sg, peers_ch, length);
4154 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4156 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4161 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4165 rv = drbd_recv_header(mdev, &cmd, &length);
4169 if (cmd != P_AUTH_RESPONSE) {
4170 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4176 if (length != resp_size) {
4177 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4182 rv = drbd_recv(mdev, response , resp_size);
4184 if (rv != resp_size) {
4185 if (!signal_pending(current))
4186 dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4191 right_response = kmalloc(resp_size, GFP_NOIO);
4192 if (right_response == NULL) {
4193 dev_err(DEV, "kmalloc of right_response failed\n");
4198 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4200 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4202 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4207 rv = !memcmp(response, right_response, resp_size);
4210 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4211 resp_size, mdev->net_conf->cram_hmac_alg);
4218 kfree(right_response);
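/*
 * Editorial sketch of the challenge/response exchange above. The
 * protocol is symmetric; both peers run it concurrently, so a full
 * session looks like this (hypothetical trace, shared secret "s3cret"):
 *
 *   we   -> peer: P_AUTH_CHALLENGE, 64 random bytes C1
 *   peer -> we  : P_AUTH_CHALLENGE, its own random bytes C2
 *   we   -> peer: P_AUTH_RESPONSE,  HMAC("s3cret", C2)
 *   peer -> we  : P_AUTH_RESPONSE,  HMAC("s3cret", C1)
 *
 * Each side recomputes the HMAC over the challenge it generated and
 * memcmp()s that against the response it received.
 */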
4224 int drbdd_init(struct drbd_thread *thi)
4226 struct drbd_conf *mdev = thi->mdev;
4227 unsigned int minor = mdev_to_minor(mdev);
4230 sprintf(current->comm, "drbd%d_receiver", minor);
4232 dev_info(DEV, "receiver (re)started\n");
4235 h = drbd_connect(mdev);
4237 drbd_disconnect(mdev);
4238 schedule_timeout_interruptible(HZ);
4241 dev_warn(DEV, "Discarding network configuration.\n");
4242 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4247 if (get_net_conf(mdev)) {
4253 drbd_disconnect(mdev);
4255 dev_info(DEV, "receiver terminated\n");
4259 /* ********* acknowledge sender ******** */
4261 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4263 struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4265 int retcode = be32_to_cpu(p->retcode);
4267 if (retcode >= SS_SUCCESS) {
4268 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4270 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4271 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4272 drbd_set_st_err_str(retcode), retcode);
4274 wake_up(&mdev->state_wait);
4279 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4281 return drbd_send_ping_ack(mdev);
4285 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4287 /* restore idle timeout */
4288 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4289 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4290 wake_up(&mdev->misc_wait);
4295 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4297 struct p_block_ack *p = (struct p_block_ack *)h;
4298 sector_t sector = be64_to_cpu(p->sector);
4299 int blksize = be32_to_cpu(p->blksize);
4301 D_ASSERT(mdev->agreed_pro_version >= 89);
4303 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4305 if (get_ldev(mdev)) {
4306 drbd_rs_complete_io(mdev, sector);
4307 drbd_set_in_sync(mdev, sector, blksize);
4308 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4309 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4312 dec_rs_pending(mdev);
4313 atomic_add(blksize >> 9, &mdev->rs_sect_in);
4318 /* when we receive the ACK for a write request,
4319 * verify that we actually know about it */
4320 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4321 u64 id, sector_t sector)
4323 struct hlist_head *slot = tl_hash_slot(mdev, sector);
4324 struct hlist_node *n;
4325 struct drbd_request *req;
4327 hlist_for_each_entry(req, n, slot, collision) {
4328 if ((unsigned long)req == (unsigned long)id) {
4329 if (req->sector != sector) {
4330 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4331 "wrong sector (%llus versus %llus)\n", req,
4332 (unsigned long long)req->sector,
4333 (unsigned long long)sector);
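/*
 * Editorial sketch: for write ACKs the 64 bit block_id on the wire is
 * simply the originating request pointer, so the sender can match the
 * ACK back to its request by pointer value within the sector's hash
 * slot. Hypothetical round trip:
 */
#if 0
	/* sender:   p->block_id = (unsigned long)req;    (when sending data)
	 * peer:     echoes block_id unchanged in the ACK;
	 * sender:   the hlist walk above compares
	 *           (unsigned long)req == (unsigned long)id */
#endif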
4342 typedef struct drbd_request *(req_validator_fn)
4343 (struct drbd_conf *mdev, u64 id, sector_t sector);
4345 static int validate_req_change_req_state(struct drbd_conf *mdev,
4346 u64 id, sector_t sector, req_validator_fn validator,
4347 const char *func, enum drbd_req_event what)
4349 struct drbd_request *req;
4350 struct bio_and_error m;
4352 spin_lock_irq(&mdev->req_lock);
4353 req = validator(mdev, id, sector);
4354 if (unlikely(!req)) {
4355 spin_unlock_irq(&mdev->req_lock);
4357 dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4358 (void *)(unsigned long)id, (unsigned long long)sector);
4361 __req_mod(req, what, &m);
4362 spin_unlock_irq(&mdev->req_lock);
4365 complete_master_bio(mdev, &m);
4369 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4371 struct p_block_ack *p = (struct p_block_ack *)h;
4372 sector_t sector = be64_to_cpu(p->sector);
4373 int blksize = be32_to_cpu(p->blksize);
4374 enum drbd_req_event what;
4376 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4378 if (is_syncer_block_id(p->block_id)) {
4379 drbd_set_in_sync(mdev, sector, blksize);
4380 dec_rs_pending(mdev);
4383 switch (be16_to_cpu(h->command)) {
4384 case P_RS_WRITE_ACK:
4385 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4386 what = write_acked_by_peer_and_sis;
4389 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4390 what = write_acked_by_peer;
4393 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4394 what = recv_acked_by_peer;
4397 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4398 what = conflict_discarded_by_peer;
4405 return validate_req_change_req_state(mdev, p->block_id, sector,
4406 _ack_id_to_req, __func__ , what);
4409 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4411 struct p_block_ack *p = (struct p_block_ack *)h;
4412 sector_t sector = be64_to_cpu(p->sector);
4413 int size = be32_to_cpu(p->blksize);
4414 struct drbd_request *req;
4415 struct bio_and_error m;
4417 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4419 if (is_syncer_block_id(p->block_id)) {
4420 dec_rs_pending(mdev);
4421 drbd_rs_failed_io(mdev, sector, size);
4425 spin_lock_irq(&mdev->req_lock);
4426 req = _ack_id_to_req(mdev, p->block_id, sector);
4428 spin_unlock_irq(&mdev->req_lock);
4429 if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4430 mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4431 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4432 The master bio might already be completed, therefore the
4433 request is no longer in the collision hash.
4434 => Do not try to validate block_id as request. */
4435 /* In Protocol B we might already have got a P_RECV_ACK
4436 but then get a P_NEG_ACK afterwards. */
4437 drbd_set_out_of_sync(mdev, sector, size);
4440 dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4441 (void *)(unsigned long)p->block_id, (unsigned long long)sector);
4445 __req_mod(req, neg_acked, &m);
4446 spin_unlock_irq(&mdev->req_lock);
4449 complete_master_bio(mdev, &m);
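/* Added note: not finding the request on a neg-ack is expected in
 * protocols A and B, where the master bio may complete before the
 * peer answers; marking the range out of sync is sufficient, since
 * the resync after the next reconnect repairs it. */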
static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
{
	struct p_block_ack *p = (struct p_block_ack *)h;
	sector_t sector = be64_to_cpu(p->sector);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
	    (unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(mdev, p->block_id, sector,
		_ar_id_to_req, __func__ , neg_acked);
}
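/* Added note: write requests live in the tl_hash searched by
 * _ack_id_to_req() above, while application reads have their own hash,
 * searched by _ar_id_to_req().  A P_NEG_DREPLY therefore fails a
 * pending application read that the peer could not serve. */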
static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
{
	sector_t sector;
	int size;
	struct p_block_ack *p = (struct p_block_ack *)h;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dec_rs_pending(mdev);

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, sector);
		switch (be16_to_cpu(h->command)) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(mdev, sector, size);
			/* fall through */
		case P_RS_CANCEL:
			break;
		default:
			D_ASSERT(0);
			put_ldev(mdev);
			return false;
		}
		put_ldev(mdev);
	}

	return true;
}
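/* Added note: P_RS_CANCEL is routed here as well (see the dispatch
 * table below).  A cancel means the peer could not serve our resync
 * request right now; unlike P_NEG_RS_DREPLY it must not mark the
 * block as failed, hence the bare 'break' for that case. */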
static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
{
	struct p_barrier_ack *p = (struct p_barrier_ack *)h;

	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));

	if (mdev->state.conn == C_AHEAD &&
	    atomic_read(&mdev->ap_in_flight) == 0 &&
	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
		mdev->start_resync_timer.expires = jiffies + HZ;
		add_timer(&mdev->start_resync_timer);
	}

	return true;
}
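/* Added note: C_AHEAD is the "ahead/behind" mode entered on congestion
 * with protocol A; while in it, writes are only recorded in the bitmap
 * instead of being replicated.  Once the barrier ack drains
 * ap_in_flight to zero, the timer armed here turns us back into a sync
 * source to catch the peer up. */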
static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
{
	struct p_block_ack *p = (struct p_block_ack *)h;
	struct drbd_work *w;
	sector_t sector;
	int size;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_oos_found(mdev, sector, size);
	else
		ov_oos_print(mdev);

	if (!get_ldev(mdev))
		return true;

	drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			drbd_queue_work_front(&mdev->data.work, w);
		} else {
			dev_err(DEV, "kmalloc(w) failed.");
			ov_oos_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	put_ldev(mdev);
	return true;
}
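/* Added note: online verify replies are counted down in ov_left; the
 * blocks reported as ID_OUT_OF_SYNC are only recorded here, verify
 * never rewrites data by itself.  The final reply queues w_ov_finished
 * at the front of the work queue so the result is reported promptly. */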
static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
{
	/* just consume and ignore the packet
	 * (currently only P_DELAY_PROBE is routed here) */
	return true;
}
struct asender_cmd {
	size_t pkt_size;
	int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
};
static struct asender_cmd *get_asender_cmd(int cmd)
{
	static struct asender_cmd asender_tbl[] = {
		/* anything missing from this table is in
		 * the drbd_cmd_handler (drbd_default_handler) table,
		 * see the beginning of drbdd() */
	[P_PING]	    = { sizeof(struct p_header80), got_Ping },
	[P_PING_ACK]	    = { sizeof(struct p_header80), got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
	[P_MAX_CMD]	    = { 0, NULL },
	};
	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
		return NULL;
	return &asender_tbl[cmd];
}
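/* Illustrative sketch (added, not original code): the receive loop in
 * drbd_asender() below uses this table in two steps,
 *
 *	cmd = get_asender_cmd(be16_to_cpu(h->command));
 *	expect = cmd->pkt_size;
 *	...receive until 'expect' bytes have arrived...
 *	if (!cmd->process(mdev, h))
 *		goto reconnect;
 *
 * so an unknown or out-of-range command is rejected before any payload
 * is interpreted. */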
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct p_header80 *h = &mdev->meta.rbuf.header.h80;
	struct asender_cmd *cmd = NULL;

	int rv, len;
	void *buf    = h;
	int received = 0;
	int expect   = sizeof(struct p_header80);
	int empty;
	int ping_timeout_active = 0;

	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));

	current->policy = SCHED_RR;  /* Make this a realtime task! */
	current->rt_priority = 2;    /* more important than all other tasks */
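	/* Added note: acks must keep flowing even when the machine is
	 * saturated with bulk data traffic, otherwise the peer times out;
	 * hence the realtime priority requested above. */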
	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);
		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
			mdev->meta.socket->sk->sk_rcvtimeo =
				mdev->net_conf->ping_timeo*HZ/10;
			ping_timeout_active = 1;
		}

		/* conditionally cork;
		 * it may hurt latency if we cork without much to send */
		if (!mdev->net_conf->no_cork &&
			3 < atomic_read(&mdev->unacked_cnt))
			drbd_tcp_cork(mdev->meta.socket);
		while (1) {
			clear_bit(SIGNAL_ASENDER, &mdev->flags);
			flush_signals(current);
			if (!drbd_process_done_ee(mdev))
				goto reconnect;
			/* to avoid race with newly queued ACKs */
			set_bit(SIGNAL_ASENDER, &mdev->flags);
			spin_lock_irq(&mdev->req_lock);
			empty = list_empty(&mdev->done_ee);
			spin_unlock_irq(&mdev->req_lock);
			/* new ack may have been queued right here,
			 * but then there is also a signal pending,
			 * and we start over... */
			if (empty)
				break;
		}
		/* but unconditionally uncork unless disabled */
		if (!mdev->net_conf->no_cork)
			drbd_tcp_uncork(mdev->meta.socket);
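		/* Added note: drbd_tcp_cork()/drbd_tcp_uncork() toggle
		 * TCP_CORK on the meta socket so that a burst of small
		 * acks produced by drbd_process_done_ee() leaves the host
		 * as one segment instead of many; corking is skipped when
		 * at most three acks are pending, where the extra latency
		 * would outweigh the saved packets. */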
		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(mdev, mdev->meta.socket,
				     buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &mdev->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS	 (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			dev_err(DEV, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(mdev->last_received,
				jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				dev_err(DEV, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &mdev->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}
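		/* Added note: -EAGAIN doubles as the liveness check.  The
		 * receive timeout is ping-timeout while a ping is in
		 * flight and ping-int otherwise, so running into it means
		 * either "peer missed the ping deadline" (reconnect) or
		 * "link idle for ping-int seconds" (send the next ping). */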
		if (received == expect && cmd == NULL) {
			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
				dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
				    be32_to_cpu(h->magic),
				    be16_to_cpu(h->command),
				    be16_to_cpu(h->length));
				goto reconnect;
			}
			cmd = get_asender_cmd(be16_to_cpu(h->command));
			len = be16_to_cpu(h->length);
			if (unlikely(cmd == NULL)) {
				dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
				    be32_to_cpu(h->magic),
				    be16_to_cpu(h->command),
				    be16_to_cpu(h->length));
				goto disconnect;
			}
			expect = cmd->pkt_size;
			ERR_IF(len != expect-sizeof(struct p_header80))
				goto reconnect;
		}
		if (received == expect) {
			mdev->last_received = jiffies;
			D_ASSERT(cmd != NULL);
			if (!cmd->process(mdev, h))
				goto reconnect;

			/* the idle_timeout (ping-int)
			 * has been restored in got_PingAck() */
			if (cmd == get_asender_cmd(P_PING_ACK))
				ping_timeout_active = 0;

			buf	 = h;
			received = 0;
			expect	 = sizeof(struct p_header80);
			cmd	 = NULL;
		}
	}
	if (0) {
reconnect:
		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
		drbd_md_sync(mdev);
	}
	if (0) {
disconnect:
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		drbd_md_sync(mdev);
	}
	clear_bit(SIGNAL_ASENDER, &mdev->flags);

	D_ASSERT(mdev->state.conn < C_CONNECTED);
	dev_info(DEV, "asender terminated\n");

	return 0;
}
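/* Added summary (not in the original): the asender ("ack sender")
 * thread owns the meta socket.  Each loop iteration first sends
 * whatever is due (ping requests, the acks queued on done_ee), then
 * receives: a fixed size p_header80 first and then, sized via the
 * asender_cmd table, the remainder of the packet, which is finally
 * dispatched to the matching got_*() handler above. */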