Commit 5e6ef4f1 authored by David Howells
Browse files

rxrpc: Make the I/O thread take over the call and local processor work



Move the functions from the call->processor and local->processor work items
into the domain of the I/O thread.

The call event processor, now called from the I/O thread, then takes over
the job of cranking the call state machine, processing incoming packets and
transmitting DATA, ACK and ABORT packets.  In a future patch,
rxrpc_send_ACK() will transmit the ACK on the spot rather than queuing it
for later transmission.

The call event processor becomes purely received-skb driven.  It only
transmits things in response to events.  We use "pokes" to queue a dummy
skb to make it do things like start/resume transmitting data.  Timer expiry
also results in pokes.

The connection event processor becomes similar, though crypto events, such
as dealing with CHALLENGE and RESPONSE packets, are offloaded to a work item
to avoid doing crypto in the I/O thread.

The local event processor is removed and VERSION response packets are
generated directly from the packet parser.  Similarly, ABORTs generated in
response to protocol errors will be transmitted immediately rather than
being pushed onto a queue for later transmission.

Changes:
========
ver #2)
 - Fix a couple of introduced lock context imbalances.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
parent 393a2a20
Loading
Loading
Loading
Loading
+20 −22
Original line number Diff line number Diff line
@@ -26,7 +26,6 @@
#define rxrpc_skb_traces \
	EM(rxrpc_skb_eaten_by_unshare,		"ETN unshare  ") \
	EM(rxrpc_skb_eaten_by_unshare_nomem,	"ETN unshar-nm") \
	EM(rxrpc_skb_get_ack,			"GET ack      ") \
	EM(rxrpc_skb_get_conn_work,		"GET conn-work") \
	EM(rxrpc_skb_get_local_work,		"GET locl-work") \
	EM(rxrpc_skb_get_reject_work,		"GET rej-work ") \
@@ -36,7 +35,6 @@
	EM(rxrpc_skb_new_error_report,		"NEW error-rpt") \
	EM(rxrpc_skb_new_jumbo_subpacket,	"NEW jumbo-sub") \
	EM(rxrpc_skb_new_unshared,		"NEW unshared ") \
	EM(rxrpc_skb_put_ack,			"PUT ack      ") \
	EM(rxrpc_skb_put_conn_work,		"PUT conn-work") \
	EM(rxrpc_skb_put_error_report,		"PUT error-rep") \
	EM(rxrpc_skb_put_input,			"PUT input    ") \
@@ -45,7 +43,6 @@
	EM(rxrpc_skb_put_rotate,		"PUT rotate   ") \
	EM(rxrpc_skb_put_unknown,		"PUT unknown  ") \
	EM(rxrpc_skb_see_conn_work,		"SEE conn-work") \
	EM(rxrpc_skb_see_local_work,		"SEE locl-work") \
	EM(rxrpc_skb_see_recvmsg,		"SEE recvmsg  ") \
	EM(rxrpc_skb_see_reject,		"SEE reject   ") \
	EM(rxrpc_skb_see_rotate,		"SEE rotate   ") \
@@ -58,10 +55,7 @@
	EM(rxrpc_local_get_for_use,		"GET for-use ") \
	EM(rxrpc_local_get_peer,		"GET peer    ") \
	EM(rxrpc_local_get_prealloc_conn,	"GET conn-pre") \
	EM(rxrpc_local_get_queue,		"GET queue   ") \
	EM(rxrpc_local_new,			"NEW         ") \
	EM(rxrpc_local_processing,		"PROCESSING  ") \
	EM(rxrpc_local_put_already_queued,	"PUT alreadyq") \
	EM(rxrpc_local_put_bind,		"PUT bind    ") \
	EM(rxrpc_local_put_call,		"PUT call    ") \
	EM(rxrpc_local_put_for_use,		"PUT for-use ") \
