Commit 5d7edbc9 authored by David Howells
Browse files

rxrpc: Get rid of the Rx ring



Get rid of the Rx ring and replace it with a pair of queues instead.  One
queue gets the packets that are in-sequence and are ready for processing by
recvmsg(); the other queue gets the out-of-sequence packets for addition to
the first queue as the holes get filled.

The annotation ring is removed and replaced with a SACK table.  The SACK
table has the bits set that correspond exactly to the sequence number of
the packet being acked.  The SACK table is copied when an ACK packet is
being assembled and rotated so that the first ACK is in byte 0.

Flow control handling is altered so that packets that are moved to the
in-sequence queue are hard-ACK'd even before they're consumed - and then
the Rx window size in the ACK packet (rsize) is shrunk down to compensate
(even going to 0 if the window is full).

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
parent d4d02d8b
Loading
Loading
Loading
Loading
+11 −8
Original line number Diff line number Diff line
@@ -104,7 +104,12 @@
	EM(rxrpc_receive_incoming,		"INC") \
	EM(rxrpc_receive_queue,			"QUE") \
	EM(rxrpc_receive_queue_last,		"QLS") \
	E_(rxrpc_receive_rotate,		"ROT")
	EM(rxrpc_receive_queue_oos,		"QUO") \
	EM(rxrpc_receive_queue_oos_last,	"QOL") \
	EM(rxrpc_receive_oos,			"OOS") \
	EM(rxrpc_receive_oos_last,		"OSL") \
	EM(rxrpc_receive_rotate,		"ROT") \
	E_(rxrpc_receive_rotate_last,		"RLS")

#define rxrpc_recvmsg_traces \
	EM(rxrpc_recvmsg_cont,			"CONT") \
@@ -860,8 +865,7 @@ TRACE_EVENT(rxrpc_receive,
		    __field(enum rxrpc_receive_trace,	why		)
		    __field(rxrpc_serial_t,		serial		)
		    __field(rxrpc_seq_t,		seq		)
		    __field(rxrpc_seq_t,		hard_ack	)
		    __field(rxrpc_seq_t,		top		)
		    __field(u64,			window		)
			     ),

	    TP_fast_assign(
@@ -869,8 +873,7 @@ TRACE_EVENT(rxrpc_receive,
		    __entry->why = why;
		    __entry->serial = serial;
		    __entry->seq = seq;
		    __entry->hard_ack = call->rx_hard_ack;
		    __entry->top = call->rx_top;
		    __entry->window = atomic64_read(&call->ackr_window);
			   ),

	    TP_printk("c=%08x %s r=%08x q=%08x w=%08x-%08x",
@@ -878,8 +881,8 @@ TRACE_EVENT(rxrpc_receive,
		      __print_symbolic(__entry->why, rxrpc_receive_traces),
		      __entry->serial,
		      __entry->seq,
		      __entry->hard_ack,
		      __entry->top)
		      lower_32_bits(__entry->window),
		      upper_32_bits(__entry->window))
	    );

