libceph: fix corruption when using page_count 0 page in rbd
[pandora-kernel.git] / net / ceph / messenger.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/crc32c.h>
4 #include <linux/ctype.h>
5 #include <linux/highmem.h>
6 #include <linux/inet.h>
7 #include <linux/kthread.h>
8 #include <linux/net.h>
9 #include <linux/slab.h>
10 #include <linux/socket.h>
11 #include <linux/string.h>
12 #include <linux/bio.h>
13 #include <linux/blkdev.h>
14 #include <linux/dns_resolver.h>
15 #include <net/tcp.h>
16
17 #include <linux/ceph/libceph.h>
18 #include <linux/ceph/messenger.h>
19 #include <linux/ceph/decode.h>
20 #include <linux/ceph/pagelist.h>
21 #include <linux/export.h>
22
23 /*
24  * Ceph uses the messenger to exchange ceph_msg messages with other
25  * hosts in the system.  The messenger provides ordered and reliable
26  * delivery.  We tolerate TCP disconnects by reconnecting (with
27  * exponential backoff) in the case of a fault (disconnection, bad
28  * crc, protocol error).  Acks allow sent messages to be discarded by
29  * the sender.
30  */
31
/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;

#ifdef CONFIG_LOCKDEP
/* give ceph sockets their own lock class so lockdep can tell them
 * apart from other kernel socket locks */
static struct lock_class_key socket_class;
#endif


static void queue_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
static void ceph_fault(struct ceph_connection *con);

/*
 * nicely render a sockaddr as a string.
 *
 * ceph_pr_addr() formats into a small ring of static buffers; a
 * returned string is only valid until MAX_ADDR_STR later calls.
 */
#define MAX_ADDR_STR 20
#define MAX_ADDR_STR_LEN 60
static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN];
static DEFINE_SPINLOCK(addr_str_lock);
static int last_addr_str;	/* next slot in addr_str[] to hand out */
54
55 const char *ceph_pr_addr(const struct sockaddr_storage *ss)
56 {
57         int i;
58         char *s;
59         struct sockaddr_in *in4 = (void *)ss;
60         struct sockaddr_in6 *in6 = (void *)ss;
61
62         spin_lock(&addr_str_lock);
63         i = last_addr_str++;
64         if (last_addr_str == MAX_ADDR_STR)
65                 last_addr_str = 0;
66         spin_unlock(&addr_str_lock);
67         s = addr_str[i];
68
69         switch (ss->ss_family) {
70         case AF_INET:
71                 snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr,
72                          (unsigned int)ntohs(in4->sin_port));
73                 break;
74
75         case AF_INET6:
76                 snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr,
77                          (unsigned int)ntohs(in6->sin6_port));
78                 break;
79
80         default:
81                 snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %d)",
82                          (int)ss->ss_family);
83         }
84
85         return s;
86 }
87 EXPORT_SYMBOL(ceph_pr_addr);
88
/*
 * Cache a wire-format (big-endian) copy of our own address in
 * msgr->my_enc_addr for use during the banner exchange.
 */
static void encode_my_addr(struct ceph_messenger *msgr)
{
	memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
	ceph_encode_addr(&msgr->my_enc_addr);
}
94
95 /*
96  * work queue for all reading and writing to/from the socket.
97  */
98 struct workqueue_struct *ceph_msgr_wq;
99
100 int ceph_msgr_init(void)
101 {
102         ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
103         if (!ceph_msgr_wq) {
104                 pr_err("msgr_init failed to create workqueue\n");
105                 return -ENOMEM;
106         }
107         return 0;
108 }
109 EXPORT_SYMBOL(ceph_msgr_init);
110
111 void ceph_msgr_exit(void)
112 {
113         destroy_workqueue(ceph_msgr_wq);
114 }
115 EXPORT_SYMBOL(ceph_msgr_exit);
116
/*
 * Wait for all queued connection work (reads, writes, fault handling)
 * currently on the messenger workqueue to finish.
 */
void ceph_msgr_flush(void)
{
	flush_workqueue(ceph_msgr_wq);
}
EXPORT_SYMBOL(ceph_msgr_flush);
122
123
124 /*
125  * socket callback functions
126  */
127
128 /* data available on socket, or listen socket received a connect */
129 static void ceph_data_ready(struct sock *sk, int count_unused)
130 {
131         struct ceph_connection *con =
132                 (struct ceph_connection *)sk->sk_user_data;
133         if (sk->sk_state != TCP_CLOSE_WAIT) {
134                 dout("ceph_data_ready on %p state = %lu, queueing work\n",
135                      con, con->state);
136                 queue_con(con);
137         }
138 }
139
140 /* socket has buffer space for writing */
141 static void ceph_write_space(struct sock *sk)
142 {
143         struct ceph_connection *con =
144                 (struct ceph_connection *)sk->sk_user_data;
145
146         /* only queue to workqueue if there is data we want to write. */
147         if (test_bit(WRITE_PENDING, &con->state)) {
148                 dout("ceph_write_space %p queueing write work\n", con);
149                 queue_con(con);
150         } else {
151                 dout("ceph_write_space %p nothing to write\n", con);
152         }
153
154         /* since we have our own write_space, clear the SOCK_NOSPACE flag */
155         clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
156 }
157
158 /* socket's state has changed */
/* socket's state has changed */
static void ceph_state_change(struct sock *sk)
{
	struct ceph_connection *con =
		(struct ceph_connection *)sk->sk_user_data;

	dout("ceph_state_change %p state = %lu sk_state = %u\n",
	     con, con->state, sk->sk_state);

	/* nothing to do if we already closed this connection ourselves */
	if (test_bit(CLOSED, &con->state))
		return;

	switch (sk->sk_state) {
	case TCP_CLOSE:
		dout("ceph_state_change TCP_CLOSE\n");
		/* fallthrough */
	case TCP_CLOSE_WAIT:
		dout("ceph_state_change TCP_CLOSE_WAIT\n");
		/* SOCK_CLOSED dedups: only the first closer queues the
		 * fault work and picks the error message */
		if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) {
			if (test_bit(CONNECTING, &con->state))
				con->error_msg = "connection failed";
			else
				con->error_msg = "socket closed";
			queue_con(con);
		}
		break;
	case TCP_ESTABLISHED:
		dout("ceph_state_change TCP_ESTABLISHED\n");
		queue_con(con);
		break;
	}
}
189
190 /*
191  * set up socket callbacks
192  */
193 static void set_sock_callbacks(struct socket *sock,
194                                struct ceph_connection *con)
195 {
196         struct sock *sk = sock->sk;
197         sk->sk_user_data = (void *)con;
198         sk->sk_data_ready = ceph_data_ready;
199         sk->sk_write_space = ceph_write_space;
200         sk->sk_state_change = ceph_state_change;
201 }
202
203
204 /*
205  * socket helpers
206  */
207
208 /*
209  * initiate connection to a remote socket.
210  */
211 static struct socket *ceph_tcp_connect(struct ceph_connection *con)
212 {
213         struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
214         struct socket *sock;
215         int ret;
216
217         BUG_ON(con->sock);
218         ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
219                                IPPROTO_TCP, &sock);
220         if (ret)
221                 return ERR_PTR(ret);
222         con->sock = sock;
223         sock->sk->sk_allocation = GFP_NOFS;
224
225 #ifdef CONFIG_LOCKDEP
226         lockdep_set_class(&sock->sk->sk_lock, &socket_class);
227 #endif
228
229         set_sock_callbacks(sock, con);
230
231         dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));
232
233         ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
234                                  O_NONBLOCK);
235         if (ret == -EINPROGRESS) {
236                 dout("connect %s EINPROGRESS sk_state = %u\n",
237                      ceph_pr_addr(&con->peer_addr.in_addr),
238                      sock->sk->sk_state);
239                 ret = 0;
240         }
241         if (ret < 0) {
242                 pr_err("connect %s error %d\n",
243                        ceph_pr_addr(&con->peer_addr.in_addr), ret);
244                 sock_release(sock);
245                 con->sock = NULL;
246                 con->error_msg = "connect error";
247         }
248
249         if (ret < 0)
250                 return ERR_PTR(ret);
251         return sock;
252 }
253
254 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
255 {
256         struct kvec iov = {buf, len};
257         struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
258         int r;
259
260         r = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
261         if (r == -EAGAIN)
262                 r = 0;
263         return r;
264 }
265
266 /*
267  * write something.  @more is true if caller will be sending more data
268  * shortly.
269  */
270 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
271                      size_t kvlen, size_t len, int more)
272 {
273         struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
274         int r;
275
276         if (more)
277                 msg.msg_flags |= MSG_MORE;
278         else
279                 msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
280
281         r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
282         if (r == -EAGAIN)
283                 r = 0;
284         return r;
285 }
286
287 static int __ceph_tcp_sendpage(struct socket *sock, struct page *page,
288                      int offset, size_t size, bool more)
289 {
290         int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
291         int ret;
292
293         ret = kernel_sendpage(sock, page, offset, size, flags);
294         if (ret == -EAGAIN)
295                 ret = 0;
296
297         return ret;
298 }
299
300 static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
301                      int offset, size_t size, bool more)
302 {
303         int ret;
304         struct kvec iov;
305
306         /* sendpage cannot properly handle pages with page_count == 0,
307          * we need to fallback to sendmsg if that's the case */
308         if (page_count(page) >= 1)
309                 return __ceph_tcp_sendpage(sock, page, offset, size, more);
310
311         iov.iov_base = kmap(page) + offset;
312         iov.iov_len = size;
313         ret = ceph_tcp_sendmsg(sock, &iov, 1, size, more);
314         kunmap(page);
315
316         return ret;
317 }
318
319 /*
320  * Shutdown/close the socket for the given connection.
321  */
/*
 * Shutdown/close the socket for the given connection.
 */
static int con_close_socket(struct ceph_connection *con)
{
	int rc;

	dout("con_close_socket on %p sock %p\n", con, con->sock);
	if (!con->sock)
		return 0;
	/* set SOCK_CLOSED around the shutdown so ceph_state_change()'s
	 * test_and_set_bit sees it already set and does not queue fault
	 * handling for a close we initiated ourselves */
	set_bit(SOCK_CLOSED, &con->state);
	rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
	sock_release(con->sock);
	con->sock = NULL;
	clear_bit(SOCK_CLOSED, &con->state);
	return rc;
}
336
337 /*
338  * Reset a connection.  Discard all incoming and outgoing messages
339  * and clear *_seq state.
340  */
/* unlink @msg from whatever list it is on and drop the list's reference */
static void ceph_msg_remove(struct ceph_msg *msg)
{
	list_del_init(&msg->list_head);
	ceph_msg_put(msg);
}
346 static void ceph_msg_remove_list(struct list_head *head)
347 {
348         while (!list_empty(head)) {
349                 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
350                                                         list_head);
351                 ceph_msg_remove(msg);
352         }
353 }
354
/*
 * Reset a connection.  Discard all incoming and outgoing messages
 * and clear *_seq state.
 *
 * NOTE(review): visible callers (e.g. ceph_con_close) hold con->mutex
 * around this; confirm before adding new call sites.
 */
