Unverified commit fd46506a authored by openeuler-ci-bot, committed by Gitee

!847 [OLK-5.10] net/smc: support cork option

Merge Pull Request from: @giree2 
 
Backport the cork patchset from upstream (a usage sketch follows the list):
1. net/smc: Send directly when TCP_CORK is cleared
2. net/smc: Remove corked delayed work
3. net/smc: Cork when sendpage with MSG_SENDPAGE_NOTLAST flag
4. net/smc: Add comment for smc_tx_pending
5. net/smc: Call trace_smc_tx_sendmsg when data corked
6. net/smc: add autocorking support
7. net/smc: send directly on setting TCP_NODELAY
8. net/smc: don't send in the BH context if sock_owned_by_user
9. net/smc: Send out the remaining data in sndbuf before close 
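
For readers unfamiliar with the option this series wires up: TCP_CORK and TCP_NODELAY are ordinary SOL_TCP socket options that smc_setsockopt() intercepts, so existing TCP applications exercise the new paths unchanged. A minimal userspace sketch (editor's illustration, not part of the series; AF_SMC/SMCPROTO_SMC values follow the upstream uapi, error handling omitted):

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

#ifndef AF_SMC
#define AF_SMC		43	/* from linux/socket.h */
#endif
#define SMCPROTO_SMC	0	/* SMC over IPv4 */

static void send_corked(int fd, const void *hdr, size_t hlen,
			const void *body, size_t blen)
{
	int on = 1, off = 0;

	/* while corked, both pieces are held back in the sndbuf */
	setsockopt(fd, SOL_TCP, TCP_CORK, &on, sizeof(on));
	send(fd, hdr, hlen, 0);
	send(fd, body, blen, 0);
	/* with this series, clearing TCP_CORK flushes synchronously via
	 * smc_tx_pending() instead of scheduling 250 ms delayed work
	 */
	setsockopt(fd, SOL_TCP, TCP_CORK, &off, sizeof(off));
}

Here fd would come from socket(AF_SMC, SOCK_STREAM, SMCPROTO_SMC); the same sequence on a plain TCP socket behaves analogously.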
 
Link: https://gitee.com/openeuler/kernel/pulls/847

 

Reviewed-by: Jialin Zhang <zhangjialin11@huawei.com>
Signed-off-by: Jialin Zhang <zhangjialin11@huawei.com>
parents bc8d0e2f bd4713d5
net/smc/af_smc.c (+29 −8)
@@ -104,12 +104,27 @@ void smc_unhash_sk(struct sock *sk)
}
EXPORT_SYMBOL_GPL(smc_unhash_sk);

+/* This will be called before user really release sock_lock. So do the
+ * work which we didn't do because of user hold the sock_lock in the
+ * BH context
+ */
+static void smc_release_cb(struct sock *sk)
+{
+	struct smc_sock *smc = smc_sk(sk);
+
+	if (smc->conn.tx_in_release_sock) {
+		smc_tx_pending(&smc->conn);
+		smc->conn.tx_in_release_sock = false;
+	}
+}
+
struct proto smc_proto = {
	.name		= "SMC",
	.owner		= THIS_MODULE,
	.keepalive	= smc_set_keepalive,
	.hash		= smc_hash_sk,
	.unhash		= smc_unhash_sk,
+	.release_cb	= smc_release_cb,
	.obj_size	= sizeof(struct smc_sock),
	.h.smc_hash	= &smc_v4_hashinfo,
	.slab_flags	= SLAB_TYPESAFE_BY_RCU,
@@ -122,6 +137,7 @@ struct proto smc_proto6 = {
	.keepalive	= smc_set_keepalive,
	.hash		= smc_hash_sk,
	.unhash		= smc_unhash_sk,
+	.release_cb	= smc_release_cb,
	.obj_size	= sizeof(struct smc_sock),
	.h.smc_hash	= &smc_v6_hashinfo,
	.slab_flags	= SLAB_TYPESAFE_BY_RCU,
@@ -2228,18 +2244,20 @@ static int smc_setsockopt(struct socket *sock, int level, int optname,
		if (sk->sk_state != SMC_INIT &&
		    sk->sk_state != SMC_LISTEN &&
		    sk->sk_state != SMC_CLOSED) {
-			if (val)
-				mod_delayed_work(smc->conn.lgr->tx_wq,
-						 &smc->conn.tx_work, 0);
+			if (val) {
+				smc_tx_pending(&smc->conn);
+				cancel_delayed_work(&smc->conn.tx_work);
+			}
		}
		break;
	case TCP_CORK:
		if (sk->sk_state != SMC_INIT &&
		    sk->sk_state != SMC_LISTEN &&
		    sk->sk_state != SMC_CLOSED) {
-			if (!val)
-				mod_delayed_work(smc->conn.lgr->tx_wq,
-						 &smc->conn.tx_work, 0);
+			if (!val) {
+				smc_tx_pending(&smc->conn);
+				cancel_delayed_work(&smc->conn.tx_work);
+			}
		}
		break;
	case TCP_DEFER_ACCEPT:
@@ -2365,8 +2383,11 @@ static ssize_t smc_sendpage(struct socket *sock, struct page *page,
	if (smc->use_fallback)
		rc = kernel_sendpage(smc->clcsock, page, offset,
				     size, flags);
-	else
-		rc = sock_no_sendpage(sock, page, offset, size, flags);
+	else {
+		lock_sock(sk);
+		rc = smc_tx_sendpage(smc, page, offset, size, flags);
+		release_sock(sk);
+	}

out:
	return rc;
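
The smc_sendpage() change above matters for zero-copy senders: sendfile(2) on a non-fallback SMC socket previously bounced through sock_no_sendpage(), and now reaches smc_tx_sendpage() under the sock lock, with the kernel flagging every page but the final one MSG_SENDPAGE_NOTLAST so the corking logic in smc_tx.c can hold them. A hedged userspace sketch (editor's illustration; setup and error details elided):

#include <sys/sendfile.h>
#include <sys/stat.h>

static int send_file(int sock_fd, int file_fd)
{
	struct stat st;
	off_t off = 0;

	if (fstat(file_fd, &st) < 0)
		return -1;
	/* internally, each ->sendpage call except the last carries
	 * MSG_SENDPAGE_NOTLAST and may be corked by smc_tx_should_cork()
	 */
	while (off < st.st_size)
		if (sendfile(sock_fd, file_fd, &off, st.st_size - off) < 0)
			return -1;
	return 0;
}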
net/smc/smc.h (+6 −0)
@@ -28,6 +28,7 @@
#define SMC_MAX_ISM_DEVS	8	/* max # of proposed non-native ISM
					 * devices
					 */
+#define SMC_AUTOCORKING_DEFAULT_SIZE	0x10000	/* 64K by default */

#define SMC_MAX_HOSTNAME_LEN	32
#define SMC_MAX_EID_LEN		32
@@ -175,6 +176,7 @@ struct smc_connection {
						 * - dec on polled tx cqe
						 */
	wait_queue_head_t	cdc_pend_tx_wq; /* wakeup on no cdc_pend_tx_wr*/
+	atomic_t		tx_pushing;     /* nr_threads trying tx push */
	struct delayed_work	tx_work;	/* retry of smc_cdc_msg_send */
	u32			tx_off;		/* base offset in peer rmb */

@@ -194,6 +196,10 @@ struct smc_connection {
						 * data still pending
						 */
	char			urg_rx_byte;	/* urgent byte */
+	bool			tx_in_release_sock;
+						/* flush pending tx data in
+						 * sock release_cb()
+						 */
	atomic_t		bytes_to_rcv;	/* arrived data,
						 * not yet received
						 */
net/smc/smc_cdc.c (+19 −5)
@@ -48,9 +48,19 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
		conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
	}

-	if (atomic_dec_and_test(&conn->cdc_pend_tx_wr) &&
-	    unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
-		wake_up(&conn->cdc_pend_tx_wq);
+	if (atomic_dec_and_test(&conn->cdc_pend_tx_wr)) {
+		/* If user owns the sock_lock, mark the connection need sending.
+		 * User context will later try to send when it release sock_lock
+		 * in smc_release_cb()
+		 */
+		if (sock_owned_by_user(&smc->sk))
+			conn->tx_in_release_sock = true;
+		else
+			smc_tx_pending(conn);
+
+		if (unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
+			wake_up(&conn->cdc_pend_tx_wq);
+	}
	WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0);

	smc_tx_sndbuf_nonfull(smc);
@@ -352,8 +362,12 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc,
	/* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */
	if ((diff_cons && smc_tx_prepared_sends(conn)) ||
	    conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
-	    conn->local_rx_ctrl.prod_flags.urg_data_pending)
-		smc_tx_sndbuf_nonempty(conn);
+	    conn->local_rx_ctrl.prod_flags.urg_data_pending) {
+		if (!sock_owned_by_user(&smc->sk))
+			smc_tx_pending(conn);
+		else
+			conn->tx_in_release_sock = true;
+	}

	if (diff_cons && conn->urg_tx_pend &&
	    atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) {
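
The pattern in this file, setting tx_in_release_sock when sock_owned_by_user() and draining it from smc_release_cb(), is the usual "defer to the lock owner" idiom for contexts that must not sleep. A generic sketch of the same idea in plain C (editor's illustration with pthreads; names are hypothetical, and the unlocked flag write is a simplification the kernel's socket-lock machinery avoids):

#include <pthread.h>
#include <stdbool.h>

struct conn {
	pthread_mutex_t lock;
	bool tx_pending;	/* plays the role of tx_in_release_sock */
};

void do_tx_push(struct conn *c);	/* the actual transmit, assumed elsewhere */

/* completion context: must not block on c->lock */
void on_tx_complete(struct conn *c)
{
	if (pthread_mutex_trylock(&c->lock) == 0) {
		do_tx_push(c);		/* nobody holds the lock: push now */
		pthread_mutex_unlock(&c->lock);
	} else {
		c->tx_pending = true;	/* owner will push on unlock */
	}
}

/* lock owner: drain deferred work before releasing, like smc_release_cb() */
void conn_unlock(struct conn *c)
{
	if (c->tx_pending) {
		c->tx_pending = false;
		do_tx_push(c);
	}
	pthread_mutex_unlock(&c->lock);
}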
net/smc/smc_close.c (+3 −0)
@@ -57,6 +57,9 @@ static void smc_close_stream_wait(struct smc_sock *smc, long timeout)
	if (!smc_tx_prepared_sends(&smc->conn))
		return;

+	/* Send out corked data remaining in sndbuf */
+	smc_tx_pending(&smc->conn);
+
	smc->wait_close_tx_prepared = 1;
	add_wait_queue(sk_sleep(sk), &wait);
	while (!signal_pending(current) && timeout) {
net/smc/smc_tx.c (+128 −24)
@@ -29,7 +29,6 @@
#include "smc_tx.h"

#define SMC_TX_WORK_DELAY	0
-#define SMC_TX_CORK_DELAY	(HZ >> 2)	/* 250 ms */

/***************************** sndbuf producer *******************************/

@@ -128,6 +127,51 @@ static bool smc_tx_is_corked(struct smc_sock *smc)
	return (tp->nonagle & TCP_NAGLE_CORK) ? true : false;
}

+/* If we have pending CDC messages, do not send:
+ * Because CQE of this CDC message will happen shortly, it gives
+ * a chance to coalesce future sendmsg() payload in to one RDMA Write,
+ * without need for a timer, and with no latency trade off.
+ * Algorithm here:
+ *  1. First message should never cork
+ *  2. If we have pending Tx CDC messages, wait for the first CDC
+ *     message's completion
+ *  3. Don't cork to much data in a single RDMA Write to prevent burst
+ *     traffic, total corked message should not exceed sendbuf/2
+ */
+static bool smc_should_autocork(struct smc_sock *smc)
+{
+	struct smc_connection *conn = &smc->conn;
+	int corking_size;
+
+	corking_size = min(SMC_AUTOCORKING_DEFAULT_SIZE,
+			   conn->sndbuf_desc->len >> 1);
+
+	if (atomic_read(&conn->cdc_pend_tx_wr) == 0 ||
+	    smc_tx_prepared_sends(conn) > corking_size)
+		return false;
+	return true;
+}
+
+static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg)
+{
+	struct smc_connection *conn = &smc->conn;
+
+	if (smc_should_autocork(smc))
+		return true;
+
+	/* for a corked socket defer the RDMA writes if
+	 * sndbuf_space is still available. The applications
+	 * should known how/when to uncork it.
+	 */
+	if ((msg->msg_flags & MSG_MORE ||
+	     smc_tx_is_corked(smc) ||
+	     msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
+	    atomic_read(&conn->sndbuf_space))
+		return true;
+
+	return false;
+}
+
/* sndbuf producer: main API called by socket layer.
 * called under sock lock.
 */
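
To put numbers on smc_should_autocork(): corking_size = min(SMC_AUTOCORKING_DEFAULT_SIZE, sndbuf/2), so a 256 KB sndbuf caps corked data at 64 KB while a 32 KB sndbuf caps it at 16 KB; and because the check requires a pending CDC message, the very first send always goes out immediately. MSG_MORE is the per-call route into smc_tx_should_cork(); a hedged userspace sketch (editor's illustration):

#include <sys/socket.h>

static void send_two_parts(int fd, const void *hdr, size_t hlen,
			   const void *pay, size_t plen)
{
	/* held in the sndbuf while sndbuf_space remains ... */
	send(fd, hdr, hlen, MSG_MORE);
	/* ... then both parts can leave as a single RDMA write; if
	 * autocorking still holds them, the CDC tx completion pushes
	 */
	send(fd, pay, plen, 0);
}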
@@ -221,15 +265,10 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
		 */
		if ((msg->msg_flags & MSG_OOB) && !send_remaining)
			conn->urg_tx_pend = true;
-		if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc)) &&
-		    (atomic_read(&conn->sndbuf_space) >
-						(conn->sndbuf_desc->len >> 1)))
-			/* for a corked socket defer the RDMA writes if there
-			 * is still sufficient sndbuf_space available
-			 */
-			queue_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
-					   SMC_TX_CORK_DELAY);
-		else
+		/* If we need to cork, do nothing and wait for the next
+		 * sendmsg() call or push on tx completion
+		 */
+		if (!smc_tx_should_cork(smc, msg))
 			smc_tx_sndbuf_nonempty(conn);
	} /* while (msg_data_left(msg)) */

@@ -243,6 +282,22 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
	return rc;
}

+int smc_tx_sendpage(struct smc_sock *smc, struct page *page, int offset,
+		    size_t size, int flags)
+{
+	struct msghdr msg = {.msg_flags = flags};
+	char *kaddr = kmap(page);
+	struct kvec iov;
+	int rc;
+
+	iov.iov_base = kaddr + offset;
+	iov.iov_len = size;
+	iov_iter_kvec(&msg.msg_iter, WRITE, &iov, 1, size);
+	rc = smc_tx_sendmsg(smc, &msg, size);
+	kunmap(page);
+	return rc;
+}
+
/***************************** sndbuf consumer *******************************/

/* sndbuf consumer: actual data transfer of one target chunk with ISM write */
@@ -555,13 +610,24 @@ static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn)
	return rc;
}

-int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
+static int __smc_tx_sndbuf_nonempty(struct smc_connection *conn)
 {
-	int rc;
+	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
+	int rc = 0;
+
+	/* No data in the send queue */
+	if (unlikely(smc_tx_prepared_sends(conn) <= 0))
+		goto out;
+
+	/* Peer don't have RMBE space */
+	if (unlikely(atomic_read(&conn->peer_rmbe_space) <= 0))
+		goto out;
 
 	if (conn->killed ||
-	    conn->local_rx_ctrl.conn_state_flags.peer_conn_abort)
-		return -EPIPE;	/* connection being aborted */
+	    conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
+		rc = -EPIPE;	/* connection being aborted */
+		goto out;
+	}
	if (conn->lgr->is_smcd)
		rc = smcd_tx_sndbuf_nonempty(conn);
	else
@@ -569,34 +635,72 @@ int smc_tx_sndbuf_nonempty(struct smc_connection *conn)

	if (!rc) {
		/* trigger socket release if connection is closing */
-		struct smc_sock *smc = container_of(conn, struct smc_sock,
-						    conn);
		smc_close_wake_tx_prepared(smc);
	}

+out:
	return rc;
}

+int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
+{
+	int rc;
+
+	/* This make sure only one can send simultaneously to prevent wasting
+	 * of CPU and CDC slot.
+	 * Record whether someone has tried to push while we are pushing.
+	 */
+	if (atomic_inc_return(&conn->tx_pushing) > 1)
+		return 0;
+
+again:
+	atomic_set(&conn->tx_pushing, 1);
+	smp_wmb(); /* Make sure tx_pushing is 1 before real send */
+	rc = __smc_tx_sndbuf_nonempty(conn);
+
+	/* We need to check whether someone else have added some data into
+	 * the send queue and tried to push but failed after the atomic_set()
+	 * when we are pushing.
+	 * If so, we need to push again to prevent those data hang in the send
+	 * queue.
+	 */
+	if (unlikely(!atomic_dec_and_test(&conn->tx_pushing)))
+		goto again;
+
+	return rc;
+}

/* Wakeup sndbuf consumers from process context
- * since there is more data to transmit
+ * since there is more data to transmit. The caller
+ * must hold sock lock.
 */
-void smc_tx_work(struct work_struct *work)
+void smc_tx_pending(struct smc_connection *conn)
{
-	struct smc_connection *conn = container_of(to_delayed_work(work),
-						   struct smc_connection,
-						   tx_work);
	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
	int rc;

-	lock_sock(&smc->sk);
 	if (smc->sk.sk_err)
-		goto out;
+		return;

	rc = smc_tx_sndbuf_nonempty(conn);
	if (!rc && conn->local_rx_ctrl.prod_flags.write_blocked &&
	    !atomic_read(&conn->bytes_to_rcv))
		conn->local_rx_ctrl.prod_flags.write_blocked = 0;
+}

-out:
+/* Wakeup sndbuf consumers from process context
+ * since there is more data to transmit in locked
+ * sock.
+ */
+void smc_tx_work(struct work_struct *work)
+{
+	struct smc_connection *conn = container_of(to_delayed_work(work),
+						   struct smc_connection,
+						   tx_work);
+	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
+
+	lock_sock(&smc->sk);
+	smc_tx_pending(conn);
	release_sock(&smc->sk);
}
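
The tx_pushing counter above is a lock-free "single pusher" scheme: the first caller does the send, later callers merely bump the counter, and the pusher re-runs until the counter drains so no concurrently queued data is stranded. The same idea in portable C11 atomics (editor's sketch; names are illustrative, not kernel APIs):

#include <stdatomic.h>

struct conn { atomic_int tx_pushing; /* nr of threads trying to push */ };

int do_push(struct conn *c);	/* the real transmit, assumed elsewhere */

int push_or_delegate(struct conn *c)
{
	int rc = 0;

	/* lost the race: the active pusher will observe our increment */
	if (atomic_fetch_add(&c->tx_pushing, 1) > 0)
		return 0;

	do {
		atomic_store(&c->tx_pushing, 1);	/* collapse queued requests */
		rc = do_push(c);
	} while (atomic_fetch_sub(&c->tx_pushing, 1) != 1);	/* raced: push again */

	return rc;
}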
