Loading include/linux/ceph/messenger.h +12 −1 Original line number Diff line number Diff line Loading @@ -249,6 +249,17 @@ struct ceph_msg { #define CEPH_CON_S_OPEN 5 #define CEPH_CON_S_STANDBY 6 /* * ceph_connection flag bits */ #define CEPH_CON_F_LOSSYTX 0 /* we can close channel or drop messages on errors */ #define CEPH_CON_F_KEEPALIVE_PENDING 1 /* we need to send a keepalive */ #define CEPH_CON_F_WRITE_PENDING 2 /* we have data ready to send */ #define CEPH_CON_F_SOCK_CLOSED 3 /* socket state changed to closed */ #define CEPH_CON_F_BACKOFF 4 /* need to retry queuing delayed work */ /* ceph connection fault delay defaults, for exponential backoff */ #define BASE_DELAY_INTERVAL (HZ / 4) #define MAX_DELAY_INTERVAL (15 * HZ) Loading @@ -273,7 +284,7 @@ struct ceph_connection { struct ceph_entity_addr peer_addr; /* peer address */ struct ceph_entity_addr peer_addr_for_me; unsigned long flags; unsigned long flags; /* CEPH_CON_F_* */ const char *error_msg; /* error message, if any */ struct ceph_entity_name peer_name; /* peer name */ Loading net/ceph/messenger.c +34 −43 Original line number Diff line number Diff line Loading @@ -82,24 +82,14 @@ #define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ #define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ /* * ceph_connection flag bits */ #define CON_FLAG_LOSSYTX 0 /* we can close channel or drop * messages on errors */ #define CON_FLAG_KEEPALIVE_PENDING 1 /* we need to send a keepalive */ #define CON_FLAG_WRITE_PENDING 2 /* we have data ready to send */ #define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */ #define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */ static bool con_flag_valid(unsigned long con_flag) { switch (con_flag) { case CON_FLAG_LOSSYTX: case CON_FLAG_KEEPALIVE_PENDING: case CON_FLAG_WRITE_PENDING: case CON_FLAG_SOCK_CLOSED: case CON_FLAG_BACKOFF: case CEPH_CON_F_LOSSYTX: case CEPH_CON_F_KEEPALIVE_PENDING: case CEPH_CON_F_WRITE_PENDING: case CEPH_CON_F_SOCK_CLOSED: case CEPH_CON_F_BACKOFF: return true; default: return false; Loading Loading @@ -380,7 +370,7 @@ static void ceph_sock_write_space(struct sock *sk) * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) { if (con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) { if (sk_stream_is_writeable(sk)) { dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); Loading @@ -406,7 +396,7 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); con_flag_set(con, CON_FLAG_SOCK_CLOSED); con_flag_set(con, CEPH_CON_F_SOCK_CLOSED); queue_con(con); break; case TCP_ESTABLISHED: Loading Loading @@ -597,7 +587,7 @@ static int con_close_socket(struct ceph_connection *con) * received a socket close event before we had the chance to * shut the socket down. */ con_flag_clear(con, CON_FLAG_SOCK_CLOSED); con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED); con_sock_state_closed(con); return rc; Loading Loading @@ -666,10 +656,10 @@ void ceph_con_close(struct ceph_connection *con) dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr)); con->state = CEPH_CON_S_CLOSED; con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */ con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING); con_flag_clear(con, CON_FLAG_WRITE_PENDING); con_flag_clear(con, CON_FLAG_BACKOFF); con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next connect */ con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING); con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); con_flag_clear(con, CEPH_CON_F_BACKOFF); ceph_con_reset_protocol(con); ceph_con_reset_session(con); Loading Loading @@ -1365,7 +1355,7 @@ static void prepare_write_message(struct ceph_connection *con) prepare_write_message_footer(con); } con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading @@ -1386,7 +1376,7 @@ static void prepare_write_ack(struct ceph_connection *con) &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading @@ -1404,7 +1394,7 @@ static void prepare_write_seq(struct ceph_connection *con) con_out_kvec_add(con, sizeof (con->out_temp_ack), &con->out_temp_ack); con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading @@ -1425,7 +1415,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) } else { con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); } con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading Loading @@ -1464,7 +1454,7 @@ static void prepare_write_banner(struct ceph_connection *con) &con->msgr->my_enc_addr); con->out_more = 0; con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } static void __prepare_write_connect(struct ceph_connection *con) Loading @@ -1475,7 +1465,7 @@ static void __prepare_write_connect(struct ceph_connection *con) con->auth->authorizer_buf); con->out_more = 0; con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } static int prepare_write_connect(struct ceph_connection *con) Loading Loading @@ -2236,7 +2226,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_reply.connect_seq)); if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) con_flag_set(con, CON_FLAG_LOSSYTX); con_flag_set(con, CEPH_CON_F_LOSSYTX); con->delay = 0; /* reset backoff memory */ Loading Loading @@ -2637,7 +2627,8 @@ static int try_write(struct ceph_connection *con) do_next: if (con->state == CEPH_CON_S_OPEN) { if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { if (con_flag_test_and_clear(con, CEPH_CON_F_KEEPALIVE_PENDING)) { prepare_write_keepalive(con); goto more; } Loading @@ -2653,7 +2644,7 @@ static int try_write(struct ceph_connection *con) } /* Nothing to do! */ con_flag_clear(con, CON_FLAG_WRITE_PENDING); con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); dout("try_write nothing else to write.\n"); ret = 0; out: Loading Loading @@ -2848,7 +2839,7 @@ static void cancel_con(struct ceph_connection *con) static bool con_sock_closed(struct ceph_connection *con) { if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED)) if (!con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED)) return false; #define CASE(x) \ Loading @@ -2875,7 +2866,7 @@ static bool con_backoff(struct ceph_connection *con) { int ret; if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF)) if (!con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF)) return false; ret = queue_con_delay(con, con->delay); Loading @@ -2883,7 +2874,7 @@ static bool con_backoff(struct ceph_connection *con) dout("%s: con %p FAILED to back off %lu\n", __func__, con, con->delay); BUG_ON(ret == -ENOENT); con_flag_set(con, CON_FLAG_BACKOFF); con_flag_set(con, CEPH_CON_F_BACKOFF); } return true; Loading Loading @@ -2995,7 +2986,7 @@ static void con_fault(struct ceph_connection *con) ceph_con_reset_protocol(con); if (con_flag_test(con, CON_FLAG_LOSSYTX)) { if (con_flag_test(con, CEPH_CON_F_LOSSYTX)) { dout("fault on LOSSYTX channel, marking CLOSED\n"); con->state = CEPH_CON_S_CLOSED; return; Loading @@ -3007,9 +2998,9 @@ static void con_fault(struct ceph_connection *con) /* If there are no messages queued or keepalive pending, place * the connection in a STANDBY state */ if (list_empty(&con->out_queue) && !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) { !con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); con_flag_clear(con, CON_FLAG_WRITE_PENDING); con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); con->state = CEPH_CON_S_STANDBY; } else { /* retry after a delay. */ Loading @@ -3021,7 +3012,7 @@ static void con_fault(struct ceph_connection *con) if (con->delay > MAX_DELAY_INTERVAL) con->delay = MAX_DELAY_INTERVAL; } con_flag_set(con, CON_FLAG_BACKOFF); con_flag_set(con, CEPH_CON_F_BACKOFF); queue_con(con); } } Loading Loading @@ -3084,8 +3075,8 @@ static void clear_standby(struct ceph_connection *con) dout("clear_standby %p and ++connect_seq\n", con); con->state = CEPH_CON_S_PREOPEN; con->connect_seq++; WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING)); WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)); WARN_ON(con_flag_test(con, CEPH_CON_F_WRITE_PENDING)); WARN_ON(con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)); } } Loading Loading @@ -3126,7 +3117,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* if there wasn't anything waiting to send before, queue * new work */ if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) if (con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_send); Loading Loading @@ -3222,10 +3213,10 @@ void ceph_con_keepalive(struct ceph_connection *con) dout("con_keepalive %p\n", con); mutex_lock(&con->mutex); clear_standby(con); con_flag_set(con, CON_FLAG_KEEPALIVE_PENDING); con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING); mutex_unlock(&con->mutex); if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) if (con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_keepalive); Loading Loading
include/linux/ceph/messenger.h +12 −1 Original line number Diff line number Diff line Loading @@ -249,6 +249,17 @@ struct ceph_msg { #define CEPH_CON_S_OPEN 5 #define CEPH_CON_S_STANDBY 6 /* * ceph_connection flag bits */ #define CEPH_CON_F_LOSSYTX 0 /* we can close channel or drop messages on errors */ #define CEPH_CON_F_KEEPALIVE_PENDING 1 /* we need to send a keepalive */ #define CEPH_CON_F_WRITE_PENDING 2 /* we have data ready to send */ #define CEPH_CON_F_SOCK_CLOSED 3 /* socket state changed to closed */ #define CEPH_CON_F_BACKOFF 4 /* need to retry queuing delayed work */ /* ceph connection fault delay defaults, for exponential backoff */ #define BASE_DELAY_INTERVAL (HZ / 4) #define MAX_DELAY_INTERVAL (15 * HZ) Loading @@ -273,7 +284,7 @@ struct ceph_connection { struct ceph_entity_addr peer_addr; /* peer address */ struct ceph_entity_addr peer_addr_for_me; unsigned long flags; unsigned long flags; /* CEPH_CON_F_* */ const char *error_msg; /* error message, if any */ struct ceph_entity_name peer_name; /* peer name */ Loading
net/ceph/messenger.c +34 −43 Original line number Diff line number Diff line Loading @@ -82,24 +82,14 @@ #define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ #define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ /* * ceph_connection flag bits */ #define CON_FLAG_LOSSYTX 0 /* we can close channel or drop * messages on errors */ #define CON_FLAG_KEEPALIVE_PENDING 1 /* we need to send a keepalive */ #define CON_FLAG_WRITE_PENDING 2 /* we have data ready to send */ #define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */ #define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */ static bool con_flag_valid(unsigned long con_flag) { switch (con_flag) { case CON_FLAG_LOSSYTX: case CON_FLAG_KEEPALIVE_PENDING: case CON_FLAG_WRITE_PENDING: case CON_FLAG_SOCK_CLOSED: case CON_FLAG_BACKOFF: case CEPH_CON_F_LOSSYTX: case CEPH_CON_F_KEEPALIVE_PENDING: case CEPH_CON_F_WRITE_PENDING: case CEPH_CON_F_SOCK_CLOSED: case CEPH_CON_F_BACKOFF: return true; default: return false; Loading Loading @@ -380,7 +370,7 @@ static void ceph_sock_write_space(struct sock *sk) * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) { if (con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) { if (sk_stream_is_writeable(sk)) { dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); Loading @@ -406,7 +396,7 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); con_flag_set(con, CON_FLAG_SOCK_CLOSED); con_flag_set(con, CEPH_CON_F_SOCK_CLOSED); queue_con(con); break; case TCP_ESTABLISHED: Loading Loading @@ -597,7 +587,7 @@ static int con_close_socket(struct ceph_connection *con) * received a socket close event before we had the chance to * shut the socket down. */ con_flag_clear(con, CON_FLAG_SOCK_CLOSED); con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED); con_sock_state_closed(con); return rc; Loading Loading @@ -666,10 +656,10 @@ void ceph_con_close(struct ceph_connection *con) dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr)); con->state = CEPH_CON_S_CLOSED; con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */ con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING); con_flag_clear(con, CON_FLAG_WRITE_PENDING); con_flag_clear(con, CON_FLAG_BACKOFF); con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next connect */ con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING); con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); con_flag_clear(con, CEPH_CON_F_BACKOFF); ceph_con_reset_protocol(con); ceph_con_reset_session(con); Loading Loading @@ -1365,7 +1355,7 @@ static void prepare_write_message(struct ceph_connection *con) prepare_write_message_footer(con); } con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading @@ -1386,7 +1376,7 @@ static void prepare_write_ack(struct ceph_connection *con) &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading @@ -1404,7 +1394,7 @@ static void prepare_write_seq(struct ceph_connection *con) con_out_kvec_add(con, sizeof (con->out_temp_ack), &con->out_temp_ack); con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading @@ -1425,7 +1415,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) } else { con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); } con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } /* Loading Loading @@ -1464,7 +1454,7 @@ static void prepare_write_banner(struct ceph_connection *con) &con->msgr->my_enc_addr); con->out_more = 0; con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } static void __prepare_write_connect(struct ceph_connection *con) Loading @@ -1475,7 +1465,7 @@ static void __prepare_write_connect(struct ceph_connection *con) con->auth->authorizer_buf); con->out_more = 0; con_flag_set(con, CON_FLAG_WRITE_PENDING); con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } static int prepare_write_connect(struct ceph_connection *con) Loading Loading @@ -2236,7 +2226,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_reply.connect_seq)); if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) con_flag_set(con, CON_FLAG_LOSSYTX); con_flag_set(con, CEPH_CON_F_LOSSYTX); con->delay = 0; /* reset backoff memory */ Loading Loading @@ -2637,7 +2627,8 @@ static int try_write(struct ceph_connection *con) do_next: if (con->state == CEPH_CON_S_OPEN) { if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { if (con_flag_test_and_clear(con, CEPH_CON_F_KEEPALIVE_PENDING)) { prepare_write_keepalive(con); goto more; } Loading @@ -2653,7 +2644,7 @@ static int try_write(struct ceph_connection *con) } /* Nothing to do! */ con_flag_clear(con, CON_FLAG_WRITE_PENDING); con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); dout("try_write nothing else to write.\n"); ret = 0; out: Loading Loading @@ -2848,7 +2839,7 @@ static void cancel_con(struct ceph_connection *con) static bool con_sock_closed(struct ceph_connection *con) { if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED)) if (!con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED)) return false; #define CASE(x) \ Loading @@ -2875,7 +2866,7 @@ static bool con_backoff(struct ceph_connection *con) { int ret; if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF)) if (!con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF)) return false; ret = queue_con_delay(con, con->delay); Loading @@ -2883,7 +2874,7 @@ static bool con_backoff(struct ceph_connection *con) dout("%s: con %p FAILED to back off %lu\n", __func__, con, con->delay); BUG_ON(ret == -ENOENT); con_flag_set(con, CON_FLAG_BACKOFF); con_flag_set(con, CEPH_CON_F_BACKOFF); } return true; Loading Loading @@ -2995,7 +2986,7 @@ static void con_fault(struct ceph_connection *con) ceph_con_reset_protocol(con); if (con_flag_test(con, CON_FLAG_LOSSYTX)) { if (con_flag_test(con, CEPH_CON_F_LOSSYTX)) { dout("fault on LOSSYTX channel, marking CLOSED\n"); con->state = CEPH_CON_S_CLOSED; return; Loading @@ -3007,9 +2998,9 @@ static void con_fault(struct ceph_connection *con) /* If there are no messages queued or keepalive pending, place * the connection in a STANDBY state */ if (list_empty(&con->out_queue) && !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) { !con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); con_flag_clear(con, CON_FLAG_WRITE_PENDING); con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); con->state = CEPH_CON_S_STANDBY; } else { /* retry after a delay. */ Loading @@ -3021,7 +3012,7 @@ static void con_fault(struct ceph_connection *con) if (con->delay > MAX_DELAY_INTERVAL) con->delay = MAX_DELAY_INTERVAL; } con_flag_set(con, CON_FLAG_BACKOFF); con_flag_set(con, CEPH_CON_F_BACKOFF); queue_con(con); } } Loading Loading @@ -3084,8 +3075,8 @@ static void clear_standby(struct ceph_connection *con) dout("clear_standby %p and ++connect_seq\n", con); con->state = CEPH_CON_S_PREOPEN; con->connect_seq++; WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING)); WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)); WARN_ON(con_flag_test(con, CEPH_CON_F_WRITE_PENDING)); WARN_ON(con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)); } } Loading Loading @@ -3126,7 +3117,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* if there wasn't anything waiting to send before, queue * new work */ if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) if (con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_send); Loading Loading @@ -3222,10 +3213,10 @@ void ceph_con_keepalive(struct ceph_connection *con) dout("con_keepalive %p\n", con); mutex_lock(&con->mutex); clear_standby(con); con_flag_set(con, CON_FLAG_KEEPALIVE_PENDING); con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING); mutex_unlock(&con->mutex); if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) if (con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_keepalive); Loading