static void reset_connection(struct ceph_connection *con)
{
	/* reset connection, out_queue, msg_ and connect_seq */
	/* discard existing out_queue and msg_seq */
	ceph_msg_remove_list(&con->out_queue);
	ceph_msg_remove_list(&con->out_sent);

	/* drop any partially received incoming message */
	if (con->in_msg) {
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	}

	con->connect_seq = 0;
	con->out_seq = 0;
	/* drop the message currently being written, if any */
	if (con->out_msg) {
		ceph_msg_put(con->out_msg);
		con->out_msg = NULL;
	}
	con->in_seq = 0;
	con->in_seq_acked = 0;
}
376
377 /*
378  * mark a peer down.  drop any open connections.
379  */
/*
 * mark a peer down.  drop any open connections.
 *
 * NOTE(review): the state bits are flipped before con->mutex is taken;
 * they are atomic bitops, but confirm ordering against a concurrent
 * con_work before relying on it.
 */
void ceph_con_close(struct ceph_connection *con)
{
	dout("con_close %p peer %s\n", con,
	     ceph_pr_addr(&con->peer_addr.in_addr));
	set_bit(CLOSED, &con->state);  /* in case there's queued work */
	clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
	clear_bit(LOSSYTX, &con->state);  /* so we retry next connect */
	clear_bit(KEEPALIVE_PENDING, &con->state);
	clear_bit(WRITE_PENDING, &con->state);
	mutex_lock(&con->mutex);
	reset_connection(con);
	con->peer_global_seq = 0;
	cancel_delayed_work(&con->work);
	mutex_unlock(&con->mutex);
	/* queue work so the worker can observe the CLOSED state */
	queue_con(con);
}
EXPORT_SYMBOL(ceph_con_close);
397
398 /*
399  * Reopen a closed connection, with a new peer address.
400  */
/*
 * Reopen a closed connection, with a new peer address.
 */
void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr)
{
	dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
	/* NOTE(review): OPENING is set before CLOSED is cleared --
	 * presumably so concurrent work never sees "neither"; confirm */
	set_bit(OPENING, &con->state);
	clear_bit(CLOSED, &con->state);
	memcpy(&con->peer_addr, addr, sizeof(*addr));
	con->delay = 0;      /* reset backoff memory */
	queue_con(con);
}
EXPORT_SYMBOL(ceph_con_open);
411
412 /*
413  * return true if this connection ever successfully opened
414  */
415 bool ceph_con_opened(struct ceph_connection *con)
416 {
417         return con->connect_seq > 0;
418 }
419
420 /*
421  * generic get/put
422  */
423 struct ceph_connection *ceph_con_get(struct ceph_connection *con)
424 {
425         dout("con_get %p nref = %d -> %d\n", con,
426              atomic_read(&con->nref), atomic_read(&con->nref) + 1);
427         if (atomic_inc_not_zero(&con->nref))
428                 return con;
429         return NULL;
430 }
431
432 void ceph_con_put(struct ceph_connection *con)
433 {
434         dout("con_put %p nref = %d -> %d\n", con,
435              atomic_read(&con->nref), atomic_read(&con->nref) - 1);
436         BUG_ON(atomic_read(&con->nref) == 0);
437         if (atomic_dec_and_test(&con->nref)) {
438                 BUG_ON(con->sock);
439                 kfree(con);
440         }
441 }
442
443 /*
444  * initialize a new connection.
445  */
/*
 * initialize a new connection.
 */
void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
{
	dout("con_init %p\n", con);
	memset(con, 0, sizeof(*con));	/* zero everything first */
	atomic_set(&con->nref, 1);	/* caller holds the initial ref */
	con->msgr = msgr;
	mutex_init(&con->mutex);
	INIT_LIST_HEAD(&con->out_queue);
	INIT_LIST_HEAD(&con->out_sent);
	INIT_DELAYED_WORK(&con->work, con_work);
}
EXPORT_SYMBOL(ceph_con_init);
458
459
460 /*
461  * We maintain a global counter to order connection attempts.  Get
462  * a unique seq greater than @gt.
463  */
/*
 * We maintain a global counter to order connection attempts.  Get
 * a unique seq greater than @gt.
 */
static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
{
	u32 ret;

	spin_lock(&msgr->global_seq_lock);
	/* jump forward if the caller has already seen a higher seq */
	if (msgr->global_seq < gt)
		msgr->global_seq = gt;
	ret = ++msgr->global_seq;
	spin_unlock(&msgr->global_seq_lock);
	return ret;
}
475
476
477 /*
478  * Prepare footer for currently outgoing message, and finish things
479  * off.  Assumes out_kvec* are already valid.. we just add on to the end.
480  */
/*
 * Prepare footer for currently outgoing message, and finish things
 * off.  Assumes out_kvec* are already valid.. we just add on to the end.
 *
 * @v: index of the next free slot in con->out_kvec[].
 */
static void prepare_write_message_footer(struct ceph_connection *con, int v)
{
	struct ceph_msg *m = con->out_msg;

	dout("prepare_write_message_footer %p\n", con);
	con->out_kvec_is_msg = true;
	con->out_kvec[v].iov_base = &m->footer;
	con->out_kvec[v].iov_len = sizeof(m->footer);
	con->out_kvec_bytes += sizeof(m->footer);
	con->out_kvec_left++;
	con->out_more = m->more_to_follow;
	con->out_msg_done = true;	/* whole message is now queued */
}
494
495 /*
496  * Prepare headers for the next outgoing message.
497  */
/*
 * Prepare headers for the next outgoing message: queue the tag, header,
 * front and (optional) middle sections into out_kvec[], compute their
 * CRCs, and set up the page iterator if the message carries data.
 */
static void prepare_write_message(struct ceph_connection *con)
{
	struct ceph_msg *m;
	int v = 0;	/* next free out_kvec[] slot */

	con->out_kvec_bytes = 0;
	con->out_kvec_is_msg = true;
	con->out_msg_done = false;

	/* Sneak an ack in there first?  If we can get it into the same
	 * TCP packet that's a good thing. */
	if (con->in_seq > con->in_seq_acked) {
		con->in_seq_acked = con->in_seq;
		con->out_kvec[v].iov_base = &tag_ack;
		con->out_kvec[v++].iov_len = 1;
		con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
		con->out_kvec[v].iov_base = &con->out_temp_ack;
		con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack);
		con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
	}

	m = list_first_entry(&con->out_queue,
		       struct ceph_msg, list_head);
	con->out_msg = m;

	/* put message on sent list */
	ceph_msg_get(m);
	list_move_tail(&m->list_head, &con->out_sent);

	/*
	 * only assign outgoing seq # if we haven't sent this message
	 * yet.  if it is requeued, resend with it's original seq.
	 */
	if (m->needs_out_seq) {
		m->hdr.seq = cpu_to_le64(++con->out_seq);
		m->needs_out_seq = false;
	}

	dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
	     m, con->out_seq, le16_to_cpu(m->hdr.type),
	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
	     le32_to_cpu(m->hdr.data_len),
	     m->nr_pages);
	BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);

	/* tag + hdr + front + middle */
	con->out_kvec[v].iov_base = &tag_msg;
	con->out_kvec[v++].iov_len = 1;
	con->out_kvec[v].iov_base = &m->hdr;
	con->out_kvec[v++].iov_len = sizeof(m->hdr);
	con->out_kvec[v++] = m->front;
	if (m->middle)
		con->out_kvec[v++] = m->middle->vec;
	con->out_kvec_left = v;
	con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
		(m->middle ? m->middle->vec.iov_len : 0);
	con->out_kvec_cur = con->out_kvec;

	/* fill in crc (except data pages), footer */
	/* header crc covers the header minus its own crc field */
	con->out_msg->hdr.crc =
		cpu_to_le32(crc32c(0, (void *)&m->hdr,
				      sizeof(m->hdr) - sizeof(m->hdr.crc)));
	con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
	con->out_msg->footer.front_crc =
		cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
	if (m->middle)
		con->out_msg->footer.middle_crc =
			cpu_to_le32(crc32c(0, m->middle->vec.iov_base,
					   m->middle->vec.iov_len));
	else
		con->out_msg->footer.middle_crc = 0;
	/* data crc is accumulated later, page by page, as data is sent */
	con->out_msg->footer.data_crc = 0;
	dout("prepare_write_message front_crc %u data_crc %u\n",
	     le32_to_cpu(con->out_msg->footer.front_crc),
	     le32_to_cpu(con->out_msg->footer.middle_crc));

	/* is there a data payload? */
	if (le32_to_cpu(m->hdr.data_len) > 0) {
		/* initialize page iterator */
		con->out_msg_pos.page = 0;
		if (m->pages)
			con->out_msg_pos.page_pos = m->page_alignment;
		else
			con->out_msg_pos.page_pos = 0;
		con->out_msg_pos.data_pos = 0;
		con->out_msg_pos.did_page_crc = 0;
		con->out_more = 1;  /* data + footer will follow */
	} else {
		/* no, queue up footer too and be done */
		prepare_write_message_footer(con, v);
	}

	set_bit(WRITE_PENDING, &con->state);
}
592
593 /*
594  * Prepare an ack.
595  */
/*
 * Prepare an ack: queue the ack tag plus the (little-endian) seq of the
 * last message we have fully received.
 */
static void prepare_write_ack(struct ceph_connection *con)
{
	dout("prepare_write_ack %p %llu -> %llu\n", con,
	     con->in_seq_acked, con->in_seq);
	con->in_seq_acked = con->in_seq;

	con->out_kvec[0].iov_base = &tag_ack;
	con->out_kvec[0].iov_len = 1;
	/* out_temp_ack must persist until the kvec is written */
	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
	con->out_kvec[1].iov_base = &con->out_temp_ack;
	con->out_kvec[1].iov_len = sizeof(con->out_temp_ack);
	con->out_kvec_left = 2;
	con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
	con->out_kvec_cur = con->out_kvec;
	con->out_more = 1;  /* more will follow.. eventually.. */
	set_bit(WRITE_PENDING, &con->state);
}
613
614 /*
615  * Prepare to write keepalive byte.
616  */
/*
 * Prepare to write keepalive byte.
 */
static void prepare_write_keepalive(struct ceph_connection *con)
{
	dout("prepare_write_keepalive %p\n", con);
	con->out_kvec[0].iov_base = &tag_keepalive;
	con->out_kvec[0].iov_len = 1;
	con->out_kvec_left = 1;
	con->out_kvec_bytes = 1;
	con->out_kvec_cur = con->out_kvec;
	set_bit(WRITE_PENDING, &con->state);
}
627
628 /*
629  * Connection negotiation.
630  */
631
/*
 * Ask the protocol layer for an authorizer and append it to the
 * pending connect request in out_kvec[].
 *
 * Returns 0, or -EAGAIN if the connection was closed or reopened while
 * the mutex was dropped (caller must restart).
 */
