Loading include/linux/ceph/messenger.h +38 −1 Original line number Diff line number Diff line Loading @@ -345,13 +345,50 @@ struct ceph_connection { extern struct page *ceph_zero_page; void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag); void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag); bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag); bool ceph_con_flag_test_and_clear(struct ceph_connection *con, unsigned long con_flag); bool ceph_con_flag_test_and_set(struct ceph_connection *con, unsigned long con_flag); void ceph_encode_my_addr(struct ceph_messenger *msgr); int ceph_tcp_connect(struct ceph_connection *con); int ceph_con_close_socket(struct ceph_connection *con); void ceph_con_reset_session(struct ceph_connection *con); u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt); void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq); void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq); void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, struct ceph_msg *msg, size_t length); struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, size_t *page_offset, size_t *length, bool *last_piece); void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes); u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, unsigned int length); bool ceph_addr_is_blank(const struct ceph_entity_addr *addr); int ceph_addr_port(const struct ceph_entity_addr *addr); void ceph_addr_set_port(struct ceph_entity_addr *addr, int p); void ceph_con_process_message(struct ceph_connection *con); int ceph_con_in_msg_alloc(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip); void ceph_con_get_out_msg(struct ceph_connection *con); extern const char *ceph_pr_addr(const struct ceph_entity_addr *addr); extern int ceph_parse_ips(const char *c, const char *end, struct ceph_entity_addr *addr, int max_count, int *count); extern int ceph_msgr_init(void); extern void ceph_msgr_exit(void); extern void ceph_msgr_flush(void); Loading net/ceph/messenger.c +75 −82 Original line number Diff line number Diff line Loading @@ -96,28 +96,28 @@ static bool con_flag_valid(unsigned long con_flag) } } static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag) void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); clear_bit(con_flag, &con->flags); } static void con_flag_set(struct ceph_connection *con, unsigned long con_flag) void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); set_bit(con_flag, &con->flags); } static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag) bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); return test_bit(con_flag, &con->flags); } static bool con_flag_test_and_clear(struct ceph_connection *con, bool ceph_con_flag_test_and_clear(struct ceph_connection *con, unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); Loading @@ -125,7 +125,7 @@ static bool con_flag_test_and_clear(struct ceph_connection *con, return test_and_clear_bit(con_flag, &con->flags); } static bool con_flag_test_and_set(struct ceph_connection *con, bool ceph_con_flag_test_and_set(struct ceph_connection *con, unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); Loading Loading @@ -199,7 +199,7 @@ const char *ceph_pr_addr(const struct ceph_entity_addr *addr) } EXPORT_SYMBOL(ceph_pr_addr); static void encode_my_addr(struct ceph_messenger *msgr) void ceph_encode_my_addr(struct ceph_messenger *msgr) { memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr)); ceph_encode_banner_addr(&msgr->my_enc_addr); Loading Loading @@ -370,7 +370,7 @@ static void ceph_sock_write_space(struct sock *sk) * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ if (con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) { if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) { if (sk_stream_is_writeable(sk)) { dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); Loading @@ -396,7 +396,7 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); con_flag_set(con, CEPH_CON_F_SOCK_CLOSED); ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED); queue_con(con); break; case TCP_ESTABLISHED: Loading Loading @@ -430,13 +430,15 @@ static void set_sock_callbacks(struct socket *sock, /* * initiate connection to a remote socket. */ static int ceph_tcp_connect(struct ceph_connection *con) int ceph_tcp_connect(struct ceph_connection *con) { struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */ struct socket *sock; unsigned int noio_flag; int ret; dout("%s con %p peer_addr %s\n", __func__, con, ceph_pr_addr(&con->peer_addr)); BUG_ON(con->sock); /* sock_create_kern() allocates with GFP_KERNEL */ Loading @@ -454,8 +456,6 @@ static int ceph_tcp_connect(struct ceph_connection *con) set_sock_callbacks(sock, con); dout("connect %s\n", ceph_pr_addr(&con->peer_addr)); con_sock_state_connecting(con); ret = sock->ops->connect(sock, (struct sockaddr *)&ss, sizeof(ss), O_NONBLOCK); Loading Loading @@ -570,11 +570,11 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page, /* * Shutdown/close the socket for the given connection. */ static int con_close_socket(struct ceph_connection *con) int ceph_con_close_socket(struct ceph_connection *con) { int rc = 0; dout("con_close_socket on %p sock %p\n", con, con->sock); dout("%s con %p sock %p\n", __func__, con, con->sock); if (con->sock) { rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); sock_release(con->sock); Loading @@ -587,7 +587,7 @@ static int con_close_socket(struct ceph_connection *con) * received a socket close event before we had the chance to * shut the socket down. */ con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED); ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED); con_sock_state_closed(con); return rc; Loading @@ -597,7 +597,7 @@ static void ceph_con_reset_protocol(struct ceph_connection *con) { dout("%s con %p\n", __func__, con); con_close_socket(con); ceph_con_close_socket(con); if (con->in_msg) { WARN_ON(con->in_msg->con != con); ceph_msg_put(con->in_msg); Loading Loading @@ -631,7 +631,7 @@ static void ceph_msg_remove_list(struct list_head *head) } } static void ceph_con_reset_session(struct ceph_connection *con) void ceph_con_reset_session(struct ceph_connection *con) { dout("%s con %p\n", __func__, con); Loading @@ -656,10 +656,11 @@ void ceph_con_close(struct ceph_connection *con) dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr)); con->state = CEPH_CON_S_CLOSED; con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next connect */ con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING); con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); con_flag_clear(con, CEPH_CON_F_BACKOFF); ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next connect */ ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING); ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF); ceph_con_reset_protocol(con); ceph_con_reset_session(con); Loading Loading @@ -728,7 +729,7 @@ EXPORT_SYMBOL(ceph_con_init); * We maintain a global counter to order connection attempts. Get * a unique seq greater than @gt. */ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt) { u32 ret; Loading @@ -743,7 +744,7 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) /* * Discard messages that have been acked by the server. */ static void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq) void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq) { struct ceph_msg *msg; u64 seq; Loading @@ -768,8 +769,7 @@ static void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq) * reconnect_seq. This avoids gratuitously resending messages that * the server had received and handled prior to reconnect. */ static void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq) void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq) { struct ceph_msg *msg; u64 seq; Loading Loading @@ -1150,7 +1150,7 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor) cursor->need_crc = true; } static void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, struct ceph_msg *msg, size_t length) { BUG_ON(!length); Loading @@ -1168,7 +1168,7 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, * data item, and supply the page offset and length of that piece. * Indicate whether this is the last piece in this data item. */ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, size_t *page_offset, size_t *length, bool *last_piece) { Loading Loading @@ -1209,8 +1209,7 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, * Returns true if the result moves the cursor on to the next piece * of the data item. */ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes) void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes) { bool new_piece; Loading Loading @@ -1284,8 +1283,6 @@ static void prepare_write_message_footer(struct ceph_connection *con) con->out_msg_done = true; } static void ceph_con_get_out_msg(struct ceph_connection *con); /* * Prepare headers for the next outgoing message. */ Loading Loading @@ -1355,7 +1352,7 @@ static void prepare_write_message(struct ceph_connection *con) prepare_write_message_footer(con); } con_flag_set(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading @@ -1376,7 +1373,7 @@ static void prepare_write_ack(struct ceph_connection *con) &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ con_flag_set(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading @@ -1394,7 +1391,7 @@ static void prepare_write_seq(struct ceph_connection *con) con_out_kvec_add(con, sizeof (con->out_temp_ack), &con->out_temp_ack); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading @@ -1415,7 +1412,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) } else { con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); } con_flag_set(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading Loading @@ -1454,7 +1451,7 @@ static void prepare_write_banner(struct ceph_connection *con) &con->msgr->my_enc_addr); con->out_more = 0; con_flag_set(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } static void __prepare_write_connect(struct ceph_connection *con) Loading @@ -1465,12 +1462,12 @@ static void __prepare_write_connect(struct ceph_connection *con) con->auth->authorizer_buf); con->out_more = 0; con_flag_set(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } static int prepare_write_connect(struct ceph_connection *con) { unsigned int global_seq = get_global_seq(con->msgr, 0); unsigned int global_seq = ceph_get_global_seq(con->msgr, 0); int proto; int ret; Loading Loading @@ -1549,8 +1546,7 @@ static int write_partial_kvec(struct ceph_connection *con) return ret; /* done! */ } static u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, unsigned int length) { char *kaddr; Loading Loading @@ -1813,7 +1809,7 @@ static int verify_hello(struct ceph_connection *con) return 0; } static bool addr_is_blank(struct ceph_entity_addr *addr) bool ceph_addr_is_blank(const struct ceph_entity_addr *addr) { struct sockaddr_storage ss = addr->in_addr; /* align */ struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr; Loading @@ -1829,7 +1825,7 @@ static bool addr_is_blank(struct ceph_entity_addr *addr) } } static int addr_port(struct ceph_entity_addr *addr) int ceph_addr_port(const struct ceph_entity_addr *addr) { switch (get_unaligned(&addr->in_addr.ss_family)) { case AF_INET: Loading @@ -1840,7 +1836,7 @@ static int addr_port(struct ceph_entity_addr *addr) return 0; } static void addr_set_port(struct ceph_entity_addr *addr, int p) void ceph_addr_set_port(struct ceph_entity_addr *addr, int p) { switch (get_unaligned(&addr->in_addr.ss_family)) { case AF_INET: Loading Loading @@ -1998,7 +1994,7 @@ int ceph_parse_ips(const char *c, const char *end, port = CEPH_MON_PORT; } addr_set_port(&addr[i], port); ceph_addr_set_port(&addr[i], port); addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY; dout("parse_ips got %s\n", ceph_pr_addr(&addr[i])); Loading Loading @@ -2037,7 +2033,7 @@ static int process_banner(struct ceph_connection *con) */ if (memcmp(&con->peer_addr, &con->actual_peer_addr, sizeof(con->peer_addr)) != 0 && !(addr_is_blank(&con->actual_peer_addr) && !(ceph_addr_is_blank(&con->actual_peer_addr) && con->actual_peer_addr.nonce == con->peer_addr.nonce)) { pr_warn("wrong peer, want %s/%u, got %s/%u\n", ceph_pr_addr(&con->peer_addr), Loading @@ -2051,12 +2047,12 @@ static int process_banner(struct ceph_connection *con) /* * did we learn our address? */ if (addr_is_blank(my_addr)) { if (ceph_addr_is_blank(my_addr)) { memcpy(&my_addr->in_addr, &con->peer_addr_for_me.in_addr, sizeof(con->peer_addr_for_me.in_addr)); addr_set_port(my_addr, 0); encode_my_addr(con->msgr); ceph_addr_set_port(my_addr, 0); ceph_encode_my_addr(con->msgr); dout("process_banner learned my addr is %s\n", ceph_pr_addr(my_addr)); } Loading Loading @@ -2192,7 +2188,7 @@ static int process_connect(struct ceph_connection *con) dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", con->peer_global_seq, le32_to_cpu(con->in_reply.global_seq)); get_global_seq(con->msgr, ceph_get_global_seq(con->msgr, le32_to_cpu(con->in_reply.global_seq)); con_out_kvec_reset(con); ret = prepare_write_connect(con); Loading Loading @@ -2227,7 +2223,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_reply.connect_seq)); if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) con_flag_set(con, CEPH_CON_F_LOSSYTX); ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX); con->delay = 0; /* reset backoff memory */ Loading Loading @@ -2351,9 +2347,6 @@ static int read_partial_msg_data(struct ceph_connection *con) /* * read (part of) a message. */ static int ceph_con_in_msg_alloc(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip); static int read_partial_message(struct ceph_connection *con) { struct ceph_msg *m = con->in_msg; Loading Loading @@ -2515,7 +2508,7 @@ static int read_partial_message(struct ceph_connection *con) * be careful not to do anything that waits on other incoming messages or it * may deadlock. */ static void process_message(struct ceph_connection *con) void ceph_con_process_message(struct ceph_connection *con) { struct ceph_msg *msg = con->in_msg; Loading Loading @@ -2628,7 +2621,7 @@ static int try_write(struct ceph_connection *con) do_next: if (con->state == CEPH_CON_S_OPEN) { if (con_flag_test_and_clear(con, if (ceph_con_flag_test_and_clear(con, CEPH_CON_F_KEEPALIVE_PENDING)) { prepare_write_keepalive(con); goto more; Loading @@ -2645,7 +2638,7 @@ static int try_write(struct ceph_connection *con) } /* Nothing to do! */ con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); dout("try_write nothing else to write.\n"); ret = 0; out: Loading Loading @@ -2739,7 +2732,7 @@ static int try_read(struct ceph_connection *con) prepare_read_keepalive_ack(con); break; case CEPH_MSGR_TAG_CLOSE: con_close_socket(con); ceph_con_close_socket(con); con->state = CEPH_CON_S_CLOSED; goto out; default: Loading @@ -2764,7 +2757,7 @@ static int try_read(struct ceph_connection *con) } if (con->in_tag == CEPH_MSGR_TAG_READY) goto more; process_message(con); ceph_con_process_message(con); if (con->state == CEPH_CON_S_OPEN) prepare_read_tag(con); goto more; Loading Loading @@ -2840,7 +2833,7 @@ static void cancel_con(struct ceph_connection *con) static bool con_sock_closed(struct ceph_connection *con) { if (!con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED)) if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED)) return false; #define CASE(x) \ Loading @@ -2867,7 +2860,7 @@ static bool con_backoff(struct ceph_connection *con) { int ret; if (!con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF)) if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF)) return false; ret = queue_con_delay(con, con->delay); Loading @@ -2875,7 +2868,7 @@ static bool con_backoff(struct ceph_connection *con) dout("%s: con %p FAILED to back off %lu\n", __func__, con, con->delay); BUG_ON(ret == -ENOENT); con_flag_set(con, CEPH_CON_F_BACKOFF); ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); } return true; Loading Loading @@ -2987,7 +2980,7 @@ static void con_fault(struct ceph_connection *con) ceph_con_reset_protocol(con); if (con_flag_test(con, CEPH_CON_F_LOSSYTX)) { if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) { dout("fault on LOSSYTX channel, marking CLOSED\n"); con->state = CEPH_CON_S_CLOSED; return; Loading @@ -2999,9 +2992,9 @@ static void con_fault(struct ceph_connection *con) /* If there are no messages queued or keepalive pending, place * the connection in a STANDBY state */ if (list_empty(&con->out_queue) && !con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) { !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); con->state = CEPH_CON_S_STANDBY; } else { /* retry after a delay. */ Loading @@ -3013,7 +3006,7 @@ static void con_fault(struct ceph_connection *con) if (con->delay > MAX_DELAY_INTERVAL) con->delay = MAX_DELAY_INTERVAL; } con_flag_set(con, CEPH_CON_F_BACKOFF); ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); queue_con(con); } } Loading @@ -3023,7 +3016,7 @@ void ceph_messenger_reset_nonce(struct ceph_messenger *msgr) { u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000; msgr->inst.addr.nonce = cpu_to_le32(nonce); encode_my_addr(msgr); ceph_encode_my_addr(msgr); } /* Loading @@ -3037,7 +3030,7 @@ void ceph_messenger_init(struct ceph_messenger *msgr, if (myaddr) { memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr, sizeof(msgr->inst.addr.in_addr)); addr_set_port(&msgr->inst.addr, 0); ceph_addr_set_port(&msgr->inst.addr, 0); } msgr->inst.addr.type = 0; Loading @@ -3047,7 +3040,7 @@ void ceph_messenger_init(struct ceph_messenger *msgr, get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); } while (!msgr->inst.addr.nonce); encode_my_addr(msgr); ceph_encode_my_addr(msgr); atomic_set(&msgr->stopping, 0); write_pnet(&msgr->net, get_net(current->nsproxy->net_ns)); Loading Loading @@ -3076,8 +3069,8 @@ static void clear_standby(struct ceph_connection *con) dout("clear_standby %p and ++connect_seq\n", con); con->state = CEPH_CON_S_PREOPEN; con->connect_seq++; WARN_ON(con_flag_test(con, CEPH_CON_F_WRITE_PENDING)); WARN_ON(con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)); WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)); WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)); } } Loading Loading @@ -3118,7 +3111,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* if there wasn't anything waiting to send before, queue * new work */ if (con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING) == 0) if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) queue_con(con); } EXPORT_SYMBOL(ceph_con_send); Loading Loading @@ -3214,10 +3207,10 @@ void ceph_con_keepalive(struct ceph_connection *con) dout("con_keepalive %p\n", con); mutex_lock(&con->mutex); clear_standby(con); con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING); mutex_unlock(&con->mutex); if (con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING) == 0) if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) queue_con(con); } EXPORT_SYMBOL(ceph_con_keepalive); Loading Loading @@ -3423,7 +3416,7 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) * On error (ENOMEM, EAGAIN, ...), * - con->in_msg == NULL */ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int ceph_con_in_msg_alloc(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip) { int middle_len = le32_to_cpu(hdr->middle_len); Loading Loading @@ -3470,7 +3463,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, return ret; } static void ceph_con_get_out_msg(struct ceph_connection *con) void ceph_con_get_out_msg(struct ceph_connection *con) { struct ceph_msg *msg; Loading Loading
include/linux/ceph/messenger.h +38 −1 Original line number Diff line number Diff line Loading @@ -345,13 +345,50 @@ struct ceph_connection { extern struct page *ceph_zero_page; void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag); void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag); bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag); bool ceph_con_flag_test_and_clear(struct ceph_connection *con, unsigned long con_flag); bool ceph_con_flag_test_and_set(struct ceph_connection *con, unsigned long con_flag); void ceph_encode_my_addr(struct ceph_messenger *msgr); int ceph_tcp_connect(struct ceph_connection *con); int ceph_con_close_socket(struct ceph_connection *con); void ceph_con_reset_session(struct ceph_connection *con); u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt); void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq); void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq); void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, struct ceph_msg *msg, size_t length); struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, size_t *page_offset, size_t *length, bool *last_piece); void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes); u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, unsigned int length); bool ceph_addr_is_blank(const struct ceph_entity_addr *addr); int ceph_addr_port(const struct ceph_entity_addr *addr); void ceph_addr_set_port(struct ceph_entity_addr *addr, int p); void ceph_con_process_message(struct ceph_connection *con); int ceph_con_in_msg_alloc(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip); void ceph_con_get_out_msg(struct ceph_connection *con); extern const char *ceph_pr_addr(const struct ceph_entity_addr *addr); extern int ceph_parse_ips(const char *c, const char *end, struct ceph_entity_addr *addr, int max_count, int *count); extern int ceph_msgr_init(void); extern void ceph_msgr_exit(void); extern void ceph_msgr_flush(void); Loading
net/ceph/messenger.c +75 −82 Original line number Diff line number Diff line Loading @@ -96,28 +96,28 @@ static bool con_flag_valid(unsigned long con_flag) } } static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag) void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); clear_bit(con_flag, &con->flags); } static void con_flag_set(struct ceph_connection *con, unsigned long con_flag) void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); set_bit(con_flag, &con->flags); } static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag) bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); return test_bit(con_flag, &con->flags); } static bool con_flag_test_and_clear(struct ceph_connection *con, bool ceph_con_flag_test_and_clear(struct ceph_connection *con, unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); Loading @@ -125,7 +125,7 @@ static bool con_flag_test_and_clear(struct ceph_connection *con, return test_and_clear_bit(con_flag, &con->flags); } static bool con_flag_test_and_set(struct ceph_connection *con, bool ceph_con_flag_test_and_set(struct ceph_connection *con, unsigned long con_flag) { BUG_ON(!con_flag_valid(con_flag)); Loading Loading @@ -199,7 +199,7 @@ const char *ceph_pr_addr(const struct ceph_entity_addr *addr) } EXPORT_SYMBOL(ceph_pr_addr); static void encode_my_addr(struct ceph_messenger *msgr) void ceph_encode_my_addr(struct ceph_messenger *msgr) { memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr)); ceph_encode_banner_addr(&msgr->my_enc_addr); Loading Loading @@ -370,7 +370,7 @@ static void ceph_sock_write_space(struct sock *sk) * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ if (con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) { if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) { if (sk_stream_is_writeable(sk)) { dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); Loading @@ -396,7 +396,7 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); con_flag_set(con, CEPH_CON_F_SOCK_CLOSED); ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED); queue_con(con); break; case TCP_ESTABLISHED: Loading Loading @@ -430,13 +430,15 @@ static void set_sock_callbacks(struct socket *sock, /* * initiate connection to a remote socket. */ static int ceph_tcp_connect(struct ceph_connection *con) int ceph_tcp_connect(struct ceph_connection *con) { struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */ struct socket *sock; unsigned int noio_flag; int ret; dout("%s con %p peer_addr %s\n", __func__, con, ceph_pr_addr(&con->peer_addr)); BUG_ON(con->sock); /* sock_create_kern() allocates with GFP_KERNEL */ Loading @@ -454,8 +456,6 @@ static int ceph_tcp_connect(struct ceph_connection *con) set_sock_callbacks(sock, con); dout("connect %s\n", ceph_pr_addr(&con->peer_addr)); con_sock_state_connecting(con); ret = sock->ops->connect(sock, (struct sockaddr *)&ss, sizeof(ss), O_NONBLOCK); Loading Loading @@ -570,11 +570,11 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page, /* * Shutdown/close the socket for the given connection. */ static int con_close_socket(struct ceph_connection *con) int ceph_con_close_socket(struct ceph_connection *con) { int rc = 0; dout("con_close_socket on %p sock %p\n", con, con->sock); dout("%s con %p sock %p\n", __func__, con, con->sock); if (con->sock) { rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); sock_release(con->sock); Loading @@ -587,7 +587,7 @@ static int con_close_socket(struct ceph_connection *con) * received a socket close event before we had the chance to * shut the socket down. */ con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED); ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED); con_sock_state_closed(con); return rc; Loading @@ -597,7 +597,7 @@ static void ceph_con_reset_protocol(struct ceph_connection *con) { dout("%s con %p\n", __func__, con); con_close_socket(con); ceph_con_close_socket(con); if (con->in_msg) { WARN_ON(con->in_msg->con != con); ceph_msg_put(con->in_msg); Loading Loading @@ -631,7 +631,7 @@ static void ceph_msg_remove_list(struct list_head *head) } } static void ceph_con_reset_session(struct ceph_connection *con) void ceph_con_reset_session(struct ceph_connection *con) { dout("%s con %p\n", __func__, con); Loading @@ -656,10 +656,11 @@ void ceph_con_close(struct ceph_connection *con) dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr)); con->state = CEPH_CON_S_CLOSED; con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next connect */ con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING); con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); con_flag_clear(con, CEPH_CON_F_BACKOFF); ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next connect */ ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING); ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF); ceph_con_reset_protocol(con); ceph_con_reset_session(con); Loading Loading @@ -728,7 +729,7 @@ EXPORT_SYMBOL(ceph_con_init); * We maintain a global counter to order connection attempts. Get * a unique seq greater than @gt. */ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt) { u32 ret; Loading @@ -743,7 +744,7 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) /* * Discard messages that have been acked by the server. */ static void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq) void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq) { struct ceph_msg *msg; u64 seq; Loading @@ -768,8 +769,7 @@ static void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq) * reconnect_seq. This avoids gratuitously resending messages that * the server had received and handled prior to reconnect. */ static void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq) void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq) { struct ceph_msg *msg; u64 seq; Loading Loading @@ -1150,7 +1150,7 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor) cursor->need_crc = true; } static void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, struct ceph_msg *msg, size_t length) { BUG_ON(!length); Loading @@ -1168,7 +1168,7 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, * data item, and supply the page offset and length of that piece. * Indicate whether this is the last piece in this data item. */ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, size_t *page_offset, size_t *length, bool *last_piece) { Loading Loading @@ -1209,8 +1209,7 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, * Returns true if the result moves the cursor on to the next piece * of the data item. */ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes) void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes) { bool new_piece; Loading Loading @@ -1284,8 +1283,6 @@ static void prepare_write_message_footer(struct ceph_connection *con) con->out_msg_done = true; } static void ceph_con_get_out_msg(struct ceph_connection *con); /* * Prepare headers for the next outgoing message. */ Loading Loading @@ -1355,7 +1352,7 @@ static void prepare_write_message(struct ceph_connection *con) prepare_write_message_footer(con); } con_flag_set(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading @@ -1376,7 +1373,7 @@ static void prepare_write_ack(struct ceph_connection *con) &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ con_flag_set(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading @@ -1394,7 +1391,7 @@ static void prepare_write_seq(struct ceph_connection *con) con_out_kvec_add(con, sizeof (con->out_temp_ack), &con->out_temp_ack); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading @@ -1415,7 +1412,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) } else { con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); } con_flag_set(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading Loading @@ -1454,7 +1451,7 @@ static void prepare_write_banner(struct ceph_connection *con) &con->msgr->my_enc_addr); con->out_more = 0; con_flag_set(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } static void __prepare_write_connect(struct ceph_connection *con) Loading @@ -1465,12 +1462,12 @@ static void __prepare_write_connect(struct ceph_connection *con) con->auth->authorizer_buf); con->out_more = 0; con_flag_set(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } static int prepare_write_connect(struct ceph_connection *con) { unsigned int global_seq = get_global_seq(con->msgr, 0); unsigned int global_seq = ceph_get_global_seq(con->msgr, 0); int proto; int ret; Loading Loading @@ -1549,8 +1546,7 @@ static int write_partial_kvec(struct ceph_connection *con) return ret; /* done! */ } static u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, unsigned int length) { char *kaddr; Loading Loading @@ -1813,7 +1809,7 @@ static int verify_hello(struct ceph_connection *con) return 0; } static bool addr_is_blank(struct ceph_entity_addr *addr) bool ceph_addr_is_blank(const struct ceph_entity_addr *addr) { struct sockaddr_storage ss = addr->in_addr; /* align */ struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr; Loading @@ -1829,7 +1825,7 @@ static bool addr_is_blank(struct ceph_entity_addr *addr) } } static int addr_port(struct ceph_entity_addr *addr) int ceph_addr_port(const struct ceph_entity_addr *addr) { switch (get_unaligned(&addr->in_addr.ss_family)) { case AF_INET: Loading @@ -1840,7 +1836,7 @@ static int addr_port(struct ceph_entity_addr *addr) return 0; } static void addr_set_port(struct ceph_entity_addr *addr, int p) void ceph_addr_set_port(struct ceph_entity_addr *addr, int p) { switch (get_unaligned(&addr->in_addr.ss_family)) { case AF_INET: Loading Loading @@ -1998,7 +1994,7 @@ int ceph_parse_ips(const char *c, const char *end, port = CEPH_MON_PORT; } addr_set_port(&addr[i], port); ceph_addr_set_port(&addr[i], port); addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY; dout("parse_ips got %s\n", ceph_pr_addr(&addr[i])); Loading Loading @@ -2037,7 +2033,7 @@ static int process_banner(struct ceph_connection *con) */ if (memcmp(&con->peer_addr, &con->actual_peer_addr, sizeof(con->peer_addr)) != 0 && !(addr_is_blank(&con->actual_peer_addr) && !(ceph_addr_is_blank(&con->actual_peer_addr) && con->actual_peer_addr.nonce == con->peer_addr.nonce)) { pr_warn("wrong peer, want %s/%u, got %s/%u\n", ceph_pr_addr(&con->peer_addr), Loading @@ -2051,12 +2047,12 @@ static int process_banner(struct ceph_connection *con) /* * did we learn our address? */ if (addr_is_blank(my_addr)) { if (ceph_addr_is_blank(my_addr)) { memcpy(&my_addr->in_addr, &con->peer_addr_for_me.in_addr, sizeof(con->peer_addr_for_me.in_addr)); addr_set_port(my_addr, 0); encode_my_addr(con->msgr); ceph_addr_set_port(my_addr, 0); ceph_encode_my_addr(con->msgr); dout("process_banner learned my addr is %s\n", ceph_pr_addr(my_addr)); } Loading Loading @@ -2192,7 +2188,7 @@ static int process_connect(struct ceph_connection *con) dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", con->peer_global_seq, le32_to_cpu(con->in_reply.global_seq)); get_global_seq(con->msgr, ceph_get_global_seq(con->msgr, le32_to_cpu(con->in_reply.global_seq)); con_out_kvec_reset(con); ret = prepare_write_connect(con); Loading Loading @@ -2227,7 +2223,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_reply.connect_seq)); if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) con_flag_set(con, CEPH_CON_F_LOSSYTX); ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX); con->delay = 0; /* reset backoff memory */ Loading Loading @@ -2351,9 +2347,6 @@ static int read_partial_msg_data(struct ceph_connection *con) /* * read (part of) a message. */ static int ceph_con_in_msg_alloc(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip); static int read_partial_message(struct ceph_connection *con) { struct ceph_msg *m = con->in_msg; Loading Loading @@ -2515,7 +2508,7 @@ static int read_partial_message(struct ceph_connection *con) * be careful not to do anything that waits on other incoming messages or it * may deadlock. */ static void process_message(struct ceph_connection *con) void ceph_con_process_message(struct ceph_connection *con) { struct ceph_msg *msg = con->in_msg; Loading Loading @@ -2628,7 +2621,7 @@ static int try_write(struct ceph_connection *con) do_next: if (con->state == CEPH_CON_S_OPEN) { if (con_flag_test_and_clear(con, if (ceph_con_flag_test_and_clear(con, CEPH_CON_F_KEEPALIVE_PENDING)) { prepare_write_keepalive(con); goto more; Loading @@ -2645,7 +2638,7 @@ static int try_write(struct ceph_connection *con) } /* Nothing to do! */ con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); dout("try_write nothing else to write.\n"); ret = 0; out: Loading Loading @@ -2739,7 +2732,7 @@ static int try_read(struct ceph_connection *con) prepare_read_keepalive_ack(con); break; case CEPH_MSGR_TAG_CLOSE: con_close_socket(con); ceph_con_close_socket(con); con->state = CEPH_CON_S_CLOSED; goto out; default: Loading @@ -2764,7 +2757,7 @@ static int try_read(struct ceph_connection *con) } if (con->in_tag == CEPH_MSGR_TAG_READY) goto more; process_message(con); ceph_con_process_message(con); if (con->state == CEPH_CON_S_OPEN) prepare_read_tag(con); goto more; Loading Loading @@ -2840,7 +2833,7 @@ static void cancel_con(struct ceph_connection *con) static bool con_sock_closed(struct ceph_connection *con) { if (!con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED)) if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED)) return false; #define CASE(x) \ Loading @@ -2867,7 +2860,7 @@ static bool con_backoff(struct ceph_connection *con) { int ret; if (!con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF)) if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF)) return false; ret = queue_con_delay(con, con->delay); Loading @@ -2875,7 +2868,7 @@ static bool con_backoff(struct ceph_connection *con) dout("%s: con %p FAILED to back off %lu\n", __func__, con, con->delay); BUG_ON(ret == -ENOENT); con_flag_set(con, CEPH_CON_F_BACKOFF); ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); } return true; Loading Loading @@ -2987,7 +2980,7 @@ static void con_fault(struct ceph_connection *con) ceph_con_reset_protocol(con); if (con_flag_test(con, CEPH_CON_F_LOSSYTX)) { if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) { dout("fault on LOSSYTX channel, marking CLOSED\n"); con->state = CEPH_CON_S_CLOSED; return; Loading @@ -2999,9 +2992,9 @@ static void con_fault(struct ceph_connection *con) /* If there are no messages queued or keepalive pending, place * the connection in a STANDBY state */ if (list_empty(&con->out_queue) && !con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) { !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); con->state = CEPH_CON_S_STANDBY; } else { /* retry after a delay. */ Loading @@ -3013,7 +3006,7 @@ static void con_fault(struct ceph_connection *con) if (con->delay > MAX_DELAY_INTERVAL) con->delay = MAX_DELAY_INTERVAL; } con_flag_set(con, CEPH_CON_F_BACKOFF); ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); queue_con(con); } } Loading @@ -3023,7 +3016,7 @@ void ceph_messenger_reset_nonce(struct ceph_messenger *msgr) { u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000; msgr->inst.addr.nonce = cpu_to_le32(nonce); encode_my_addr(msgr); ceph_encode_my_addr(msgr); } /* Loading @@ -3037,7 +3030,7 @@ void ceph_messenger_init(struct ceph_messenger *msgr, if (myaddr) { memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr, sizeof(msgr->inst.addr.in_addr)); addr_set_port(&msgr->inst.addr, 0); ceph_addr_set_port(&msgr->inst.addr, 0); } msgr->inst.addr.type = 0; Loading @@ -3047,7 +3040,7 @@ void ceph_messenger_init(struct ceph_messenger *msgr, get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); } while (!msgr->inst.addr.nonce); encode_my_addr(msgr); ceph_encode_my_addr(msgr); atomic_set(&msgr->stopping, 0); write_pnet(&msgr->net, get_net(current->nsproxy->net_ns)); Loading Loading @@ -3076,8 +3069,8 @@ static void clear_standby(struct ceph_connection *con) dout("clear_standby %p and ++connect_seq\n", con); con->state = CEPH_CON_S_PREOPEN; con->connect_seq++; WARN_ON(con_flag_test(con, CEPH_CON_F_WRITE_PENDING)); WARN_ON(con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)); WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)); WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)); } } Loading Loading @@ -3118,7 +3111,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* if there wasn't anything waiting to send before, queue * new work */ if (con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING) == 0) if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) queue_con(con); } EXPORT_SYMBOL(ceph_con_send); Loading Loading @@ -3214,10 +3207,10 @@ void ceph_con_keepalive(struct ceph_connection *con) dout("con_keepalive %p\n", con); mutex_lock(&con->mutex); clear_standby(con); con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING); ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING); mutex_unlock(&con->mutex); if (con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING) == 0) if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) queue_con(con); } EXPORT_SYMBOL(ceph_con_keepalive); Loading Loading @@ -3423,7 +3416,7 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) * On error (ENOMEM, EAGAIN, ...), * - con->in_msg == NULL */ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int ceph_con_in_msg_alloc(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip) { int middle_len = le32_to_cpu(hdr->middle_len); Loading Loading @@ -3470,7 +3463,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, return ret; } static void ceph_con_get_out_msg(struct ceph_connection *con) void ceph_con_get_out_msg(struct ceph_connection *con) { struct ceph_msg *msg; Loading