@@ -69,8 +63,6 @@
	EM(rxrpc_local_put_peer,		"PUT peer    ") \
	EM(rxrpc_local_put_prealloc_conn,	"PUT conn-pre") \
	EM(rxrpc_local_put_release_sock,	"PUT rel-sock") \
	EM(rxrpc_local_put_queue,		"PUT queue   ") \
	EM(rxrpc_local_queued,			"QUEUED      ") \
	EM(rxrpc_local_see_tx_ack,		"SEE tx-ack  ") \
	EM(rxrpc_local_stop,			"STOP        ") \
	EM(rxrpc_local_stopped,			"STOPPED     ") \
@@ -78,11 +70,9 @@
	EM(rxrpc_local_unuse_conn_work,		"UNU conn-wrk") \
	EM(rxrpc_local_unuse_peer_keepalive,	"UNU peer-kpa") \
	EM(rxrpc_local_unuse_release_sock,	"UNU rel-sock") \
	EM(rxrpc_local_unuse_work,		"UNU work    ") \
	EM(rxrpc_local_use_conn_work,		"USE conn-wrk") \
	EM(rxrpc_local_use_lookup,		"USE lookup  ") \
	EM(rxrpc_local_use_peer_keepalive,	"USE peer-kpa") \
	E_(rxrpc_local_use_work,		"USE work    ")
	E_(rxrpc_local_use_peer_keepalive,	"USE peer-kpa")

#define rxrpc_peer_traces \
	EM(rxrpc_peer_free,			"FREE        ") \
@@ -90,6 +80,7 @@
	EM(rxrpc_peer_get_activate_call,	"GET act-call") \
	EM(rxrpc_peer_get_bundle,		"GET bundle  ") \
	EM(rxrpc_peer_get_client_conn,		"GET cln-conn") \
	EM(rxrpc_peer_get_input,		"GET input   ") \
	EM(rxrpc_peer_get_input_error,		"GET inpt-err") \
	EM(rxrpc_peer_get_keepalive,		"GET keepaliv") \
	EM(rxrpc_peer_get_lookup_client,	"GET look-cln") \
@@ -100,6 +91,7 @@
	EM(rxrpc_peer_put_call,			"PUT call    ") \
	EM(rxrpc_peer_put_conn,			"PUT conn    ") \
	EM(rxrpc_peer_put_discard_tmp,		"PUT disc-tmp") \
	EM(rxrpc_peer_put_input,		"PUT input   ") \
	EM(rxrpc_peer_put_input_error,		"PUT inpt-err") \
	E_(rxrpc_peer_put_keepalive,		"PUT keepaliv")

@@ -180,11 +172,6 @@
	EM(rxrpc_call_put_sendmsg,		"PUT sendmsg ") \
	EM(rxrpc_call_put_unnotify,		"PUT unnotify") \
	EM(rxrpc_call_put_userid_exists,	"PUT u-exists") \
	EM(rxrpc_call_queue_abort,		"QUE abort   ") \
	EM(rxrpc_call_queue_requeue,		"QUE requeue ") \
	EM(rxrpc_call_queue_resend,		"QUE resend  ") \
	EM(rxrpc_call_queue_timer,		"QUE timer   ") \
	EM(rxrpc_call_queue_tx_data,		"QUE tx-data ") \
	EM(rxrpc_call_see_accept,		"SEE accept  ") \
	EM(rxrpc_call_see_activate_client,	"SEE act-clnt") \
	EM(rxrpc_call_see_connect_failed,	"SEE con-fail") \
@@ -282,6 +269,7 @@
	EM(rxrpc_propose_ack_respond_to_ping,	"Rsp2Png") \
	EM(rxrpc_propose_ack_retry_tx,		"RetryTx") \
	EM(rxrpc_propose_ack_rotate_rx,		"RxAck  ") \
	EM(rxrpc_propose_ack_rx_idle,		"RxIdle ") \
	E_(rxrpc_propose_ack_terminal_ack,	"ClTerm ")