static int prepare_connect_authorizer(struct ceph_connection *con)
{
	void *auth_buf;
	int auth_len = 0;
	int auth_protocol = 0;

	/* drop con->mutex across the callback.
	 * NOTE(review): presumably get_authorizer may sleep or take
	 * other locks -- confirm against the ops implementations */
	mutex_unlock(&con->mutex);
	if (con->ops->get_authorizer)
		con->ops->get_authorizer(con, &auth_buf, &auth_len,
					 &auth_protocol, &con->auth_reply_buf,
					 &con->auth_reply_buf_len,
					 con->auth_retry);
	mutex_lock(&con->mutex);

	/* the connection may have changed state while unlocked */
	if (test_bit(CLOSED, &con->state) ||
	    test_bit(OPENING, &con->state))
		return -EAGAIN;

	con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
	con->out_connect.authorizer_len = cpu_to_le32(auth_len);

	/* auth_buf is only read when auth_len != 0, i.e. when the
	 * callback actually filled it in */
	if (auth_len) {
		con->out_kvec[con->out_kvec_left].iov_base = auth_buf;
		con->out_kvec[con->out_kvec_left].iov_len = auth_len;
		con->out_kvec_left++;
		con->out_kvec_bytes += auth_len;
	}
	return 0;
}
661
662 /*
663  * We connected to a peer and are saying hello.
664  */
/*
 * We connected to a peer and are saying hello: queue the banner string
 * followed by our wire-encoded address.
 */
static void prepare_write_banner(struct ceph_messenger *msgr,
				 struct ceph_connection *con)
{
	int len = strlen(CEPH_BANNER);

	con->out_kvec[0].iov_base = CEPH_BANNER;
	con->out_kvec[0].iov_len = len;
	con->out_kvec[1].iov_base = &msgr->my_enc_addr;
	con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
	con->out_kvec_left = 2;
	con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
	con->out_kvec_cur = con->out_kvec;
	con->out_more = 0;
	set_bit(WRITE_PENDING, &con->state);
}
680
/*
 * Fill in and queue the connect request for the peer.
 *
 * @after_banner: nonzero when the banner kvecs are already queued and
 * we must append rather than reset the kvec state.
 *
 * Returns 0 or the error from prepare_connect_authorizer().
 */
static int prepare_write_connect(struct ceph_messenger *msgr,
				 struct ceph_connection *con,
				 int after_banner)
{
	unsigned global_seq = get_global_seq(con->msgr, 0);
	int proto;

	/* pick the protocol matching the type of peer we talk to */
	switch (con->peer_name.type) {
	case CEPH_ENTITY_TYPE_MON:
		proto = CEPH_MONC_PROTOCOL;
		break;
	case CEPH_ENTITY_TYPE_OSD:
		proto = CEPH_OSDC_PROTOCOL;
		break;
	case CEPH_ENTITY_TYPE_MDS:
		proto = CEPH_MDSC_PROTOCOL;
		break;
	default:
		BUG();
	}

	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
	     con->connect_seq, global_seq, proto);

	con->out_connect.features = cpu_to_le64(msgr->supported_features);
	con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
	con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
	con->out_connect.global_seq = cpu_to_le32(global_seq);
	con->out_connect.protocol_version = cpu_to_le32(proto);
	con->out_connect.flags = 0;

	if (!after_banner) {
		con->out_kvec_left = 0;
		con->out_kvec_bytes = 0;
	}
	con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
	con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
	con->out_kvec_left++;
	con->out_kvec_bytes += sizeof(con->out_connect);
	con->out_kvec_cur = con->out_kvec;
	con->out_more = 0;
	set_bit(WRITE_PENDING, &con->state);

	return prepare_connect_authorizer(con);
}
726
727
728 /*
729  * write as much of pending kvecs to the socket as we can.
730  *  1 -> done
731  *  0 -> socket full, but more to do
732  * <0 -> error
733  */
/*
 * write as much of pending kvecs to the socket as we can.
 *  1 -> done
 *  0 -> socket full, but more to do
 * <0 -> error
 */
static int write_partial_kvec(struct ceph_connection *con)
{
	int ret;

	dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
	while (con->out_kvec_bytes > 0) {
		ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
				       con->out_kvec_left, con->out_kvec_bytes,
				       con->out_more);
		if (ret <= 0)
			goto out;
		con->out_kvec_bytes -= ret;
		if (con->out_kvec_bytes == 0)
			break;            /* done */
		/* partial send: skip the kvecs that went out whole, then
		 * trim the one we are in the middle of */
		while (ret > 0) {
			if (ret >= con->out_kvec_cur->iov_len) {
				ret -= con->out_kvec_cur->iov_len;
				con->out_kvec_cur++;
				con->out_kvec_left--;
			} else {
				con->out_kvec_cur->iov_len -= ret;
				con->out_kvec_cur->iov_base += ret;
				ret = 0;
				break;
			}
		}
	}
	/* everything went out; reset kvec state */
	con->out_kvec_left = 0;
	con->out_kvec_is_msg = false;
	ret = 1;
out:
	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
	     con->out_kvec_bytes, con->out_kvec_left, ret);
	return ret;  /* done! */
}
769
770 #ifdef CONFIG_BLOCK
771 static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
772 {
773         if (!bio) {
774                 *iter = NULL;
775                 *seg = 0;
776                 return;
777         }
778         *iter = bio;
779         *seg = bio->bi_idx;
780 }
781
782 static void iter_bio_next(struct bio **bio_iter, int *seg)
783 {
784         if (*bio_iter == NULL)
785                 return;
786
787         BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
788
789         (*seg)++;
790         if (*seg == (*bio_iter)->bi_vcnt)
791                 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
792 }
793 #endif
794
795 /*
796  * Write as much message data payload as we can.  If we finish, queue
797  * up the footer.
798  *  1 -> done, footer is now queued in out_kvec[].
799  *  0 -> socket full, but more to do
800  * <0 -> error
801  */
/*
 * Write as much message data payload as we can.  If we finish, queue
 * up the footer.
 *  1 -> done, footer is now queued in out_kvec[].
 *  0 -> socket full, but more to do
 * <0 -> error
 */
static int write_partial_msg_pages(struct ceph_connection *con)
{
	struct ceph_msg *msg = con->out_msg;
	unsigned data_len = le32_to_cpu(msg->hdr.data_len);
	size_t len;
	/* NOTE(review): 'crc' is taken straight from msgr->nocrc yet it
	 * *enables* crc computation below -- the sense looks inverted;
	 * verify against where nocrc is set and the read path */
	int crc = con->msgr->nocrc;
	int ret;
	int total_max_write;
	int in_trail = 0;	/* set once we reach the trail pagelist */
	size_t trail_len = (msg->trail ? msg->trail->length : 0);

	dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
	     con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
	     con->out_msg_pos.page_pos);

#ifdef CONFIG_BLOCK
	/* lazily start the bio cursor on first entry */
	if (msg->bio && !msg->bio_iter)
		init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
#endif

	while (data_len > con->out_msg_pos.data_pos) {
		struct page *page = NULL;
		void *kaddr = NULL;
		int max_write = PAGE_SIZE;
		int page_shift = 0;

		total_max_write = data_len - trail_len -
			con->out_msg_pos.data_pos;

		/*
		 * if we are calculating the data crc (the default), we need
		 * to map the page.  if our pages[] has been revoked, use the
		 * zero page.
		 */

		/* have we reached the trail part of the data? */
		if (con->out_msg_pos.data_pos >= data_len - trail_len) {
			in_trail = 1;

			total_max_write = data_len - con->out_msg_pos.data_pos;

			page = list_first_entry(&msg->trail->head,
						struct page, lru);
			if (crc)
				kaddr = kmap(page);
			max_write = PAGE_SIZE;
		} else if (msg->pages) {
			page = msg->pages[con->out_msg_pos.page];
			if (crc)
				kaddr = kmap(page);
		} else if (msg->pagelist) {
			page = list_first_entry(&msg->pagelist->head,
						struct page, lru);
			if (crc)
				kaddr = kmap(page);
#ifdef CONFIG_BLOCK
		} else if (msg->bio) {
			struct bio_vec *bv;

			bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
			page = bv->bv_page;
			page_shift = bv->bv_offset;
			if (crc)
				kaddr = kmap(page) + page_shift;
			max_write = bv->bv_len;
#endif
		} else {
			/* pages revoked: send zeros instead */
			page = con->msgr->zero_page;
			if (crc)
				kaddr = page_address(con->msgr->zero_page);
		}
		len = min_t(int, max_write - con->out_msg_pos.page_pos,
			    total_max_write);

		/* fold this chunk into the running data crc exactly once,
		 * even if the send below completes only partially */
		if (crc && !con->out_msg_pos.did_page_crc) {
			void *base = kaddr + con->out_msg_pos.page_pos;
			u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);

			BUG_ON(kaddr == NULL);
			con->out_msg->footer.data_crc =
				cpu_to_le32(crc32c(tmpcrc, base, len));
			con->out_msg_pos.did_page_crc = 1;
		}
		ret = ceph_tcp_sendpage(con->sock, page,
				      con->out_msg_pos.page_pos + page_shift,
				      len, 1);

		/* only unmap pages we actually kmap'ed (the zero page is
		 * accessed via page_address, not kmap) */
		if (crc &&
		    (msg->pages || msg->pagelist || msg->bio || in_trail))
			kunmap(page);

		if (ret <= 0)
			goto out;

		con->out_msg_pos.data_pos += ret;
		con->out_msg_pos.page_pos += ret;
		if (ret == len) {
			/* finished this page; advance to the next one */
			con->out_msg_pos.page_pos = 0;
			con->out_msg_pos.page++;
			con->out_msg_pos.did_page_crc = 0;
			if (in_trail)
				list_move_tail(&page->lru,
					       &msg->trail->head);
			else if (msg->pagelist)
				list_move_tail(&page->lru,
					       &msg->pagelist->head);
#ifdef CONFIG_BLOCK
			else if (msg->bio)
				iter_bio_next(&msg->bio_iter, &msg->bio_seg);
#endif
		}
	}

	dout("write_partial_msg_pages %p msg %p done\n", con, msg);

	/* prepare and queue up footer, too */
	if (!crc)
		con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
	con->out_kvec_bytes = 0;
	con->out_kvec_left = 0;
	con->out_kvec_cur = con->out_kvec;
	prepare_write_message_footer(con, 0);
	ret = 1;
