Commit 96b4059f authored by David Howells's avatar David Howells
Browse files

rxrpc: Remove call->state_lock



All the setters of call->state are now in the I/O thread and thus the state
lock is now unnecessary.

Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
parent 93368b6b
Loading
Loading
Loading
Loading
+22 −11
Original line number Diff line number Diff line
@@ -631,14 +631,13 @@ struct rxrpc_call {
	unsigned long		flags;
	unsigned long		events;
	spinlock_t		notify_lock;	/* Kernel notification lock */
	rwlock_t		state_lock;	/* lock for state transition */
	unsigned int		send_abort_why; /* Why the abort [enum rxrpc_abort_reason] */
	s32			send_abort;	/* Abort code to be sent */
	short			send_abort_err;	/* Error to be associated with the abort */
	rxrpc_seq_t		send_abort_seq;	/* DATA packet that incurred the abort (or 0) */
	s32			abort_code;	/* Local/remote abort code */
	int			error;		/* Local error incurred */
	enum rxrpc_call_state	state;		/* current state of call */
	enum rxrpc_call_state	_state;		/* Current state of call (needs barrier) */
	enum rxrpc_call_completion completion;	/* Call completion condition */
	refcount_t		ref;
	u8			security_ix;	/* Security type */
@@ -889,25 +888,37 @@ static inline bool rxrpc_is_client_call(const struct rxrpc_call *call)
/*
 * call_state.c
 */
bool __rxrpc_set_call_completion(struct rxrpc_call *call,
				 enum rxrpc_call_completion compl,
				 u32 abort_code,
				 int error);
bool rxrpc_set_call_completion(struct rxrpc_call *call,
			       enum rxrpc_call_completion compl,
			       u32 abort_code,
			       int error);
bool __rxrpc_call_completed(struct rxrpc_call *call);
bool rxrpc_call_completed(struct rxrpc_call *call);
bool __rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
			u32 abort_code, int error, enum rxrpc_abort_reason why);
bool rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
		      u32 abort_code, int error, enum rxrpc_abort_reason why);
void rxrpc_prefail_call(struct rxrpc_call *call, enum rxrpc_call_completion compl,
			int error);

static inline void rxrpc_set_call_state(struct rxrpc_call *call,
					enum rxrpc_call_state state)
{
	/* Order write of completion info before write of ->state. */
	smp_store_release(&call->_state, state);
}

static inline enum rxrpc_call_state __rxrpc_call_state(const struct rxrpc_call *call)
{
	return call->_state; /* Only inside I/O thread */
}

static inline bool __rxrpc_call_is_complete(const struct rxrpc_call *call)
{
	return __rxrpc_call_state(call) == RXRPC_CALL_COMPLETE;
}

static inline enum rxrpc_call_state rxrpc_call_state(const struct rxrpc_call *call)
{
	/* Order read ->state before read ->error. */
	return smp_load_acquire(&call->state);
	/* Order read ->state before read of completion info. */
	return smp_load_acquire(&call->_state);
}