#define rxrpc_congest_modes \
@@ -1532,6 +1520,7 @@ TRACE_EVENT(rxrpc_connect_call,
		    __field(unsigned long,		user_call_ID	)
		    __field(u32,			cid		)
		    __field(u32,			call_id		)
		    __field_struct(struct sockaddr_rxrpc, srx		)
			     ),

	    TP_fast_assign(
@@ -1539,33 +1528,42 @@ TRACE_EVENT(rxrpc_connect_call,
		    __entry->user_call_ID = call->user_call_ID;
		    __entry->cid = call->cid;
		    __entry->call_id = call->call_id;
		    __entry->srx = call->dest_srx;
			   ),

	    TP_printk("c=%08x u=%p %08x:%08x",
	    TP_printk("c=%08x u=%p %08x:%08x dst=%pISp",
		      __entry->call,
		      (void *)__entry->user_call_ID,
		      __entry->cid,
		      __entry->call_id)
		      __entry->call_id,
		      &__entry->srx.transport)
	    );

TRACE_EVENT(rxrpc_resend,
	    TP_PROTO(struct rxrpc_call *call),
	    TP_PROTO(struct rxrpc_call *call, struct sk_buff *ack),

	    TP_ARGS(call),
	    TP_ARGS(call, ack),

	    TP_STRUCT__entry(
		    __field(unsigned int,		call		)
		    __field(rxrpc_seq_t,		seq		)
		    __field(rxrpc_seq_t,		transmitted	)
		    __field(rxrpc_serial_t,		ack_serial	)
			     ),

	    TP_fast_assign(
		    struct rxrpc_skb_priv *sp = ack ? rxrpc_skb(ack) : NULL;
		    __entry->call = call->debug_id;
		    __entry->seq = call->acks_hard_ack;
		    __entry->transmitted = call->tx_transmitted;
		    __entry->ack_serial = sp ? sp->hdr.serial : 0;
			   ),

	    TP_printk("c=%08x q=%x",
	    TP_printk("c=%08x r=%x q=%x tq=%x",
		      __entry->call,
		      __entry->seq)
		      __entry->ack_serial,
		      __entry->seq,
		      __entry->transmitted)
	    );