out:
	return ret;
}
928
929 /*
930  * write some zeros
931  */
932 static int write_partial_skip(struct ceph_connection *con)
933 {
934         int ret;
935
936         while (con->out_skip > 0) {
937                 struct kvec iov = {
938                         .iov_base = page_address(con->msgr->zero_page),
939                         .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
940                 };
941
942                 ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
943                 if (ret <= 0)
944                         goto out;
945                 con->out_skip -= ret;
946         }
947         ret = 1;
948 out:
949         return ret;
950 }
951
952 /*
953  * Prepare to read connection handshake, or an ack.
954  */
/* Reset the read cursor before consuming the peer's banner. */
static void prepare_read_banner(struct ceph_connection *con)
{
        dout("prepare_read_banner %p\n", con);
        con->in_base_pos = 0;
}
960
/* Reset the read cursor before consuming the connect reply. */
static void prepare_read_connect(struct ceph_connection *con)
{
        dout("prepare_read_connect %p\n", con);
        con->in_base_pos = 0;
}
966
/* Reset the read cursor before consuming an incoming ack. */
static void prepare_read_ack(struct ceph_connection *con)
{
        dout("prepare_read_ack %p\n", con);
        con->in_base_pos = 0;
}
972
/* Reset the read cursor; the next byte read will be a protocol tag. */
static void prepare_read_tag(struct ceph_connection *con)
{
        dout("prepare_read_tag %p\n", con);
        con->in_base_pos = 0;
        con->in_tag = CEPH_MSGR_TAG_READY;
}
979
980 /*
981  * Prepare to read a message.
982  */
static int prepare_read_message(struct ceph_connection *con)
{
        dout("prepare_read_message %p\n", con);
        /* a previous incoming message must have been consumed or freed */
        BUG_ON(con->in_msg != NULL);
        con->in_base_pos = 0;
        /* reset the per-message section crc accumulators */
        con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
        return 0;
}
991
992
993 static int read_partial(struct ceph_connection *con,
994                         int *to, int size, void *object)
995 {
996         *to += size;
997         while (con->in_base_pos < *to) {
998                 int left = *to - con->in_base_pos;
999                 int have = size - left;
1000                 int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
1001                 if (ret <= 0)
1002                         return ret;
1003                 con->in_base_pos += ret;
1004         }
1005         return 1;
1006 }
1007
1008
1009 /*
1010  * Read all or part of the connect-side handshake on a new connection
1011  */
static int read_partial_banner(struct ceph_connection *con)
{
        int ret, to = 0;

        dout("read_partial_banner %p at %d\n", con, con->in_base_pos);

        /* peer's banner */
        ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
        if (ret <= 0)
                goto out;
        /* the peer's address, as the peer sees itself */
        ret = read_partial(con, &to, sizeof(con->actual_peer_addr),
                           &con->actual_peer_addr);
        if (ret <= 0)
                goto out;
        /* our address as seen by the peer (lets us learn our own address) */
        ret = read_partial(con, &to, sizeof(con->peer_addr_for_me),
                           &con->peer_addr_for_me);
        if (ret <= 0)
                goto out;
out:
        return ret;
}
1033
/* Read the server's connect reply, then its variable-length authorizer. */
static int read_partial_connect(struct ceph_connection *con)
{
        int ret, to = 0;

        dout("read_partial_connect %p at %d\n", con, con->in_base_pos);

        ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
        if (ret <= 0)
                goto out;
        /*
         * NOTE(review): authorizer_len comes off the wire and is not
         * bounds-checked here against the size of auth_reply_buf --
         * presumably the auth layer sizes that buffer for the maximum
         * possible reply; confirm against the get_authorizer callback.
         */
        ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len),
                           con->auth_reply_buf);
        if (ret <= 0)
                goto out;

        dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
             con, (int)con->in_reply.tag,
             le32_to_cpu(con->in_reply.connect_seq),
             le32_to_cpu(con->in_reply.global_seq));
out:
        return ret;

}
1056
1057 /*
1058  * Verify the hello banner looks okay.
1059  */
1060 static int verify_hello(struct ceph_connection *con)
1061 {
1062         if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
1063                 pr_err("connect to %s got bad banner\n",
1064                        ceph_pr_addr(&con->peer_addr.in_addr));
1065                 con->error_msg = "protocol error, bad banner";
1066                 return -1;
1067         }
1068         return 0;
1069 }
1070
1071 static bool addr_is_blank(struct sockaddr_storage *ss)
1072 {
1073         switch (ss->ss_family) {
1074         case AF_INET:
1075                 return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0;
1076         case AF_INET6:
1077                 return
1078                      ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 &&
1079                      ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 &&
1080                      ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 &&
1081                      ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0;
1082         }
1083         return false;
1084 }
1085
/* Extract the port (host byte order); 0 for unknown address families. */
static int addr_port(struct sockaddr_storage *ss)
{
	if (ss->ss_family == AF_INET)
		return ntohs(((struct sockaddr_in *)ss)->sin_port);
	if (ss->ss_family == AF_INET6)
		return ntohs(((struct sockaddr_in6 *)ss)->sin6_port);
	return 0;
}
1096
/* Store port p (host byte order) into the sockaddr in network order. */
static void addr_set_port(struct sockaddr_storage *ss, int p)
{
	if (ss->ss_family == AF_INET)
		((struct sockaddr_in *)ss)->sin_port = htons(p);
	else if (ss->ss_family == AF_INET6)
		((struct sockaddr_in6 *)ss)->sin6_port = htons(p);
}
1108
1109 /*
1110  * Unlike other *_pton function semantics, zero indicates success.
1111  */
static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
                char delim, const char **ipend)
{
        struct sockaddr_in *in4 = (void *)ss;
        struct sockaddr_in6 *in6 = (void *)ss;

        /* start from a clean slate so unused sockaddr bytes are zero */
        memset(ss, 0, sizeof(*ss));

        /* try IPv4 first; on success *ipend points past the parsed text */
        if (in4_pton(str, len, (u8 *)&in4->sin_addr.s_addr, delim, ipend)) {
                ss->ss_family = AF_INET;
                return 0;
        }

        /* fall back to IPv6 */
        if (in6_pton(str, len, (u8 *)&in6->sin6_addr.s6_addr, delim, ipend)) {
                ss->ss_family = AF_INET6;
                return 0;
        }

        return -EINVAL;
}
1132
1133 /*
1134  * Extract hostname string and resolve using kernel DNS facility.
1135  */
#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
/*
 * Resolve a hostname via the kernel DNS upcall, parse the returned
 * address string with ceph_pton(), and set *ipend to the end of the
 * hostname within 'name'.  Returns 0 on success (like ceph_pton).
 */
static int ceph_dns_resolve_name(const char *name, size_t namelen,
                struct sockaddr_storage *ss, char delim, const char **ipend)
{
        const char *end, *delim_p;
        char *colon_p, *ip_addr = NULL;
        int ip_len, ret;

        /*
         * The end of the hostname occurs immediately preceding the delimiter or
         * the port marker (':') where the delimiter takes precedence.
         */
        delim_p = memchr(name, delim, namelen);
        colon_p = memchr(name, ':', namelen);

        if (delim_p && colon_p)
                end = delim_p < colon_p ? delim_p : colon_p;
        else if (!delim_p && colon_p)
                end = colon_p;
        else {
                end = delim_p;
                if (!end) /* case: hostname:/ */
                        end = name + namelen;
        }

        /* reject an empty hostname */
        if (end <= name)
                return -EINVAL;

        /* do dns_resolve upcall */
        ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL);
        if (ip_len > 0)
                ret = ceph_pton(ip_addr, ip_len, ss, -1, NULL);
        else
                ret = -ESRCH;

        /* dns_query allocated the result string; we own and free it */
        kfree(ip_addr);

        *ipend = end;

        pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
                        ret, ret ? "failed" : ceph_pr_addr(ss));

        return ret;
}
#else
/* DNS resolution disabled at build time: only numeric addresses parse */
static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
                struct sockaddr_storage *ss, char delim, const char **ipend)
{
        return -EINVAL;
}
#endif
1187
1188 /*
1189  * Parse a server name (IP or hostname). If a valid IP address is not found
1190  * then try to extract a hostname to resolve using userspace DNS upcall.
1191  */
1192 static int ceph_parse_server_name(const char *name, size_t namelen,
1193                         struct sockaddr_storage *ss, char delim, const char **ipend)
1194 {
1195         int ret;
1196
1197         ret = ceph_pton(name, namelen, ss, delim, ipend);
1198         if (ret)
1199                 ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend);
1200
1201         return ret;
1202 }
1203
1204 /*
1205  * Parse an ip[:port] list into an addr array.  Use the default
1206  * monitor port if a port isn't specified.
1207  */
1208 int ceph_parse_ips(const char *c, const char *end,
1209                    struct ceph_entity_addr *addr,
1210                    int max_count, int *count)
1211 {
1212         int i, ret = -EINVAL;
1213         const char *p = c;
1214
1215         dout("parse_ips on '%.*s'\n", (int)(end-c), c);
1216         for (i = 0; i < max_count; i++) {
1217                 const char *ipend;
1218                 struct sockaddr_storage *ss = &addr[i].in_addr;
1219                 int port;
1220                 char delim = ',';
1221
1222                 if (*p == '[') {
1223                         delim = ']';
1224                         p++;
1225                 }
1226
1227                 ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend);
1228                 if (ret)
1229                         goto bad;
1230                 ret = -EINVAL;
1231
1232                 p = ipend;
1233
1234                 if (delim == ']') {
1235                         if (*p != ']') {
1236                                 dout("missing matching ']'\n");
1237                                 goto bad;
1238                         }
1239                         p++;
1240                 }
1241
1242                 /* port? */
1243                 if (p < end && *p == ':') {
1244                         port = 0;
1245                         p++;
1246                         while (p < end && *p >= '0' && *p <= '9') {
1247                                 port = (port * 10) + (*p - '0');
1248                                 p++;
1249                         }
1250                         if (port > 65535 || port == 0)
1251                                 goto bad;
1252                 } else {
1253                         port = CEPH_MON_PORT;
1254                 }
1255
1256                 addr_set_port(ss, port);
1257
1258                 dout("parse_ips got %s\n", ceph_pr_addr(ss));
1259
1260                 if (p == end)
1261                         break;
1262                 if (*p != ',')
1263                         goto bad;
1264                 p++;
1265         }
1266
1267         if (p != end)
1268                 goto bad;
1269
1270         if (count)
1271                 *count = i + 1;
1272         return 0;
1273
1274 bad:
1275         pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
1276         return ret;
1277 }
1278 EXPORT_SYMBOL(ceph_parse_ips);
1279
/*
 * Validate the peer's banner and addresses, optionally learn our own
 * address from what the peer saw, then move on to the connect phase.
 * Returns 0 on success, -1 on banner/identity mismatch.
 */
static int process_banner(struct ceph_connection *con)
{
        dout("process_banner on %p\n", con);

        if (verify_hello(con) < 0)
                return -1;

        /* addresses arrived in wire format; convert in place */
        ceph_decode_addr(&con->actual_peer_addr);
        ceph_decode_addr(&con->peer_addr_for_me);

        /*
         * Make sure the other end is who we wanted.  note that the other
         * end may not yet know their ip address, so if it's 0.0.0.0, give
         * them the benefit of the doubt.
         */
        if (memcmp(&con->peer_addr, &con->actual_peer_addr,
                   sizeof(con->peer_addr)) != 0 &&
            !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
              con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
                pr_warning("wrong peer, want %s/%d, got %s/%d\n",
                           ceph_pr_addr(&con->peer_addr.in_addr),
                           (int)le32_to_cpu(con->peer_addr.nonce),
                           ceph_pr_addr(&con->actual_peer_addr.in_addr),
                           (int)le32_to_cpu(con->actual_peer_addr.nonce));
                con->error_msg = "wrong peer at address";
                return -1;
        }

        /*
         * did we learn our address?
         */
        if (addr_is_blank(&con->msgr->inst.addr.in_addr)) {
                int port = addr_port(&con->msgr->inst.addr.in_addr);

                /* adopt the address the peer saw, but keep our own port */
                memcpy(&con->msgr->inst.addr.in_addr,
                       &con->peer_addr_for_me.in_addr,
                       sizeof(con->peer_addr_for_me.in_addr));
                addr_set_port(&con->msgr->inst.addr.in_addr, port);
                encode_my_addr(con->msgr);
                dout("process_banner learned my addr is %s\n",
                     ceph_pr_addr(&con->msgr->inst.addr.in_addr));
        }

        set_bit(NEGOTIATING, &con->state);
        prepare_read_connect(con);
        return 0;
}
1327
/*
 * Protocol-level failure: reset and close the connection, then notify
 * the higher layer via the bad_proto callback.
 */