static inline bool rxrpc_call_is_complete(const struct rxrpc_call *call)
+1 −1
Original line number Diff line number Diff line
@@ -99,7 +99,7 @@ static int rxrpc_service_prealloc_one(struct rxrpc_sock *rx,
	if (!call)
		return -ENOMEM;
	call->flags |= (1 << RXRPC_CALL_IS_SERVICE);
	call->state = RXRPC_CALL_SERVER_PREALLOC;
	rxrpc_set_call_state(call, RXRPC_CALL_SERVER_PREALLOC);
	__set_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events);

	trace_rxrpc_call(call->debug_id, refcount_read(&call->ref),
+17 −25
Original line number Diff line number Diff line
@@ -257,22 +257,15 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
 */
static void rxrpc_begin_service_reply(struct rxrpc_call *call)
{
	unsigned long now;

	write_lock(&call->state_lock);
	unsigned long now = jiffies;

	if (call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
		now = jiffies;
		call->state = RXRPC_CALL_SERVER_SEND_REPLY;
	rxrpc_set_call_state(call, RXRPC_CALL_SERVER_SEND_REPLY);
	WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
	if (call->ackr_reason == RXRPC_ACK_DELAY)
		call->ackr_reason = 0;
	trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
}

	write_unlock(&call->state_lock);
}

/*
 * Close the transmission phase.  After this point there is no more data to be
 * transmitted in the call.
@@ -281,18 +274,16 @@ static void rxrpc_close_tx_phase(struct rxrpc_call *call)
{
	_debug("________awaiting reply/ACK__________");

	write_lock(&call->state_lock);
	switch (call->state) {
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
		call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
		rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_REPLY);
		break;
	case RXRPC_CALL_SERVER_SEND_REPLY:
		call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
		rxrpc_set_call_state(call, RXRPC_CALL_SERVER_AWAIT_ACK);
		break;
	default:
		break;
	}
	write_unlock(&call->state_lock);
}

static bool rxrpc_tx_window_has_space(struct rxrpc_call *call)
@@ -341,7 +332,7 @@ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)

static void rxrpc_transmit_some_data(struct rxrpc_call *call)
{
	switch (call->state) {
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		if (list_empty(&call->tx_sendmsg))
			return;
@@ -390,9 +381,10 @@ bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)

	//printk("\n--------------------\n");
	_enter("{%d,%s,%lx}",
	       call->debug_id, rxrpc_call_states[call->state], call->events);
	       call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)],
	       call->events);

	if (call->state == RXRPC_CALL_COMPLETE)
	if (__rxrpc_call_is_complete(call))
		goto out;

	/* Handle abort request locklessly, vs rxrpc_propose_abort(). */
@@ -415,7 +407,7 @@ bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
	}

	t = READ_ONCE(call->expect_req_by);
	if (call->state == RXRPC_CALL_SERVER_RECV_REQUEST &&
	if (__rxrpc_call_state(call) == RXRPC_CALL_SERVER_RECV_REQUEST &&
	    time_after_eq(now, t)) {
		trace_rxrpc_timer(call, rxrpc_timer_exp_idle, now);
		expired = true;
@@ -499,7 +491,7 @@ bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
			       rxrpc_propose_ack_ping_for_lost_ack);

	if (resend && call->state != RXRPC_CALL_CLIENT_RECV_REPLY)
	if (resend && __rxrpc_call_state(call) != RXRPC_CALL_CLIENT_RECV_REPLY)
		rxrpc_resend(call, NULL);

	if (test_and_clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
@@ -511,7 +503,7 @@ bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
			       rxrpc_propose_ack_input_data);

	/* Make sure the timer is restarted */
	if (call->state != RXRPC_CALL_COMPLETE) {
	if (!__rxrpc_call_is_complete(call)) {
		next = call->expect_rx_by;

#define set(T) { t = READ_ONCE(T); if (time_before(t, next)) next = t; }
@@ -532,7 +524,7 @@ bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
	}

out:
	if (call->state == RXRPC_CALL_COMPLETE) {
	if (__rxrpc_call_is_complete(call)) {
		del_timer_sync(&call->timer);
		if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
			rxrpc_disconnect_call(call);
+14 −16
Original line number Diff line number Diff line
@@ -69,7 +69,7 @@ static void rxrpc_call_timer_expired(struct timer_list *t)

	_enter("%d", call->debug_id);

	if (call->state < RXRPC_CALL_COMPLETE) {
	if (!__rxrpc_call_is_complete(call)) {
		trace_rxrpc_timer_expired(call, jiffies);
		rxrpc_poke_call(call, rxrpc_call_poke_timer);
	}
@@ -162,7 +162,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
	init_waitqueue_head(&call->waitq);
	spin_lock_init(&call->notify_lock);
	spin_lock_init(&call->tx_lock);
	rwlock_init(&call->state_lock);
	refcount_set(&call->ref, 1);
	call->debug_id = debug_id;
	call->tx_total_len = -1;
@@ -211,7 +210,6 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,
	now = ktime_get_real();
	call->acks_latest_ts	= now;
	call->cong_tstamp	= now;
	call->state		= RXRPC_CALL_CLIENT_AWAIT_CONN;
	call->dest_srx		= *srx;
	call->interruptibility	= p->interruptibility;
	call->tx_total_len	= p->tx_total_len;
@@ -227,11 +225,13 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,

	ret = rxrpc_init_client_call_security(call);
	if (ret < 0) {
		__rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, 0, ret);
		rxrpc_prefail_call(call, RXRPC_CALL_LOCAL_ERROR, ret);
		rxrpc_put_call(call, rxrpc_call_put_discard_error);
		return ERR_PTR(ret);
	}

	rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_CONN);

	trace_rxrpc_call(call->debug_id, refcount_read(&call->ref),
			 p->user_call_ID, rxrpc_call_new_client);

@@ -384,8 +384,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
error_dup_user_ID:
	write_unlock(&rx->call_lock);
	release_sock(&rx->sk);
	__rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
				    RX_CALL_DEAD, -EEXIST);
	rxrpc_prefail_call(call, RXRPC_CALL_LOCAL_ERROR, -EEXIST);
	trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), 0,
			 rxrpc_call_see_userid_exists);
	rxrpc_release_call(rx, call);
