Commit 02471928 authored by Ilya Dryomov's avatar Ilya Dryomov
Browse files

libceph: handle discarding acked and requeued messages separately



Make it easier to follow and remove dependency on msgr1 specific
CEPH_MSGR_TAG_SEQ.

Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 5cd8da3a
Loading
Loading
Loading
Loading
+54 −20
Original line number Diff line number Diff line
@@ -760,6 +760,56 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
	return ret;
}

/*
 * Discard messages that have been acked by the server.
 */
static void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
{
	struct ceph_msg *msg;
	u64 seq;

	dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq);
	while (!list_empty(&con->out_sent)) {
		msg = list_first_entry(&con->out_sent, struct ceph_msg,
				       list_head);
		WARN_ON(msg->needs_out_seq);
		seq = le64_to_cpu(msg->hdr.seq);
		if (seq > ack_seq)
			break;

		dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
		     msg, seq);
		ceph_msg_remove(msg);
	}
}

/*
 * Discard messages that have been requeued in con_fault(), up to
 * reconnect_seq.  This avoids gratuitously resending messages that
 * the server had received and handled prior to reconnect.
 */
static void ceph_con_discard_requeued(struct ceph_connection *con,
				      u64 reconnect_seq)
{
	struct ceph_msg *msg;
	u64 seq;

	dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
	while (!list_empty(&con->out_queue)) {
		msg = list_first_entry(&con->out_queue, struct ceph_msg,
				       list_head);
		if (msg->needs_out_seq)
			break;
		seq = le64_to_cpu(msg->hdr.seq);
		if (seq > reconnect_seq)
			break;

		dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
		     msg, seq);
		ceph_msg_remove(msg);
	}
}

static void con_out_kvec_reset(struct ceph_connection *con)
{
	BUG_ON(con->out_skip);
@@ -2259,28 +2309,12 @@ static int read_partial_ack(struct ceph_connection *con)
 */
static void process_ack(struct ceph_connection *con)
{
	struct ceph_msg *m;
	u64 ack = le64_to_cpu(con->in_temp_ack);
	u64 seq;
	bool reconnect = (con->in_tag == CEPH_MSGR_TAG_SEQ);
	struct list_head *list = reconnect ? &con->out_queue : &con->out_sent;

	/*
	 * In the reconnect case, con_fault() has requeued messages
	 * in out_sent. We should cleanup old messages according to
	 * the reconnect seq.
	 */
	while (!list_empty(list)) {
		m = list_first_entry(list, struct ceph_msg, list_head);
		if (reconnect && m->needs_out_seq)
			break;
		seq = le64_to_cpu(m->hdr.seq);
		if (seq > ack)
			break;
		dout("got ack for seq %llu type %d at %p\n", seq,
		     le16_to_cpu(m->hdr.type), m);
		ceph_msg_remove(m);
	}
	if (con->in_tag == CEPH_MSGR_TAG_ACK)
		ceph_con_discard_sent(con, ack);
	else
		ceph_con_discard_requeued(con, ack);

	prepare_read_tag(con);
}