TRACE_EVENT(rxrpc_rx_icmp,
+19 −31
Original line number Diff line number Diff line
@@ -283,14 +283,11 @@ struct rxrpc_local {
	struct rxrpc_net	*rxnet;		/* The network ns in which this resides */
	struct hlist_node	link;
	struct socket		*socket;	/* my UDP socket */
	struct work_struct	processor;
	struct task_struct	*io_thread;
	struct list_head	ack_tx_queue;	/* List of ACKs that need sending */
	spinlock_t		ack_tx_lock;	/* ACK list lock */
	struct rxrpc_sock __rcu	*service;	/* Service(s) listening on this endpoint */
	struct rw_semaphore	defrag_sem;	/* control re-enablement of IP DF bit */
	struct sk_buff_head	reject_queue;	/* packets awaiting rejection */
	struct sk_buff_head	event_queue;	/* endpoint event packets awaiting processing */
	struct sk_buff_head	rx_queue;	/* Received packets */
	struct list_head	call_attend_q;	/* Calls requiring immediate attention */
	struct rb_root		client_bundles;	/* Client connection bundles by socket params */
@@ -524,23 +521,19 @@ enum rxrpc_call_flag {
	RXRPC_CALL_RETRANS_TIMEOUT,	/* Retransmission due to timeout occurred */
	RXRPC_CALL_BEGAN_RX_TIMER,	/* We began the expect_rx_by timer */
	RXRPC_CALL_RX_HEARD,		/* The peer responded at least once to this call */
	RXRPC_CALL_RX_UNDERRUN,		/* Got data underrun */
	RXRPC_CALL_DISCONNECTED,	/* The call has been disconnected */
	RXRPC_CALL_KERNEL,		/* The call was made by the kernel */
	RXRPC_CALL_UPGRADE,		/* Service upgrade was requested for the call */
	RXRPC_CALL_DELAY_ACK_PENDING,	/* DELAY ACK generation is pending */
	RXRPC_CALL_IDLE_ACK_PENDING,	/* IDLE ACK generation is pending */
	RXRPC_CALL_EXCLUSIVE,		/* The call uses a once-only connection */
	RXRPC_CALL_RX_IS_IDLE,		/* Reception is idle - send an ACK */
};

/*
 * Events that can be raised on a call.
 */
enum rxrpc_call_event {
	RXRPC_CALL_EV_ABORT,		/* need to generate abort */
	RXRPC_CALL_EV_RESEND,		/* Tx resend required */
	RXRPC_CALL_EV_EXPIRED,		/* Expiry occurred */
	RXRPC_CALL_EV_ACK_LOST,		/* ACK may be lost, send ping */
	RXRPC_CALL_EV_INITIAL_PING,	/* Send initial ping for a new service call */
};

/*
@@ -611,7 +604,6 @@ struct rxrpc_call {
	u32			next_rx_timo;	/* Timeout for next Rx packet (jif) */
	u32			next_req_timo;	/* Timeout for next Rx request packet (jif) */
	struct timer_list	timer;		/* Combined event timer */
	struct work_struct	processor;	/* Event processor */
	struct work_struct	destroyer;	/* In-process-context destroyer */
	rxrpc_notify_rx_t	notify_rx;	/* kernel service Rx notification function */
	struct list_head	link;		/* link in master call list */
@@ -705,11 +697,7 @@ struct rxrpc_call {
	rxrpc_seq_t		acks_prev_seq;	/* Highest previousPacket received */
	rxrpc_seq_t		acks_hard_ack;	/* Latest hard-ack point */
	rxrpc_seq_t		acks_lowest_nak; /* Lowest NACK in the buffer (or ==tx_hard_ack) */
	rxrpc_seq_t		acks_lost_top;	/* tx_top at the time lost-ack ping sent */
	rxrpc_serial_t		acks_lost_ping;	/* Serial number of probe ACK */
	rxrpc_serial_t		acks_highest_serial; /* Highest serial number ACK'd */
	struct sk_buff		*acks_soft_tbl;	/* The last ACK packet with NAKs in it */
	spinlock_t		acks_ack_lock;	/* Access to ->acks_last_ack */
};

/*
@@ -822,9 +810,8 @@ extern struct workqueue_struct *rxrpc_workqueue;
 */
int rxrpc_service_prealloc(struct rxrpc_sock *, gfp_t);
void rxrpc_discard_prealloc(struct rxrpc_sock *);
struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *,
					   struct rxrpc_sock *,
					   struct sockaddr_rxrpc *,
bool rxrpc_new_incoming_call(struct rxrpc_local *, struct rxrpc_peer *,
			     struct rxrpc_connection *, struct sockaddr_rxrpc *,
			     struct sk_buff *);
void rxrpc_accept_incoming_calls(struct rxrpc_local *);
int rxrpc_user_charge_accept(struct rxrpc_sock *, unsigned long);
@@ -838,13 +825,15 @@ void rxrpc_send_ACK(struct rxrpc_call *, u8, rxrpc_serial_t, enum rxrpc_propose_
void rxrpc_propose_delay_ACK(struct rxrpc_call *, rxrpc_serial_t,
			     enum rxrpc_propose_ack_trace);
void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *);
void rxrpc_process_call(struct work_struct *);
void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb);

void rxrpc_reduce_call_timer(struct rxrpc_call *call,
			     unsigned long expire_at,
			     unsigned long now,
			     enum rxrpc_timer_trace why);

void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb);

/*
 * call_object.c
 */
@@ -864,9 +853,8 @@ void rxrpc_incoming_call(struct rxrpc_sock *, struct rxrpc_call *,
			 struct sk_buff *);