static void fail_protocol(struct ceph_connection *con)
{
        reset_connection(con);
        set_bit(CLOSED, &con->state);  /* in case there's queued work */

        /* drop con->mutex across the callback: it may re-enter the messenger */
        mutex_unlock(&con->mutex);
        if (con->ops->bad_proto)
                con->ops->bad_proto(con);
        mutex_lock(&con->mutex);
}
1338
/*
 * Handle the server's reply tag to our connect attempt.  Returns 0 to
 * continue the handshake (or proceed to READY), a negative value on a
 * fatal protocol problem, or -EAGAIN if the connection was closed or
 * reopened while the mutex was dropped for the peer_reset callback.
 */
static int process_connect(struct ceph_connection *con)
{
        u64 sup_feat = con->msgr->supported_features;
        u64 req_feat = con->msgr->required_features;
        u64 server_feat = le64_to_cpu(con->in_reply.features);
        int ret;

        dout("process_connect on %p tag %d\n", con, (int)con->in_tag);

        switch (con->in_reply.tag) {
        case CEPH_MSGR_TAG_FEATURES:
                /* server requires feature bits we do not support */
                pr_err("%s%lld %s feature set mismatch,"
                       " my %llx < server's %llx, missing %llx\n",
                       ENTITY_NAME(con->peer_name),
                       ceph_pr_addr(&con->peer_addr.in_addr),
                       sup_feat, server_feat, server_feat & ~sup_feat);
                con->error_msg = "missing required protocol features";
                fail_protocol(con);
                return -1;

        case CEPH_MSGR_TAG_BADPROTOVER:
                pr_err("%s%lld %s protocol version mismatch,"
                       " my %d != server's %d\n",
                       ENTITY_NAME(con->peer_name),
                       ceph_pr_addr(&con->peer_addr.in_addr),
                       le32_to_cpu(con->out_connect.protocol_version),
                       le32_to_cpu(con->in_reply.protocol_version));
                con->error_msg = "protocol version mismatch";
                fail_protocol(con);
                return -1;

        case CEPH_MSGR_TAG_BADAUTHORIZER:
                con->auth_retry++;
                dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
                     con->auth_retry);
                if (con->auth_retry == 2) {
                        con->error_msg = "connect authorization failure";
                        return -1;
                }
                /*
                 * NOTE(review): auth_retry is forced to 1 here rather
                 * than left at its incremented value, so only one retry
                 * ever happens -- confirm this matches the intended
                 * single-retry policy.
                 */
                con->auth_retry = 1;
                ret = prepare_write_connect(con->msgr, con, 0);
                if (ret < 0)
                        return ret;
                prepare_read_connect(con);
                break;

        case CEPH_MSGR_TAG_RESETSESSION:
                /*
                 * If we connected with a large connect_seq but the peer
                 * has no record of a session with us (no connection, or
                 * connect_seq == 0), they will send RESETSESION to indicate
                 * that they must have reset their session, and may have
                 * dropped messages.
                 */
                dout("process_connect got RESET peer seq %u\n",
                     le32_to_cpu(con->in_connect.connect_seq));
                pr_err("%s%lld %s connection reset\n",
                       ENTITY_NAME(con->peer_name),
                       ceph_pr_addr(&con->peer_addr.in_addr));
                reset_connection(con);
                prepare_write_connect(con->msgr, con, 0);
                prepare_read_connect(con);

                /* Tell ceph about it. */
                mutex_unlock(&con->mutex);
                pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
                if (con->ops->peer_reset)
                        con->ops->peer_reset(con);
                mutex_lock(&con->mutex);
                /* the callback ran unlocked: re-check connection state */
                if (test_bit(CLOSED, &con->state) ||
                    test_bit(OPENING, &con->state))
                        return -EAGAIN;
                break;

        case CEPH_MSGR_TAG_RETRY_SESSION:
                /*
                 * If we sent a smaller connect_seq than the peer has, try
                 * again with a larger value.
                 */
                dout("process_connect got RETRY my seq = %u, peer_seq = %u\n",
                     le32_to_cpu(con->out_connect.connect_seq),
                     le32_to_cpu(con->in_connect.connect_seq));
                con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
                prepare_write_connect(con->msgr, con, 0);
                prepare_read_connect(con);
                break;

        case CEPH_MSGR_TAG_RETRY_GLOBAL:
                /*
                 * If we sent a smaller global_seq than the peer has, try
                 * again with a larger value.
                 */
                dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
                     con->peer_global_seq,
                     le32_to_cpu(con->in_connect.global_seq));
                get_global_seq(con->msgr,
                               le32_to_cpu(con->in_connect.global_seq));
                prepare_write_connect(con->msgr, con, 0);
                prepare_read_connect(con);
                break;

        case CEPH_MSGR_TAG_READY:
                /* handshake accepted -- verify required features first */
                if (req_feat & ~server_feat) {
                        pr_err("%s%lld %s protocol feature mismatch,"
                               " my required %llx > server's %llx, need %llx\n",
                               ENTITY_NAME(con->peer_name),
                               ceph_pr_addr(&con->peer_addr.in_addr),
                               req_feat, server_feat, req_feat & ~server_feat);
                        con->error_msg = "missing required protocol features";
                        fail_protocol(con);
                        return -1;
                }
                clear_bit(CONNECTING, &con->state);
                con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
                con->connect_seq++;
                con->peer_features = server_feat;
                dout("process_connect got READY gseq %d cseq %d (%d)\n",
                     con->peer_global_seq,
                     le32_to_cpu(con->in_reply.connect_seq),
                     con->connect_seq);
                WARN_ON(con->connect_seq !=
                        le32_to_cpu(con->in_reply.connect_seq));

                if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
                        set_bit(LOSSYTX, &con->state);

                prepare_read_tag(con);
                break;

        case CEPH_MSGR_TAG_WAIT:
                /*
                 * If there is a connection race (we are opening
                 * connections to each other), one of us may just have
                 * to WAIT.  This shouldn't happen if we are the
                 * client.
                 */
                pr_err("process_connect got WAIT as client\n");
                con->error_msg = "protocol error, got WAIT as client";
                return -1;

        default:
                pr_err("connect protocol error, will retry\n");
                con->error_msg = "protocol error, garbage tag during connect";
                return -1;
        }
        return 0;
}
1486
1487
1488 /*
1489  * read (part of) an ack
1490  */
1491 static int read_partial_ack(struct ceph_connection *con)
1492 {
1493         int to = 0;
1494
1495         return read_partial(con, &to, sizeof(con->in_temp_ack),
1496                             &con->in_temp_ack);
1497 }
1498
1499
1500 /*
1501  * We can finally discard anything that's been acked.
1502  */
static void process_ack(struct ceph_connection *con)
{
        struct ceph_msg *m;
        u64 ack = le64_to_cpu(con->in_temp_ack);
        u64 seq;

        /* drop sent messages up to and including the acked sequence */
        while (!list_empty(&con->out_sent)) {
                m = list_first_entry(&con->out_sent, struct ceph_msg,
                                     list_head);
                seq = le64_to_cpu(m->hdr.seq);
                if (seq > ack)
                        break;
                dout("got ack for seq %llu type %d at %p\n", seq,
                     le16_to_cpu(m->hdr.type), m);
                m->ack_stamp = jiffies;
                ceph_msg_remove(m);
        }
        /* expect the next incoming byte to be a tag */
        prepare_read_tag(con);
}
1522
1523
1524
1525
/*
 * Read up to sec_len bytes of a message section (front or middle) into
 * 'section', storing its crc32c in *crc once complete.  Returns 1 when
 * the section is fully read, <= 0 on error/EAGAIN.
 */
static int read_partial_message_section(struct ceph_connection *con,
                                        struct kvec *section,
                                        unsigned int sec_len, u32 *crc)
{
        int ret, left;

        BUG_ON(!section);

        /* section->iov_len doubles as the read-progress cursor */
        while (section->iov_len < sec_len) {
                BUG_ON(section->iov_base == NULL);
                left = sec_len - section->iov_len;
                ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
                                       section->iov_len, left);
                if (ret <= 0)
                        return ret;
                section->iov_len += ret;
                if (section->iov_len == sec_len)
                        *crc = crc32c(0, section->iov_base,
                                      section->iov_len);
        }

        return 1;
}
1549
1550 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
1551                                 struct ceph_msg_header *hdr,
1552                                 int *skip);
1553
1554
/*
 * Receive (part of) the data payload into the message's page vector.
 * Reads at most to the end of the payload or of the current page,
 * advances the in_msg position cursors, and (when datacrc is set)
 * folds the received bytes into the running data crc.
 * Returns bytes received, or <= 0 on error/EAGAIN.
 */
static int read_partial_message_pages(struct ceph_connection *con,
                                      struct page **pages,
                                      unsigned data_len, int datacrc)
{
        void *p;
        int ret;
        int left;

        left = min((int)(data_len - con->in_msg_pos.data_pos),
                   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
        /* (page) data */
        BUG_ON(pages == NULL);
        /* kmap/kunmap bracket the receive so highmem pages work too */
        p = kmap(pages[con->in_msg_pos.page]);
        ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
                               left);
        if (ret > 0 && datacrc)
                con->in_data_crc =
                        crc32c(con->in_data_crc,
                                  p + con->in_msg_pos.page_pos, ret);
        kunmap(pages[con->in_msg_pos.page]);
        if (ret <= 0)
                return ret;
        con->in_msg_pos.data_pos += ret;
        con->in_msg_pos.page_pos += ret;
        if (con->in_msg_pos.page_pos == PAGE_SIZE) {
                /* current page is full: advance to the next one */
                con->in_msg_pos.page_pos = 0;
                con->in_msg_pos.page++;
        }

        return ret;
}
1586
#ifdef CONFIG_BLOCK
/*
 * Receive (part of) the data payload directly into the current bio
 * segment.  Mirrors read_partial_message_pages() but walks the bio
 * iterator instead of a page vector.
 */