@@ -403,8 +402,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
	trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), ret,
			 rxrpc_call_see_connect_failed);
	set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
	__rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
				    RX_CALL_DEAD, ret);
	rxrpc_prefail_call(call, RXRPC_CALL_LOCAL_ERROR, ret);
	_leave(" = c=%08x [err]", call->debug_id);
	return call;
}
@@ -427,24 +425,24 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
	call->call_id		= sp->hdr.callNumber;
	call->dest_srx.srx_service = sp->hdr.serviceId;
	call->cid		= sp->hdr.cid;
	call->state		= RXRPC_CALL_SERVER_SECURING;
	call->cong_tstamp	= skb->tstamp;

	__set_bit(RXRPC_CALL_EXPOSED, &call->flags);
	rxrpc_set_call_state(call, RXRPC_CALL_SERVER_SECURING);

	spin_lock(&conn->state_lock);

	switch (conn->state) {
	case RXRPC_CONN_SERVICE_UNSECURED:
	case RXRPC_CONN_SERVICE_CHALLENGING:
		call->state = RXRPC_CALL_SERVER_SECURING;
		rxrpc_set_call_state(call, RXRPC_CALL_SERVER_SECURING);
		break;
	case RXRPC_CONN_SERVICE:
		call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
		rxrpc_set_call_state(call, RXRPC_CALL_SERVER_RECV_REQUEST);
		break;

	case RXRPC_CONN_ABORTED:
		__rxrpc_set_call_completion(call, conn->completion,
		rxrpc_set_call_completion(call, conn->completion,
					  conn->abort_code, conn->error);
		break;
	default:
@@ -614,7 +612,7 @@ void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
	dead = __refcount_dec_and_test(&call->ref, &r);
	trace_rxrpc_call(debug_id, r - 1, 0, why);
	if (dead) {
		ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
		ASSERTCMP(__rxrpc_call_state(call), ==, RXRPC_CALL_COMPLETE);

		if (!list_empty(&call->link)) {
			spin_lock(&rxnet->call_lock);
@@ -677,7 +675,7 @@ void rxrpc_cleanup_call(struct rxrpc_call *call)
{
	memset(&call->sock_node, 0xcd, sizeof(call->sock_node));

	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
	ASSERTCMP(__rxrpc_call_state(call), ==, RXRPC_CALL_COMPLETE);
	ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));

	del_timer(&call->timer);
@@ -715,7 +713,7 @@ void rxrpc_destroy_all_calls(struct rxrpc_net *rxnet)

			pr_err("Call %p still in use (%d,%s,%lx,%lx)!\n",
			       call, refcount_read(&call->ref),
			       rxrpc_call_states[call->state],
			       rxrpc_call_states[__rxrpc_call_state(call)],
			       call->flags, call->events);

			spin_unlock(&rxnet->call_lock);
+33 −54
Original line number Diff line number Diff line
@@ -10,81 +10,60 @@
/*
 * Transition a call to the complete state.
 */
bool __rxrpc_set_call_completion(struct rxrpc_call *call,
bool rxrpc_set_call_completion(struct rxrpc_call *call,
				 enum rxrpc_call_completion compl,
				 u32 abort_code,
				 int error)
{
	if (call->state < RXRPC_CALL_COMPLETE) {
	if (__rxrpc_call_state(call) == RXRPC_CALL_COMPLETE)
		return false;

	call->abort_code = abort_code;
	call->error = error;
	call->completion = compl;
	/* Allow reader of completion state to operate locklessly */
		smp_store_release(&call->state, RXRPC_CALL_COMPLETE);
	rxrpc_set_call_state(call, RXRPC_CALL_COMPLETE);
	trace_rxrpc_call_complete(call);
	wake_up(&call->waitq);
	rxrpc_notify_socket(call);
	return true;
}
	return false;
}

bool rxrpc_set_call_completion(struct rxrpc_call *call,
			       enum rxrpc_call_completion compl,
			       u32 abort_code,
			       int error)
{
	bool ret = false;

	if (call->state < RXRPC_CALL_COMPLETE) {
		write_lock(&call->state_lock);
		ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
		write_unlock(&call->state_lock);
	}
	return ret;
}

/*
 * Record that a call successfully completed.
 */
bool __rxrpc_call_completed(struct rxrpc_call *call)
{
	return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
}

bool rxrpc_call_completed(struct rxrpc_call *call)
{
	bool ret = false;

	if (call->state < RXRPC_CALL_COMPLETE) {
		write_lock(&call->state_lock);
		ret = __rxrpc_call_completed(call);
		write_unlock(&call->state_lock);
	}
	return ret;
	return rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
}

/*
 * Record that a call is locally aborted.
 */
bool __rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
bool rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
		      u32 abort_code, int error, enum rxrpc_abort_reason why)
{
	trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq,
			  abort_code, error);
	return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED,
					   abort_code, error);
	if (!rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED,
				       abort_code, error))
		return false;
	if (test_bit(RXRPC_CALL_EXPOSED, &call->flags))
		rxrpc_send_abort_packet(call);
	return true;
}

bool rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
		      u32 abort_code, int error, enum rxrpc_abort_reason why)
/*
 * Record that a call errored out before even getting off the ground, thereby
 * setting the state to allow it to be destroyed.
 */
void rxrpc_prefail_call(struct rxrpc_call *call, enum rxrpc_call_completion compl,
			int error)
{
	bool ret;

	write_lock(&call->state_lock);
	ret = __rxrpc_abort_call(call, seq, abort_code, error, why);
	write_unlock(&call->state_lock);
	if (ret && test_bit(RXRPC_CALL_EXPOSED, &call->flags))
		rxrpc_send_abort_packet(call);
	return ret;
	call->abort_code	= RX_CALL_DEAD;
	call->error		= error;
	call->completion	= compl;
	call->_state		= RXRPC_CALL_COMPLETE;
	trace_rxrpc_call_complete(call);
	__set_bit(RXRPC_CALL_RELEASED, &call->flags);
}
Loading