void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
void rxrpc_queue_call(struct rxrpc_call *, enum rxrpc_call_trace);
void rxrpc_see_call(struct rxrpc_call *, enum rxrpc_call_trace);
bool rxrpc_try_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
struct rxrpc_call *rxrpc_try_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
void rxrpc_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
void rxrpc_put_call(struct rxrpc_call *, enum rxrpc_call_trace);
void rxrpc_cleanup_call(struct rxrpc_call *);
@@ -908,6 +896,7 @@ void rxrpc_clean_up_local_conns(struct rxrpc_local *);
 */
void rxrpc_process_connection(struct work_struct *);
void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool);
int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb);

/*
 * conn_object.c
@@ -916,10 +905,9 @@ extern unsigned int rxrpc_connection_expiry;
extern unsigned int rxrpc_closed_conn_expiry;

struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *, gfp_t);
struct rxrpc_connection *rxrpc_find_connection_rcu(struct rxrpc_local *,
struct rxrpc_connection *rxrpc_find_client_connection_rcu(struct rxrpc_local *,
							  struct sockaddr_rxrpc *,
						   struct sk_buff *,
						   struct rxrpc_peer **);
							  struct sk_buff *);
void __rxrpc_disconnect_call(struct rxrpc_connection *, struct rxrpc_call *);
void rxrpc_disconnect_call(struct rxrpc_call *);
void rxrpc_kill_client_conn(struct rxrpc_connection *);
@@ -962,8 +950,8 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
/*
 * input.c
 */
void rxrpc_input_call_event(struct rxrpc_call *, struct sk_buff *);
void rxrpc_input_implicit_end_call(struct rxrpc_connection *, struct rxrpc_call *);
void rxrpc_input_call_packet(struct rxrpc_call *, struct sk_buff *);
void rxrpc_implicit_end_call(struct rxrpc_call *, struct sk_buff *);

/*
 * io_thread.c
@@ -993,7 +981,9 @@ int rxrpc_get_server_data_key(struct rxrpc_connection *, const void *, time64_t,
/*
 * local_event.c
 */
extern void rxrpc_process_local_events(struct rxrpc_local *);
void rxrpc_send_version_request(struct rxrpc_local *local,
				struct rxrpc_host_header *hdr,
				struct sk_buff *skb);

/*
 * local_object.c
@@ -1004,7 +994,6 @@ struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *, enum rxrpc_local
void rxrpc_put_local(struct rxrpc_local *, enum rxrpc_local_trace);
struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *, enum rxrpc_local_trace);
void rxrpc_unuse_local(struct rxrpc_local *, enum rxrpc_local_trace);
void rxrpc_queue_local(struct rxrpc_local *);
void rxrpc_destroy_local(struct rxrpc_local *local);
void rxrpc_destroy_all_locals(struct rxrpc_net *);

@@ -1068,7 +1057,7 @@ static inline struct rxrpc_net *rxrpc_net(struct net *net)
void rxrpc_transmit_ack_packets(struct rxrpc_local *);
int rxrpc_send_abort_packet(struct rxrpc_call *);
int rxrpc_send_data_packet(struct rxrpc_call *, struct rxrpc_txbuf *);
void rxrpc_reject_packets(struct rxrpc_local *);
void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb);
void rxrpc_send_keepalive(struct rxrpc_peer *);
void rxrpc_transmit_one(struct rxrpc_call *call, struct rxrpc_txbuf *txb);

@@ -1178,7 +1167,6 @@ int rxrpc_server_keyring(struct rxrpc_sock *, sockptr_t, int);
 * skbuff.c
 */