static int read_partial_message_bio(struct ceph_connection *con,
                                    struct bio **bio_iter, int *bio_seg,
                                    unsigned data_len, int datacrc)
{
        struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
        void *p;
        int ret, left;

        if (IS_ERR(bv))
                return PTR_ERR(bv);

        /* cap the read at the end of the payload or of this segment */
        left = min((int)(data_len - con->in_msg_pos.data_pos),
                   (int)(bv->bv_len - con->in_msg_pos.page_pos));

        p = kmap(bv->bv_page) + bv->bv_offset;

        ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
                               left);
        if (ret > 0 && datacrc)
                con->in_data_crc =
                        crc32c(con->in_data_crc,
                                  p + con->in_msg_pos.page_pos, ret);
        kunmap(bv->bv_page);
        if (ret <= 0)
                return ret;
        con->in_msg_pos.data_pos += ret;
        con->in_msg_pos.page_pos += ret;
        if (con->in_msg_pos.page_pos == bv->bv_len) {
                /* segment consumed: step the bio iterator forward */
                con->in_msg_pos.page_pos = 0;
                iter_bio_next(bio_iter, bio_seg);
        }

        return ret;
}
#endif
1623
1624 /*
1625  * read (part of) a message.
1626  */
1627 static int read_partial_message(struct ceph_connection *con)
1628 {
1629         struct ceph_msg *m = con->in_msg;
1630         int ret;
1631         int to, left;
1632         unsigned front_len, middle_len, data_len;
1633         int datacrc = con->msgr->nocrc;
1634         int skip;
1635         u64 seq;
1636
1637         dout("read_partial_message con %p msg %p\n", con, m);
1638
1639         /* header */
1640         while (con->in_base_pos < sizeof(con->in_hdr)) {
1641                 left = sizeof(con->in_hdr) - con->in_base_pos;
1642                 ret = ceph_tcp_recvmsg(con->sock,
1643                                        (char *)&con->in_hdr + con->in_base_pos,
1644                                        left);
1645                 if (ret <= 0)
1646                         return ret;
1647                 con->in_base_pos += ret;
1648                 if (con->in_base_pos == sizeof(con->in_hdr)) {
1649                         u32 crc = crc32c(0, (void *)&con->in_hdr,
1650                                  sizeof(con->in_hdr) - sizeof(con->in_hdr.crc));
1651                         if (crc != le32_to_cpu(con->in_hdr.crc)) {
1652                                 pr_err("read_partial_message bad hdr "
1653                                        " crc %u != expected %u\n",
1654                                        crc, con->in_hdr.crc);
1655                                 return -EBADMSG;
1656                         }
1657                 }
1658         }
1659         front_len = le32_to_cpu(con->in_hdr.front_len);
1660         if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1661                 return -EIO;
1662         middle_len = le32_to_cpu(con->in_hdr.middle_len);
1663         if (middle_len > CEPH_MSG_MAX_DATA_LEN)
1664                 return -EIO;
1665         data_len = le32_to_cpu(con->in_hdr.data_len);
1666         if (data_len > CEPH_MSG_MAX_DATA_LEN)
1667                 return -EIO;
1668
1669         /* verify seq# */
1670         seq = le64_to_cpu(con->in_hdr.seq);
1671         if ((s64)seq - (s64)con->in_seq < 1) {
1672                 pr_info("skipping %s%lld %s seq %lld expected %lld\n",
1673                         ENTITY_NAME(con->peer_name),
1674                         ceph_pr_addr(&con->peer_addr.in_addr),
1675                         seq, con->in_seq + 1);
1676                 con->in_base_pos = -front_len - middle_len - data_len -
1677                         sizeof(m->footer);
1678                 con->in_tag = CEPH_MSGR_TAG_READY;
1679                 return 0;
1680         } else if ((s64)seq - (s64)con->in_seq > 1) {
1681                 pr_err("read_partial_message bad seq %lld expected %lld\n",
1682                        seq, con->in_seq + 1);
1683                 con->error_msg = "bad message sequence # for incoming message";
1684                 return -EBADMSG;
1685         }
1686
1687         /* allocate message? */
1688         if (!con->in_msg) {
1689                 dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
1690                      con->in_hdr.front_len, con->in_hdr.data_len);
1691                 skip = 0;
1692                 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
1693                 if (skip) {
1694                         /* skip this message */
1695                         dout("alloc_msg said skip message\n");
1696                         BUG_ON(con->in_msg);
1697                         con->in_base_pos = -front_len - middle_len - data_len -
1698                                 sizeof(m->footer);
1699                         con->in_tag = CEPH_MSGR_TAG_READY;
1700                         con->in_seq++;
1701                         return 0;
1702                 }
1703                 if (!con->in_msg) {
1704                         con->error_msg =
1705                                 "error allocating memory for incoming message";
1706                         return -ENOMEM;
1707                 }
1708                 m = con->in_msg;
1709                 m->front.iov_len = 0;    /* haven't read it yet */
1710                 if (m->middle)
1711                         m->middle->vec.iov_len = 0;
1712
1713                 con->in_msg_pos.page = 0;
1714                 if (m->pages)
1715                         con->in_msg_pos.page_pos = m->page_alignment;
1716                 else
1717                         con->in_msg_pos.page_pos = 0;
1718                 con->in_msg_pos.data_pos = 0;
1719         }
1720
1721         /* front */
1722         ret = read_partial_message_section(con, &m->front, front_len,
1723                                            &con->in_front_crc);
1724         if (ret <= 0)
1725                 return ret;
1726
1727         /* middle */
1728         if (m->middle) {
1729                 ret = read_partial_message_section(con, &m->middle->vec,
1730                                                    middle_len,
1731                                                    &con->in_middle_crc);
1732                 if (ret <= 0)
1733                         return ret;
1734         }
1735 #ifdef CONFIG_BLOCK
1736         if (m->bio && !m->bio_iter)
1737                 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
1738 #endif
1739
1740         /* (page) data */
1741         while (con->in_msg_pos.data_pos < data_len) {
1742                 if (m->pages) {
1743                         ret = read_partial_message_pages(con, m->pages,
1744                                                  data_len, datacrc);
1745                         if (ret <= 0)
1746                                 return ret;
1747 #ifdef CONFIG_BLOCK
1748                 } else if (m->bio) {
1749
1750                         ret = read_partial_message_bio(con,
1751                                                  &m->bio_iter, &m->bio_seg,
1752                                                  data_len, datacrc);
1753                         if (ret <= 0)
1754                                 return ret;
1755 #endif
1756                 } else {
1757                         BUG_ON(1);
1758                 }
1759         }
1760
1761         /* footer */
1762         to = sizeof(m->hdr) + sizeof(m->footer);
1763         while (con->in_base_pos < to) {
1764                 left = to - con->in_base_pos;
1765                 ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
1766                                        (con->in_base_pos - sizeof(m->hdr)),
1767                                        left);
1768                 if (ret <= 0)
1769                         return ret;
1770                 con->in_base_pos += ret;
1771         }
1772         dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1773              m, front_len, m->footer.front_crc, middle_len,
1774              m->footer.middle_crc, data_len, m->footer.data_crc);
1775
1776         /* crc ok? */
1777         if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1778                 pr_err("read_partial_message %p front crc %u != exp. %u\n",
1779                        m, con->in_front_crc, m->footer.front_crc);
1780                 return -EBADMSG;
1781         }
1782         if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1783                 pr_err("read_partial_message %p middle crc %u != exp %u\n",
1784                        m, con->in_middle_crc, m->footer.middle_crc);
1785                 return -EBADMSG;
1786         }
1787         if (datacrc &&
1788             (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1789             con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1790                 pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1791                        con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1792                 return -EBADMSG;
1793         }
1794
1795         return 1; /* done! */
1796 }
1797
/*
 * Process message.  This happens in the worker thread.  The callback should
 * be careful not to do anything that waits on other incoming messages or it
 * may deadlock.
 *
 * Called with con->mutex held; the mutex is dropped around the
 * dispatch callback and re-taken afterwards.  con->in_msg is cleared
 * and our reference to it is handed to the dispatch callback (it is
 * not put here).
 */
static void process_message(struct ceph_connection *con)
{
	struct ceph_msg *msg;

	msg = con->in_msg;
	con->in_msg = NULL;

	/* if first message, set peer_name */
	if (con->peer_name.type == 0)
		con->peer_name = msg->hdr.src;

	con->in_seq++;
	/* drop the mutex so the callback can use this connection */
	mutex_unlock(&con->mutex);

	dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
	     msg, le64_to_cpu(msg->hdr.seq),
	     ENTITY_NAME(msg->hdr.src),
	     le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.data_len),
	     con->in_front_crc, con->in_middle_crc, con->in_data_crc);
	con->ops->dispatch(con, msg);

	mutex_lock(&con->mutex);
	/* ready to read the next tag byte */
	prepare_read_tag(con);
}
1830
1831
/*
 * Write something to the socket.  Called in a worker thread when the
 * socket appears to be writeable and we have something ready to send.
 *
 * Called from con_work with con->mutex held.  Returns 0 when there is
 * nothing left to write (WRITE_PENDING is cleared), or a negative
 * value on error (con->error_msg is set for the fault path).
 */
static int try_write(struct ceph_connection *con)
{
	struct ceph_messenger *msgr = con->msgr;
	int ret = 1;

	dout("try_write start %p state %lu nref %d\n", con, con->state,
	     atomic_read(&con->nref));

more:
	dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);

	/* open the socket first? */
	if (con->sock == NULL) {
		/* queue the banner + connect handshake before connecting so
		 * they go out as soon as the socket becomes writeable */
		prepare_write_banner(msgr, con);
		prepare_write_connect(msgr, con, 1);
		prepare_read_banner(con);
		set_bit(CONNECTING, &con->state);
		clear_bit(NEGOTIATING, &con->state);

		BUG_ON(con->in_msg);
		con->in_tag = CEPH_MSGR_TAG_READY;
		dout("try_write initiating connect on %p new state %lu\n",
		     con, con->state);
		con->sock = ceph_tcp_connect(con);
		if (IS_ERR(con->sock)) {
			con->sock = NULL;
			con->error_msg = "connect error";
			ret = -1;
			goto out;
		}
	}

more_kvec:
	/* kvec data queued? */
	if (con->out_skip) {
		/* discard bytes of a revoked, partially-sent message */
		ret = write_partial_skip(con);
		if (ret <= 0)
			goto out;
	}
	if (con->out_kvec_left) {
		ret = write_partial_kvec(con);
		if (ret <= 0)
			goto out;
	}

	/* msg pages? */
	if (con->out_msg) {
		if (con->out_msg_done) {
			ceph_msg_put(con->out_msg);
			con->out_msg = NULL;   /* we're done with this one */
			goto do_next;
		}

		ret = write_partial_msg_pages(con);
		if (ret == 1)
			goto more_kvec;  /* we need to send the footer, too! */
		if (ret == 0)
			goto out;
		if (ret < 0) {
			dout("try_write write_partial_msg_pages err %d\n",
			     ret);
			goto out;
		}
	}

do_next:
	if (!test_bit(CONNECTING, &con->state)) {
		/* is anything else pending? */
		if (!list_empty(&con->out_queue)) {
			prepare_write_message(con);
			goto more;
		}
		if (con->in_seq > con->in_seq_acked) {
			prepare_write_ack(con);
			goto more;
		}
		if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
			prepare_write_keepalive(con);
			goto more;
		}
	}

	/* Nothing to do! */
	clear_bit(WRITE_PENDING, &con->state);
	dout("try_write nothing else to write.\n");
	ret = 0;