TRACE_EVENT(rxrpc_recvmsg,
@@ -1459,7 +1462,7 @@ TRACE_EVENT(rxrpc_call_reset,
		    __entry->call_serial = call->rx_serial;
		    __entry->conn_serial = call->conn->hi_serial;
		    __entry->tx_seq = call->tx_hard_ack;
		    __entry->rx_seq = call->rx_hard_ack;
		    __entry->rx_seq = call->rx_highest_seq;
			   ),

	    TP_printk("c=%08x %08x:%08x r=%08x/%08x tx=%08x rx=%08x",
+18 −11
Original line number Diff line number Diff line
@@ -198,7 +198,6 @@ struct rxrpc_skb_priv {
	u16		remain;
	u16		offset;		/* Offset of data */
	u16		len;		/* Length of data */
	u8		rx_flags;	/* Received packet flags */
	u8		flags;
#define RXRPC_RX_VERIFIED	0x01

@@ -644,8 +643,20 @@ struct rxrpc_call {
	rxrpc_seq_t		tx_hard_ack;	/* Dead slot in buffer; the first transmitted but
						 * not hard-ACK'd packet follows this.
						 */

	/* Transmitted data tracking. */
	rxrpc_seq_t		tx_top;		/* Highest Tx slot allocated. */
	u16			tx_backoff;	/* Delay to insert due to Tx failure */
	u8			tx_winsize;	/* Maximum size of Tx window */

	/* Received data tracking */
	struct sk_buff_head	recvmsg_queue;	/* Queue of packets ready for recvmsg() */
	struct sk_buff_head	rx_oos_queue;	/* Queue of out of sequence packets */

	rxrpc_seq_t		rx_highest_seq;	/* Highest sequence number received */
	rxrpc_seq_t		rx_consumed;	/* Highest packet consumed */
	rxrpc_serial_t		rx_serial;	/* Highest serial received for this call */
	u8			rx_winsize;	/* Size of Rx window */

	/* TCP-style slow-start congestion control [RFC5681].  Since the SMSS
	 * is fixed, we keep these numbers in terms of segments (ie. DATA
@@ -660,23 +671,19 @@ struct rxrpc_call {
	u8			cong_cumul_acks; /* Cumulative ACK count */
	ktime_t			cong_tstamp;	/* Last time cwnd was changed */

	rxrpc_seq_t		rx_hard_ack;	/* Dead slot in buffer; the first received but not
						 * consumed packet follows this.
						 */
	rxrpc_seq_t		rx_top;		/* Highest Rx slot allocated. */
	rxrpc_seq_t		rx_expect_next;	/* Expected next packet sequence number */
	rxrpc_serial_t		rx_serial;	/* Highest serial received for this call */
	u8			rx_winsize;	/* Size of Rx window */
	u8			tx_winsize;	/* Maximum size of Tx window */

	spinlock_t		input_lock;	/* Lock for packet input to this call */

	/* Receive-phase ACK management (ACKs we send). */
	u8			ackr_reason;	/* reason to ACK */
	rxrpc_serial_t		ackr_serial;	/* serial of packet being ACK'd */
	rxrpc_seq_t		ackr_highest_seq; /* Highest sequence number received */
	atomic64_t		ackr_window;	/* Base (in LSW) and top (in MSW) of SACK window */
	atomic_t		ackr_nr_unacked; /* Number of unacked packets */
	atomic_t		ackr_nr_consumed; /* Number of packets needing hard ACK */
	struct {
#define RXRPC_SACK_SIZE 256
		 /* SACK table for soft-acked packets */
		u8		ackr_sack_table[RXRPC_SACK_SIZE];
	} __aligned(8);

	/* RTT management */
	rxrpc_serial_t		rtt_serial[4];	/* Serial number of DATA or PING sent */
+5 −2
Original line number Diff line number Diff line
@@ -155,6 +155,8 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
	INIT_LIST_HEAD(&call->accept_link);
	INIT_LIST_HEAD(&call->recvmsg_link);
	INIT_LIST_HEAD(&call->sock_link);
	skb_queue_head_init(&call->recvmsg_queue);
	skb_queue_head_init(&call->rx_oos_queue);
	init_waitqueue_head(&call->waitq);
	spin_lock_init(&call->lock);
	spin_lock_init(&call->notify_lock);
@@ -165,13 +167,12 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
	call->tx_total_len = -1;
	call->next_rx_timo = 20 * HZ;
	call->next_req_timo = 1 * HZ;
	atomic64_set(&call->ackr_window, 0x100000001ULL);

	memset(&call->sock_node, 0xed, sizeof(call->sock_node));

	/* Leave space in the ring to handle a maxed-out jumbo packet */
	call->rx_winsize = rxrpc_rx_window_size;
	call->tx_winsize = 16;
	call->rx_expect_next = 1;

	call->cong_cwnd = 2;
	call->cong_ssthresh = RXRPC_RXTX_BUFF_SIZE - 1;
@@ -519,6 +520,8 @@ static void rxrpc_cleanup_ring(struct rxrpc_call *call)
		rxrpc_free_skb(call->rxtx_buffer[i], rxrpc_skb_cleaned);
		call->rxtx_buffer[i] = NULL;
	}
	skb_queue_purge(&call->recvmsg_queue);
	skb_queue_purge(&call->rx_oos_queue);
}

/*
+1 −1
Original line number Diff line number Diff line
@@ -175,7 +175,7 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn,
		trace_rxrpc_disconnect_call(call);
		switch (call->completion) {
		case RXRPC_CALL_SUCCEEDED:
			chan->last_seq = call->rx_hard_ack;
			chan->last_seq = call->rx_highest_seq;
			chan->last_type = RXRPC_PACKET_TYPE_ACK;
			break;
		case RXRPC_CALL_LOCALLY_ABORTED:
+131 −82
Original line number Diff line number Diff line
@@ -312,18 +312,43 @@ static bool rxrpc_receiving_reply(struct rxrpc_call *call)
	return rxrpc_end_tx_phase(call, true, "ETD");
}

/*
 * Publish a change to the ACK window.  The window base goes in the LSW and
 * the window top (wtop) in the MSW of ackr_window; the single 64-bit
 * release-ordered store lets lockless readers observe both halves as a
 * consistent pair.
 */
static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
					  rxrpc_seq_t window, rxrpc_seq_t wtop)
{
	atomic64_set_release(&call->ackr_window, ((u64)wtop) << 32 | window);
}

/*
 * Push a DATA packet onto the Rx queue and advance the advertised ACK
 * window.  The skb's ref is consumed: it is linked onto recvmsg_queue, so
 * the caller must not touch it afterwards.  Callers serialise this under
 * recvmsg_queue.lock since __skb_queue_tail() is the unlocked variant.
 */
static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
				   rxrpc_seq_t window, rxrpc_seq_t wtop,
				   enum rxrpc_receive_trace why)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	bool last = sp->hdr.flags & RXRPC_LAST_PACKET;

	__skb_queue_tail(&call->recvmsg_queue, skb);
	rxrpc_input_update_ack_window(call, window, wtop);

	/* "why + 1" selects the matching *_last trace tag, which sits
	 * immediately after its non-last counterpart in rxrpc_receive_traces.
	 */
	trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq);
}