void rxrpc_kernel_data_consumed(struct rxrpc_call *, struct sk_buff *);
void rxrpc_packet_destructor(struct sk_buff *);
void rxrpc_new_skb(struct sk_buff *, enum rxrpc_skb_trace);
void rxrpc_see_skb(struct sk_buff *, enum rxrpc_skb_trace);
void rxrpc_eaten_skb(struct sk_buff *, enum rxrpc_skb_trace);
+57 −69
Original line number Diff line number Diff line
@@ -100,6 +100,7 @@ static int rxrpc_service_prealloc_one(struct rxrpc_sock *rx,
		return -ENOMEM;
	call->flags |= (1 << RXRPC_CALL_IS_SERVICE);
	call->state = RXRPC_CALL_SERVER_PREALLOC;
	__set_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events);

	trace_rxrpc_call(call->debug_id, refcount_read(&call->ref),
			 user_call_ID, rxrpc_call_new_prealloc_service);
@@ -234,21 +235,6 @@ void rxrpc_discard_prealloc(struct rxrpc_sock *rx)
	kfree(b);
}

/*
 * Ping the other end to fill our RTT cache and to retrieve the rwind
 * and MTU parameters.
 */
static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	ktime_t now = skb->tstamp;

	if (call->peer->rtt_count < 3 ||
	    ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), now))
		rxrpc_send_ACK(call, RXRPC_ACK_PING, sp->hdr.serial,
			       rxrpc_propose_ack_ping_for_params);
}

/*
 * Allocate a new incoming call from the prealloc pool, along with a connection
 * and a peer as necessary.
@@ -330,33 +316,56 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
}

/*
 * Set up a new incoming call.  Called in BH context with the RCU read lock
 * held.
 * Set up a new incoming call.  Called from the I/O thread.
 *
 * If this is for a kernel service, when we allocate the call, it will have
 * three refs on it: (1) the kernel service, (2) the user_call_ID tree, (3) the
 * retainer ref obtained from the backlog buffer.  Prealloc calls for userspace
 * services only have the ref from the backlog buffer.  We pass this ref to the
 * caller.
 * services only have the ref from the backlog buffer.
 *
 * If we want to report an error, we mark the skb with the packet type and
 * abort code and return NULL.
 *
 * The call is returned with the user access mutex held and a ref on it.
 * abort code and return false.
 */
struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
					   struct rxrpc_sock *rx,