out:
	dout("try_write done on %p ret %d\n", con, ret);
	return ret;
}
1926
1927
1928
/*
 * Read what we can from the socket.
 *
 * Called from con_work with con->mutex held.  Drives the incoming
 * state machine: banner/connect negotiation while CONNECTING, then
 * tagged frames (message, ack, close).  Returns 0 when there is
 * nothing more to read, -EAGAIN when a racing close/reopen requires
 * con_work to restart, or a negative error (con->error_msg set).
 */
static int try_read(struct ceph_connection *con)
{
	int ret = -1;

	if (!con->sock)
		return 0;

	if (test_bit(STANDBY, &con->state))
		return 0;

	dout("try_read start on %p\n", con);

more:
	dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
	     con->in_base_pos);

	/*
	 * process_connect and process_message drop and re-take
	 * con->mutex.  make sure we handle a racing close or reopen.
	 */
	if (test_bit(CLOSED, &con->state) ||
	    test_bit(OPENING, &con->state)) {
		ret = -EAGAIN;
		goto out;
	}

	if (test_bit(CONNECTING, &con->state)) {
		if (!test_bit(NEGOTIATING, &con->state)) {
			dout("try_read connecting\n");
			ret = read_partial_banner(con);
			if (ret <= 0)
				goto out;
			ret = process_banner(con);
			if (ret < 0)
				goto out;
		}
		ret = read_partial_connect(con);
		if (ret <= 0)
			goto out;
		ret = process_connect(con);
		if (ret < 0)
			goto out;
		goto more;
	}

	if (con->in_base_pos < 0) {
		/*
		 * skipping + discarding content.
		 *
		 * FIXME: there must be a better way to do this!
		 */
		static char buf[1024];
		int skip = min(1024, -con->in_base_pos);
		dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
		ret = ceph_tcp_recvmsg(con->sock, buf, skip);
		if (ret <= 0)
			goto out;
		con->in_base_pos += ret;
		if (con->in_base_pos)
			goto more;
	}
	if (con->in_tag == CEPH_MSGR_TAG_READY) {
		/*
		 * what's next?
		 */
		ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
		if (ret <= 0)
			goto out;
		dout("try_read got tag %d\n", (int)con->in_tag);
		switch (con->in_tag) {
		case CEPH_MSGR_TAG_MSG:
			prepare_read_message(con);
			break;
		case CEPH_MSGR_TAG_ACK:
			prepare_read_ack(con);
			break;
		case CEPH_MSGR_TAG_CLOSE:
			set_bit(CLOSED, &con->state);   /* fixme */
			goto out;
		default:
			goto bad_tag;
		}
	}
	if (con->in_tag == CEPH_MSGR_TAG_MSG) {
		ret = read_partial_message(con);
		if (ret <= 0) {
			switch (ret) {
			case -EBADMSG:
				con->error_msg = "bad crc";
				ret = -EIO;
				break;
			case -EIO:
				con->error_msg = "io error";
				break;
			}
			goto out;
		}
		/* a skipped message leaves the tag READY; don't dispatch */
		if (con->in_tag == CEPH_MSGR_TAG_READY)
			goto more;
		process_message(con);
		goto more;
	}
	if (con->in_tag == CEPH_MSGR_TAG_ACK) {
		ret = read_partial_ack(con);
		if (ret <= 0)
			goto out;
		process_ack(con);
		goto more;
	}

out:
	dout("try_read done on %p ret %d\n", con, ret);
	return ret;

bad_tag:
	pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
	con->error_msg = "protocol error, garbage tag";
	ret = -1;
	goto out;
}
2052
2053
2054 /*
2055  * Atomically queue work on a connection.  Bump @con reference to
2056  * avoid races with connection teardown.
2057  */
2058 static void queue_con(struct ceph_connection *con)
2059 {
2060         if (test_bit(DEAD, &con->state)) {
2061                 dout("queue_con %p ignoring: DEAD\n",
2062                      con);
2063                 return;
2064         }
2065
2066         if (!con->ops->get(con)) {
2067                 dout("queue_con %p ref count 0\n", con);
2068                 return;
2069         }
2070
2071         if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
2072                 dout("queue_con %p - already queued\n", con);
2073                 con->ops->put(con);
2074         } else {
2075                 dout("queue_con %p\n", con);
2076         }
2077 }
2078
/*
 * Do some work on a connection.  Drop a connection ref when we're done.
 *
 * Workqueue callback: runs the read and write state machines under
 * con->mutex, restarting on -EAGAIN (racing close/reopen) and falling
 * through to ceph_fault() on error.
 */
static void con_work(struct work_struct *work)
{
	struct ceph_connection *con = container_of(work, struct ceph_connection,
						   work.work);
	int ret;

	mutex_lock(&con->mutex);
restart:
	if (test_and_clear_bit(BACKOFF, &con->state)) {
		/* ceph_fault() could not queue delayed work earlier (work
		 * was already queued); do the delayed re-queue now */
		dout("con_work %p backing off\n", con);
		if (queue_delayed_work(ceph_msgr_wq, &con->work,
				       round_jiffies_relative(con->delay))) {
			dout("con_work %p backoff %lu\n", con, con->delay);
			/* our ref is transferred to the queued work */
			mutex_unlock(&con->mutex);
			return;
		} else {
			con->ops->put(con);
			dout("con_work %p FAILED to back off %lu\n", con,
			     con->delay);
		}
	}

	if (test_bit(STANDBY, &con->state)) {
		dout("con_work %p STANDBY\n", con);
		goto done;
	}
	if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
		dout("con_work CLOSED\n");
		con_close_socket(con);
		goto done;
	}
	if (test_and_clear_bit(OPENING, &con->state)) {
		/* reopen w/ new peer */
		dout("con_work OPENING\n");
		con_close_socket(con);
	}

	if (test_and_clear_bit(SOCK_CLOSED, &con->state))
		goto fault;

	ret = try_read(con);
	if (ret == -EAGAIN)
		goto restart;	/* racing close/reopen; re-check state */
	if (ret < 0)
		goto fault;

	ret = try_write(con);
	if (ret == -EAGAIN)
		goto restart;
	if (ret < 0)
		goto fault;

done:
	mutex_unlock(&con->mutex);
done_unlocked:
	con->ops->put(con);
	return;

fault:
	mutex_unlock(&con->mutex);
	ceph_fault(con);     /* error/fault path */
	goto done_unlocked;
}
2145
2146
/*
 * Generic error/fault handler.  A retry mechanism is used with
 * exponential backoff
 *
 * Called without con->mutex held (takes it internally).  Closes the
 * socket, requeues unacked messages, and either parks the connection
 * in STANDBY (nothing pending) or schedules a delayed reconnect.
 */
static void ceph_fault(struct ceph_connection *con)
{
	pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
	       ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
	dout("fault %p state %lu to peer %s\n",
	     con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));

	/* lossy channels are not retried */
	if (test_bit(LOSSYTX, &con->state)) {
		dout("fault on LOSSYTX channel\n");
		goto out;
	}

	mutex_lock(&con->mutex);
	if (test_bit(CLOSED, &con->state))
		goto out_unlock;

	con_close_socket(con);

	/* drop any partially-read incoming message */
	if (con->in_msg) {
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	}

	/* Requeue anything that hasn't been acked */
	list_splice_init(&con->out_sent, &con->out_queue);

	/* If there are no messages queued or keepalive pending, place
	 * the connection in a STANDBY state */
	if (list_empty(&con->out_queue) &&
	    !test_bit(KEEPALIVE_PENDING, &con->state)) {
		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
		clear_bit(WRITE_PENDING, &con->state);
		set_bit(STANDBY, &con->state);
	} else {
		/* retry after a delay. */
		if (con->delay == 0)
			con->delay = BASE_DELAY_INTERVAL;
		else if (con->delay < MAX_DELAY_INTERVAL)
			con->delay *= 2;	/* exponential backoff */
		con->ops->get(con);
		if (queue_delayed_work(ceph_msgr_wq, &con->work,
				       round_jiffies_relative(con->delay))) {
			dout("fault queued %p delay %lu\n", con, con->delay);
		} else {
			con->ops->put(con);
			dout("fault failed to queue %p delay %lu, backoff\n",
			     con, con->delay);
			/*
			 * In many cases we see a socket state change
			 * while con_work is running and end up
			 * queuing (non-delayed) work, such that we
			 * can't backoff with a delay.  Set a flag so
			 * that when con_work restarts we schedule the
			 * delay then.
			 */
			set_bit(BACKOFF, &con->state);
		}
	}

out_unlock:
	mutex_unlock(&con->mutex);
out:
	/*
	 * in case we faulted due to authentication, invalidate our
	 * current tickets so that we can get new ones.
	 */
	if (con->auth_retry && con->ops->invalidate_authorizer) {
		dout("calling invalidate_authorizer()\n");
		con->ops->invalidate_authorizer(con);
	}

	if (con->ops->fault)
		con->ops->fault(con);
}
2225
2226
2227
/*
 * create a new messenger instance
 *
 * @myaddr: optional local address (copied if non-NULL)
 * @supported_features/@required_features: feature bits stored on the
 *	messenger for connection negotiation.
 *
 * Returns the new messenger or an ERR_PTR(-ENOMEM) on failure.
 */
struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
					     u32 supported_features,
					     u32 required_features)
{
	struct ceph_messenger *msgr;

	msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
	if (msgr == NULL)
		return ERR_PTR(-ENOMEM);

	msgr->supported_features = supported_features;
	msgr->required_features = required_features;

	spin_lock_init(&msgr->global_seq_lock);

	/* the zero page is needed if a request is "canceled" while the message
	 * is being written over the socket */
	msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO);
	if (!msgr->zero_page) {
		kfree(msgr);
		return ERR_PTR(-ENOMEM);
	}
	/* keep the zero page mapped for the messenger's lifetime;
	 * unmapped in ceph_messenger_destroy() */
	kmap(msgr->zero_page);

	if (myaddr)
		msgr->inst.addr = *myaddr;

	/* select a random nonce */
	msgr->inst.addr.type = 0;
	get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
	encode_my_addr(msgr);

	dout("messenger_create %p\n", msgr);
	return msgr;
}
EXPORT_SYMBOL(ceph_messenger_create);
2267
2268 void ceph_messenger_destroy(struct ceph_messenger *msgr)
2269 {
2270         dout("destroy %p\n", msgr);
2271         kunmap(msgr->zero_page);
2272         __free_page(msgr->zero_page);
2273         kfree(msgr);
2274         dout("destroyed messenger %p\n", msgr);
2275 }
2276 EXPORT_SYMBOL(ceph_messenger_destroy);
2277
2278 static void clear_standby(struct ceph_connection *con)
2279 {
2280         /* come back from STANDBY? */
2281         if (test_and_clear_bit(STANDBY, &con->state)) {
2282                 mutex_lock(&con->mutex);
2283                 dout("clear_standby %p and ++connect_seq\n", con);
2284                 con->connect_seq++;
2285                 WARN_ON(test_bit(WRITE_PENDING, &con->state));
2286                 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
2287                 mutex_unlock(&con->mutex);
2288         }
2289 }
2290
/*
 * Queue up an outgoing message on the given connection.
 *
 * If the connection is already CLOSED the message is dropped and its
 * reference put immediately; otherwise it is added to out_queue and a
 * worker is kicked if no write was already pending.
 */