/*
 * Process a DATA packet.
 */
static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct sk_buff *oos;
	rxrpc_serial_t serial = sp->hdr.serial;
	rxrpc_seq_t seq = sp->hdr.seq, hard_ack;
	unsigned int ix = seq & RXRPC_RXTX_BUFF_MASK;
	u64 win = atomic64_read(&call->ackr_window);
	rxrpc_seq_t window = lower_32_bits(win);
	rxrpc_seq_t wtop = upper_32_bits(win);
	rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
	rxrpc_seq_t seq = sp->hdr.seq;
	bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
	bool acked = false;
	int ack_reason = -1;

	rxrpc_inc_stat(call->rxnet, stat_rx_data);
	if (sp->hdr.flags & RXRPC_REQUEST_ACK)
@@ -331,112 +356,135 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
	if (sp->hdr.flags & RXRPC_JUMBO_PACKET)
		rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo);

	hard_ack = READ_ONCE(call->rx_hard_ack);

	_proto("Rx DATA %%%u { #%x l=%u }", serial, seq, last);

	if (last) {
		if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
		    seq != call->rx_top) {
		if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
		    seq + 1 != wtop) {
			rxrpc_proto_abort("LSN", call, seq);
			goto out;
			goto err_free;
		}
	} else {
		if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
		    after_eq(seq, call->rx_top)) {
		    after_eq(seq, wtop)) {
			pr_warn("Packet beyond last: c=%x q=%x window=%x-%x wlimit=%x\n",
				call->debug_id, seq, window, wtop, wlimit);
			rxrpc_proto_abort("LSA", call, seq);
			goto out;
			goto err_free;
		}
	}

	if (after(seq, call->rx_highest_seq))
		call->rx_highest_seq = seq;

	trace_rxrpc_rx_data(call->debug_id, seq, serial, sp->hdr.flags);

	if (before_eq(seq, hard_ack)) {
		rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
			       rxrpc_propose_ack_input_data);
		goto out;
	if (before(seq, window)) {
		ack_reason = RXRPC_ACK_DUPLICATE;
		goto send_ack;
	}
	if (after(seq, wlimit)) {
		ack_reason = RXRPC_ACK_EXCEEDS_WINDOW;
		goto send_ack;
	}

	if (call->rxtx_buffer[ix]) {
		rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
			       rxrpc_propose_ack_input_data);
		goto out;
	/* Queue the packet. */
	if (seq == window) {
		rxrpc_seq_t reset_from;
		bool reset_sack = false;

		if (sp->hdr.flags & RXRPC_REQUEST_ACK)
			ack_reason = RXRPC_ACK_REQUESTED;
		/* Send an immediate ACK if we fill in a hole */
		else if (!skb_queue_empty(&call->rx_oos_queue))
			ack_reason = RXRPC_ACK_DELAY;

		window++;
		if (after(window, wtop))
			wtop = window;

		spin_lock(&call->recvmsg_queue.lock);
		rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue);
		skb = NULL;

		while ((oos = skb_peek(&call->rx_oos_queue))) {
			struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

			if (after(osp->hdr.seq, window))
				break;

			__skb_unlink(oos, &call->rx_oos_queue);
			last = osp->hdr.flags & RXRPC_LAST_PACKET;
			seq = osp->hdr.seq;
			if (!reset_sack) {
				reset_from = seq;
				reset_sack = true;
			}

	if (after(seq, hard_ack + call->rx_winsize)) {
		rxrpc_send_ACK(call, RXRPC_ACK_EXCEEDS_WINDOW, serial,
			       rxrpc_propose_ack_input_data);
		goto out;
			window++;
			rxrpc_input_queue_data(call, oos, window, wtop,
						 rxrpc_receive_queue_oos);
		}

	if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
		rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, serial,
			       rxrpc_propose_ack_input_data);
		acked = true;
		spin_unlock(&call->recvmsg_queue.lock);

		if (reset_sack) {
			do {
				call->ackr_sack_table[reset_from % RXRPC_SACK_SIZE] = 0;
			} while (reset_from++, before(reset_from, window));
		}
	} else {
		bool keep = false;

	if (after(seq, call->ackr_highest_seq))
		call->ackr_highest_seq = seq;
		ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE;

	/* Queue the packet.  We use a couple of memory barriers here as need
	 * to make sure that rx_top is perceived to be set after the buffer
	 * pointer and that the buffer pointer is set after the annotation and
	 * the skb data.
	 *
	 * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
	 * and also rxrpc_fill_out_ack().
	 */
	call->rxtx_annotations[ix] = 1;
	smp_wmb();
	call->rxtx_buffer[ix] = skb;
	if (after(seq, call->rx_top)) {
		smp_store_release(&call->rx_top, seq);
	} else if (before(seq, call->rx_top)) {
		/* Send an immediate ACK if we fill in a hole */
		if (!acked) {
			rxrpc_send_ACK(call, RXRPC_ACK_DELAY, serial,
				       rxrpc_propose_ack_input_data_hole);
			acked = true;
		if (!call->ackr_sack_table[seq % RXRPC_SACK_SIZE]) {
			call->ackr_sack_table[seq % RXRPC_SACK_SIZE] = 1;
			keep = 1;
		}

		if (after(seq + 1, wtop)) {
			wtop = seq + 1;
			rxrpc_input_update_ack_window(call, window, wtop);
		}

		if (!keep) {
			ack_reason = RXRPC_ACK_DUPLICATE;
			goto send_ack;
		}

	/* From this point on, we're not allowed to touch the packet any longer
	 * as its ref now belongs to the Rx ring.
	 */
	skb = NULL;
	sp = NULL;
		skb_queue_walk(&call->rx_oos_queue, oos) {
			struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

	if (last) {
		set_bit(RXRPC_CALL_RX_LAST, &call->flags);
		trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
	} else {
		trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
			if (after(osp->hdr.seq, seq)) {
				__skb_queue_before(&call->rx_oos_queue, oos, skb);
				goto oos_queued;
			}
		}

	if (after_eq(seq, call->rx_expect_next)) {
		if (after(seq, call->rx_expect_next)) {
			_net("OOS %u > %u", seq, call->rx_expect_next);
			rxrpc_send_ACK(call, RXRPC_ACK_OUT_OF_SEQUENCE, serial,
				       rxrpc_propose_ack_input_data);
			acked = true;
		__skb_queue_tail(&call->rx_oos_queue, skb);
	oos_queued:
		trace_rxrpc_receive(call, last ? rxrpc_receive_oos_last : rxrpc_receive_oos,
				    sp->hdr.serial, sp->hdr.seq);
		skb = NULL;
	}
		call->rx_expect_next = seq + 1;

send_ack:
	if (ack_reason < 0 &&
	    atomic_inc_return(&call->ackr_nr_unacked) > 2 &&
	    test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) {
		ack_reason = RXRPC_ACK_IDLE;
	} else if (ack_reason >= 0) {
		set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags);
	}

out:
	if (!acked &&
	    atomic_inc_return(&call->ackr_nr_unacked) > 2)
		rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
	if (ack_reason >= 0)
		rxrpc_send_ACK(call, ack_reason, serial,
			       rxrpc_propose_ack_input_data);
	else
		rxrpc_propose_delay_ACK(call, serial,
					rxrpc_propose_ack_input_data);

	trace_rxrpc_notify_socket(call->debug_id, serial);
	rxrpc_notify_socket(call);

err_free:
	rxrpc_free_skb(skb, rxrpc_skb_freed);
	_leave(" [queued]");
}

/*
@@ -498,8 +546,9 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
	rxrpc_serial_t serial = sp->hdr.serial;
	rxrpc_seq_t seq0 = sp->hdr.seq;

	_enter("{%u,%u},{%u,%u}",
	       call->rx_hard_ack, call->rx_top, skb->len, seq0);
	_enter("{%llx,%x},{%u,%x}",
	       atomic64_read(&call->ackr_window), call->rx_highest_seq,
	       skb->len, seq0);

	_proto("Rx DATA %%%u { #%u f=%02x }",
	       sp->hdr.serial, seq0, sp->hdr.flags);
Loading