bool rxrpc_new_incoming_call(struct rxrpc_local *local,
			     struct rxrpc_peer *peer,
			     struct rxrpc_connection *conn,
			     struct sockaddr_rxrpc *peer_srx,
			     struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	const struct rxrpc_security *sec = NULL;
	struct rxrpc_connection *conn;
	struct rxrpc_peer *peer = NULL;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct rxrpc_call *call = NULL;
	struct rxrpc_sock *rx;

	_enter("");

	/* Don't set up a call for anything other than the first DATA packet. */
	if (sp->hdr.seq != 1 ||
	    sp->hdr.type != RXRPC_PACKET_TYPE_DATA)
		return true; /* Just discard */

	rcu_read_lock();

	/* Weed out packets to services we're not offering.  Packets that would
	 * begin a call are explicitly rejected and the rest are just
	 * discarded.
	 */
	rx = rcu_dereference(local->service);
	if (!rx || (sp->hdr.serviceId != rx->srx.srx_service &&
		    sp->hdr.serviceId != rx->second_service)
	    ) {
		if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
		    sp->hdr.seq == 1)
			goto unsupported_service;
		goto discard;
	}

	if (!conn) {
		sec = rxrpc_get_incoming_security(rx, skb);
		if (!sec)
			goto reject;
	}

	spin_lock(&rx->incoming_lock);
	if (rx->sk.sk_state == RXRPC_SERVER_LISTEN_DISABLED ||
	    rx->sk.sk_state == RXRPC_CLOSE) {
@@ -367,19 +376,6 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
		goto no_call;
	}

	/* The peer, connection and call may all have sprung into existence due
	 * to a duplicate packet being handled on another CPU in parallel, so
	 * we have to recheck the routing.  However, we're now holding
	 * rx->incoming_lock, so the values should remain stable.
	 */
	conn = rxrpc_find_connection_rcu(local, peer_srx, skb, &peer);

	if (!conn) {
		sec = rxrpc_get_incoming_security(rx, skb);
		if (!sec)
			goto no_call;
	}

	call = rxrpc_alloc_incoming_call(rx, local, peer, conn, sec, peer_srx,
					 skb);
	if (!call) {
@@ -398,35 +394,15 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
		rx->notify_new_call(&rx->sk, call, call->user_call_ID);

	spin_lock(&conn->state_lock);
	switch (conn->state) {
	case RXRPC_CONN_SERVICE_UNSECURED:
	if (conn->state == RXRPC_CONN_SERVICE_UNSECURED) {
		conn->state = RXRPC_CONN_SERVICE_CHALLENGING;
		set_bit(RXRPC_CONN_EV_CHALLENGE, &call->conn->events);
		rxrpc_queue_conn(call->conn, rxrpc_conn_queue_challenge);
		break;

	case RXRPC_CONN_SERVICE:
		write_lock(&call->state_lock);
		if (call->state < RXRPC_CALL_COMPLETE)
			call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
		write_unlock(&call->state_lock);
		break;

	case RXRPC_CONN_REMOTELY_ABORTED:
		rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
					  conn->abort_code, conn->error);
		break;
	case RXRPC_CONN_LOCALLY_ABORTED:
		rxrpc_abort_call("CON", call, sp->hdr.seq,
				 conn->abort_code, conn->error);
		break;
	default:
		BUG();
	}
	spin_unlock(&conn->state_lock);
	spin_unlock(&rx->incoming_lock);

	rxrpc_send_ping(call, skb);
	spin_unlock(&rx->incoming_lock);
	rcu_read_unlock();

	if (hlist_unhashed(&call->error_link)) {
		spin_lock(&call->peer->lock);
@@ -435,12 +411,24 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
	}

	_leave(" = %p{%d}", call, call->debug_id);
	return call;
	rxrpc_input_call_event(call, skb);
	rxrpc_put_call(call, rxrpc_call_put_input);
	return true;

unsupported_service:
	trace_rxrpc_abort(0, "INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
			  RX_INVALID_OPERATION, EOPNOTSUPP);
	skb->priority = RX_INVALID_OPERATION;
	goto reject;
no_call:
	spin_unlock(&rx->incoming_lock);
	_leave(" = NULL [%u]", skb->mark);
	return NULL;
reject:
	rcu_read_unlock();
	_leave(" = f [%u]", skb->mark);
	return false;
discard:
	rcu_read_unlock();
	return true;
}