void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
{
	if (test_bit(CLOSED, &con->state)) {
		dout("con_send %p closed, dropping %p\n", con, msg);
		ceph_msg_put(msg);
		return;
	}

	/* set src+dst */
	msg->hdr.src = con->msgr->inst.name;

	BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));

	msg->needs_out_seq = true;

	/* queue */
	mutex_lock(&con->mutex);
	BUG_ON(!list_empty(&msg->list_head));
	list_add_tail(&msg->list_head, &con->out_queue);
	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
	     ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len),
	     le32_to_cpu(msg->hdr.data_len));
	mutex_unlock(&con->mutex);

	/* if there wasn't anything waiting to send before, queue
	 * new work */
	clear_standby(con);
	if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
		queue_con(con);
}
EXPORT_SYMBOL(ceph_con_send);
2328
/*
 * Revoke a message that was previously queued for send
 *
 * Handles both cases: the message still sitting on out_queue, and the
 * message currently being written (in which case the kvec bytes
 * already queued for it are discarded via con->out_skip).  Drops the
 * connection's reference(s) to the message.
 */
void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
{
	mutex_lock(&con->mutex);
	if (!list_empty(&msg->list_head)) {
		dout("con_revoke %p msg %p - was on queue\n", con, msg);
		list_del_init(&msg->list_head);
		ceph_msg_put(msg);
		msg->hdr.seq = 0;
	}
	if (con->out_msg == msg) {
		dout("con_revoke %p msg %p - was sending\n", con, msg);
		con->out_msg = NULL;
		if (con->out_kvec_is_msg) {
			/* skip the kvec bytes already queued for this msg */
			con->out_skip = con->out_kvec_bytes;
			con->out_kvec_is_msg = false;
		}
		ceph_msg_put(msg);
		msg->hdr.seq = 0;
	}
	mutex_unlock(&con->mutex);
}
2353
2354 /*
2355  * Revoke a message that we may be reading data into
2356  */
2357 void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
2358 {
2359         mutex_lock(&con->mutex);
2360         if (con->in_msg && con->in_msg == msg) {
2361                 unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
2362                 unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
2363                 unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
2364
2365                 /* skip rest of message */
2366                 dout("con_revoke_pages %p msg %p revoked\n", con, msg);
2367                         con->in_base_pos = con->in_base_pos -
2368                                 sizeof(struct ceph_msg_header) -
2369                                 front_len -
2370                                 middle_len -
2371                                 data_len -
2372                                 sizeof(struct ceph_msg_footer);
2373                 ceph_msg_put(con->in_msg);
2374                 con->in_msg = NULL;
2375                 con->in_tag = CEPH_MSGR_TAG_READY;
2376                 con->in_seq++;
2377         } else {
2378                 dout("con_revoke_pages %p msg %p pages %p no-op\n",
2379                      con, con->in_msg, msg);
2380         }
2381         mutex_unlock(&con->mutex);
2382 }
2383
2384 /*
2385  * Queue a keepalive byte to ensure the tcp connection is alive.
2386  */
2387 void ceph_con_keepalive(struct ceph_connection *con)
2388 {
2389         dout("con_keepalive %p\n", con);
2390         clear_standby(con);
2391         if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
2392             test_and_set_bit(WRITE_PENDING, &con->state) == 0)
2393                 queue_con(con);
2394 }
2395 EXPORT_SYMBOL(ceph_con_keepalive);
2396
2397
/*
 * construct a new message with given type, size
 * the new msg has a ref count of 1.
 *
 * @type: message type, stored little-endian in the header
 * @front_len: size of the "front" payload buffer to allocate
 * @flags: gfp mask used for all allocations
 * @can_fail: if false, failure is reported with pr_err + WARN_ON;
 *	if true, only a debug message is emitted.
 *
 * Returns the new message, or NULL on allocation failure.
 */
struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
			      bool can_fail)
{
	struct ceph_msg *m;

	m = kmalloc(sizeof(*m), flags);
	if (m == NULL)
		goto out;
	kref_init(&m->kref);		/* caller owns this initial ref */
	INIT_LIST_HEAD(&m->list_head);

	m->hdr.tid = 0;
	m->hdr.type = cpu_to_le16(type);
	m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
	m->hdr.version = 0;
	m->hdr.front_len = cpu_to_le32(front_len);
	m->hdr.middle_len = 0;
	m->hdr.data_len = 0;
	m->hdr.data_off = 0;
	m->hdr.reserved = 0;
	m->footer.front_crc = 0;
	m->footer.middle_crc = 0;
	m->footer.data_crc = 0;
	m->footer.flags = 0;
	m->front_max = front_len;
	m->front_is_vmalloc = false;
	m->more_to_follow = false;
	m->ack_stamp = 0;
	m->pool = NULL;

	/* middle */
	m->middle = NULL;

	/* data */
	m->nr_pages = 0;
	m->page_alignment = 0;
	m->pages = NULL;
	m->pagelist = NULL;
	m->bio = NULL;
	m->bio_iter = NULL;
	m->bio_seg = 0;
	m->trail = NULL;

	/* front */
	if (front_len) {
		if (front_len > PAGE_CACHE_SIZE) {
			/* fall back to vmalloc for fronts larger than a page */
			m->front.iov_base = __vmalloc(front_len, flags,
						      PAGE_KERNEL);
			m->front_is_vmalloc = true;
		} else {
			m->front.iov_base = kmalloc(front_len, flags);
		}
		if (m->front.iov_base == NULL) {
			dout("ceph_msg_new can't allocate %d bytes\n",
			     front_len);
			goto out2;
		}
	} else {
		m->front.iov_base = NULL;
	}
	m->front.iov_len = front_len;

	dout("ceph_msg_new %p front %d\n", m, front_len);
	return m;

out2:
	ceph_msg_put(m);
out:
	if (!can_fail) {
		pr_err("msg_new can't create type %d front %d\n", type,
		       front_len);
		WARN_ON(1);
	} else {
		dout("msg_new can't create type %d front %d\n", type,
		     front_len);
	}
	return NULL;
}
EXPORT_SYMBOL(ceph_msg_new);
2481
2482 /*
2483  * Allocate "middle" portion of a message, if it is needed and wasn't
2484  * allocated by alloc_msg.  This allows us to read a small fixed-size
2485  * per-type header in the front and then gracefully fail (i.e.,
2486  * propagate the error to the caller based on info in the front) when
2487  * the middle is too large.
2488  */
2489 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
2490 {
2491         int type = le16_to_cpu(msg->hdr.type);
2492         int middle_len = le32_to_cpu(msg->hdr.middle_len);
2493
2494         dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
2495              ceph_msg_type_name(type), middle_len);
2496         BUG_ON(!middle_len);
2497         BUG_ON(msg->middle);
2498
2499         msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
2500         if (!msg->middle)
2501                 return -ENOMEM;
2502         return 0;
2503 }
2504
/*
 * Generic message allocator, for incoming messages.
 *
 * Gives con->ops->alloc_msg first shot (it may set *skip to discard
 * the message entirely); otherwise falls back to ceph_msg_new().
 * Also allocates the "middle" buffer when the header calls for one.
 *
 * Called with con->mutex held; the mutex is dropped around the
 * alloc_msg callback.  Returns NULL on skip or allocation failure.
 */
static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
				struct ceph_msg_header *hdr,
				int *skip)
{
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	int middle_len = le32_to_cpu(hdr->middle_len);
	struct ceph_msg *msg = NULL;
	int ret;

	if (con->ops->alloc_msg) {
		/* the callback is invoked without con->mutex held */
		mutex_unlock(&con->mutex);
		msg = con->ops->alloc_msg(con, hdr, skip);
		mutex_lock(&con->mutex);
		if (!msg || *skip)
			return NULL;
	}
	if (!msg) {
		*skip = 0;
		msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
		if (!msg) {
			pr_err("unable to allocate msg type %d len %d\n",
			       type, front_len);
			return NULL;
		}
		msg->page_alignment = le16_to_cpu(hdr->data_off);
	}
	/* copy the wire header into the message */
	memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));

	if (middle_len && !msg->middle) {
		ret = ceph_alloc_middle(con, msg);
		if (ret < 0) {
			ceph_msg_put(msg);
			return NULL;
		}
	}

	return msg;
}
2547
2548
2549 /*
2550  * Free a generically kmalloc'd message.
2551  */
2552 void ceph_msg_kfree(struct ceph_msg *m)
2553 {
2554         dout("msg_kfree %p\n", m);
2555         if (m->front_is_vmalloc)
2556                 vfree(m->front.iov_base);
2557         else
2558                 kfree(m->front.iov_base);
2559         kfree(m);
2560 }
2561
2562 /*
2563  * Drop a msg ref.  Destroy as needed.
2564  */
void ceph_msg_last_put(struct kref *kref)
{
	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);

	dout("ceph_msg_put last one on %p\n", m);
	/* a message being destroyed must not still be queued anywhere */
	WARN_ON(!list_empty(&m->list_head));

	/* drop middle, data, if any */
	if (m->middle) {
		ceph_buffer_put(m->middle);
		m->middle = NULL;
	}
	/*
	 * Pages are only forgotten, not freed -- presumably they are
	 * owned by whoever attached them to the message; verify against
	 * callers before changing this.
	 */
	m->nr_pages = 0;
	m->pages = NULL;

	/* the pagelist, by contrast, is owned by the message */
	if (m->pagelist) {
		ceph_pagelist_release(m->pagelist);
		kfree(m->pagelist);
		m->pagelist = NULL;
	}

	/* trail is not owned here either; just drop the reference */
	m->trail = NULL;

	/* pool-backed messages go back to their pool; others are freed */
	if (m->pool)
		ceph_msgpool_put(m->pool, m);
	else
		ceph_msg_kfree(m);
}
EXPORT_SYMBOL(ceph_msg_last_put);
2594
2595 void ceph_msg_dump(struct ceph_msg *msg)
2596 {
2597         pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg,
2598                  msg->front_max, msg->nr_pages);
2599         print_hex_dump(KERN_DEBUG, "header: ",
2600                        DUMP_PREFIX_OFFSET, 16, 1,
2601                        &msg->hdr, sizeof(msg->hdr), true);
2602         print_hex_dump(KERN_DEBUG, " front: ",
2603                        DUMP_PREFIX_OFFSET, 16, 1,
2604                        msg->front.iov_base, msg->front.iov_len, true);
2605         if (msg->middle)
2606                 print_hex_dump(KERN_DEBUG, "middle: ",
2607                                DUMP_PREFIX_OFFSET, 16, 1,
2608                                msg->middle->vec.iov_base,
2609                                msg->middle->vec.iov_len, true);
2610         print_hex_dump(KERN_DEBUG, "footer: ",
2611                        DUMP_PREFIX_OFFSET, 16, 1,
2612                        &msg->footer, sizeof(msg->footer), true);
2613 }
2614 EXPORT_SYMBOL(ceph_msg_dump);