/*
+70 −101

File changed.

Preview size limit exceeded, changes collapsed.

+34 −22
Original line number Diff line number Diff line
@@ -71,7 +71,7 @@ static void rxrpc_call_timer_expired(struct timer_list *t)

	if (call->state < RXRPC_CALL_COMPLETE) {
		trace_rxrpc_timer_expired(call, jiffies);
		rxrpc_queue_call(call, rxrpc_call_queue_timer);
		rxrpc_poke_call(call, rxrpc_call_poke_timer);
	}
}

@@ -148,7 +148,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
				  &rxrpc_call_user_mutex_lock_class_key);

	timer_setup(&call->timer, rxrpc_call_timer_expired, 0);
	INIT_WORK(&call->processor, rxrpc_process_call);
	INIT_WORK(&call->destroyer, rxrpc_destroy_call);
	INIT_LIST_HEAD(&call->link);
	INIT_LIST_HEAD(&call->chan_wait_link);
@@ -163,7 +162,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
	init_waitqueue_head(&call->waitq);
	spin_lock_init(&call->notify_lock);
	spin_lock_init(&call->tx_lock);
	spin_lock_init(&call->acks_ack_lock);
	rwlock_init(&call->state_lock);
	refcount_set(&call->ref, 1);
	call->debug_id = debug_id;
@@ -252,6 +250,7 @@ static void rxrpc_start_call_timer(struct rxrpc_call *call)
	call->ack_lost_at = j;
	call->resend_at = j;
	call->ping_at = j;
	call->keepalive_at = j;
	call->expect_rx_by = j;
	call->expect_req_by = j;
	call->expect_term_by = j;
@@ -430,6 +429,29 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
	call->state		= RXRPC_CALL_SERVER_SECURING;
	call->cong_tstamp	= skb->tstamp;

	spin_lock(&conn->state_lock);

	switch (conn->state) {
	case RXRPC_CONN_SERVICE_UNSECURED:
	case RXRPC_CONN_SERVICE_CHALLENGING:
		call->state = RXRPC_CALL_SERVER_SECURING;
		break;
	case RXRPC_CONN_SERVICE:
		call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
		break;

	case RXRPC_CONN_REMOTELY_ABORTED:
		__rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
					    conn->abort_code, conn->error);
		break;
	case RXRPC_CONN_LOCALLY_ABORTED:
		__rxrpc_abort_call("CON", call, 1,
				   conn->abort_code, conn->error);
		break;
	default:
		BUG();
	}

	/* Set the channel for this call.  We don't get channel_lock as we're
	 * only defending against the data_ready handler (which we're called
	 * from) and the RESPONSE packet parser (which is only really
@@ -440,6 +462,7 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
	conn->channels[chan].call_counter = call->call_id;
	conn->channels[chan].call_id = call->call_id;
	rcu_assign_pointer(conn->channels[chan].call, call);
	spin_unlock(&conn->state_lock);

	spin_lock(&conn->peer->lock);
	hlist_add_head(&call->error_link, &conn->peer->error_targets);
@@ -449,15 +472,6 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
	_leave("");
}

/*
 * Queue a call's work processor.
 */
void rxrpc_queue_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
{
	if (rxrpc_queue_work(&call->processor))
		trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), 0, why);
}

/*
 * Note the re-emergence of a call.
 */
@@ -470,14 +484,15 @@ void rxrpc_see_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
	}
}

bool rxrpc_try_get_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
struct rxrpc_call *rxrpc_try_get_call(struct rxrpc_call *call,
				      enum rxrpc_call_trace why)
{
	int r;

	if (!__refcount_inc_not_zero(&call->ref, &r))
		return false;
	if (!call || !__refcount_inc_not_zero(&call->ref, &r))
		return NULL;
	trace_rxrpc_call(call->debug_id, r + 1, 0, why);
	return true;
	return call;
}

/*
@@ -637,8 +652,6 @@ static void rxrpc_destroy_call(struct work_struct *work)
	struct rxrpc_call *call = container_of(work, struct rxrpc_call, destroyer);
	struct rxrpc_txbuf *txb;

	del_timer_sync(&call->timer);
	cancel_work_sync(&call->processor); /* The processor may restart the timer */
	del_timer_sync(&call->timer);

	rxrpc_cleanup_ring(call);
@@ -652,8 +665,8 @@ static void rxrpc_destroy_call(struct work_struct *work)
		list_del(&txb->call_link);
		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
	}

	rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
	rxrpc_free_skb(call->acks_soft_tbl, rxrpc_skb_put_ack);
	rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
	rxrpc_put_peer(call->peer, rxrpc_peer_put_call);
	rxrpc_put_local(call->local, rxrpc_local_put_call);
@@ -670,10 +683,9 @@ void rxrpc_cleanup_call(struct rxrpc_call *call)
	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
	ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));

	del_timer_sync(&call->timer);
	cancel_work(&call->processor);
	del_timer(&call->timer);

	if (rcu_read_lock_held() || work_busy(&call->processor))
	if (rcu_read_lock_held())
		/* Can't use the rxrpc workqueue as we need to cancel/flush
		 * something that may be running/waiting there.
		 */
Loading