Commit 2eb2b935 authored by Chuck Lever
Browse files

SUNRPC: Convert svc_tcp_sendmsg to use bio_vecs directly



Add a helper to convert a whole xdr_buf directly into an array of
bio_vecs, then send this array instead of iterating piecemeal over
the xdr_buf containing the outbound RPC message.

Reviewed-by: David Howells <dhowells@redhat.com>
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
parent d4247970
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -139,6 +139,8 @@ void xdr_terminate_string(const struct xdr_buf *, const u32);
size_t	xdr_buf_pagecount(const struct xdr_buf *buf);
int	xdr_alloc_bvec(struct xdr_buf *buf, gfp_t gfp);
void	xdr_free_bvec(struct xdr_buf *buf);
unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
			     const struct xdr_buf *xdr);

static inline __be32 *xdr_encode_array(__be32 *p, const void *s, unsigned int len)
{
+20 −44
Original line number Diff line number Diff line
@@ -36,6 +36,8 @@
#include <linux/skbuff.h>
#include <linux/file.h>
#include <linux/freezer.h>
#include <linux/bvec.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/ip.h>
@@ -1194,77 +1196,52 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
	return 0;	/* record not complete */
}

/*
 * Transmit one kvec on @sock. MSG_SPLICE_PAGES asks the network
 * layer to splice the backing pages rather than copy them; @flags
 * is OR'd into the message flags unchanged.
 *
 * Returns the number of bytes sent, or a negative errno.
 */
static int svc_tcp_send_kvec(struct socket *sock, const struct kvec *vec,
			      int flags)
{
	struct msghdr msg = { 0 };

	msg.msg_flags = MSG_SPLICE_PAGES | flags;
	iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, vec, 1, vec->iov_len);
	return sock_sendmsg(sock, &msg);
}

/*
 * MSG_SPLICE_PAGES is used exclusively to reduce the number of
 * copy operations in this path. Therefore the caller must ensure
 * that the pages backing @rqstp->rq_res are unchanging.
 *
 * Note that the send is non-blocking. The caller has incremented
 * the reference count on each page backing the RPC message, and
 * the network layer will "put" these pages when transmission is
 * complete.
 *
 * This is safe for our RPC services because the memory backing
 * the head and tail components is never kmalloc'd. These always
 * come from pages in the svc_rqst::rq_pages array.
 */
static int svc_tcp_sendmsg(struct svc_sock *svsk, struct svc_rqst *rqstp,
			   rpc_fraghdr marker, unsigned int *sentp)
{
	/* The TCP record marker precedes the RPC message proper. */
	struct kvec rm = {
		.iov_base	= &marker,
		.iov_len	= sizeof(marker),
	};
	/* MSG_MORE: the bvec payload follows immediately. */
	struct msghdr msg = {
		.msg_flags	= MSG_MORE,
	};
	unsigned int count;
	int ret;

	*sentp = 0;

	ret = kernel_sendmsg(svsk->sk_sock, &msg, &rm, 1, rm.iov_len);
	if (ret < 0)
		return ret;
	*sentp += ret;
	/* A short send of the 4-byte marker cannot be resumed here. */
	if (ret != rm.iov_len)
		return -EAGAIN;

	/* Convert the whole response xdr_buf into rq_bvec in one pass,
	 * then transmit it with a single sendmsg call. */
	count = xdr_buf_to_bvec(rqstp->rq_bvec, ARRAY_SIZE(rqstp->rq_bvec),
				&rqstp->rq_res);

	msg.msg_flags = MSG_SPLICE_PAGES;
	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, rqstp->rq_bvec,
		      count, rqstp->rq_res.len);
	ret = sock_sendmsg(svsk->sk_sock, &msg);
	if (ret < 0)
		return ret;
	*sentp += ret;
	return 0;
}

@@ -1295,8 +1272,7 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp)
	if (svc_xprt_is_dead(xprt))
		goto out_notconn;
	tcp_sock_set_cork(svsk->sk_sk, true);
	err = svc_tcp_sendmsg(svsk->sk_sock, xdr, marker, &sent);
	xdr_free_bvec(xdr);
	err = svc_tcp_sendmsg(svsk, rqstp, marker, &sent);
	trace_svcsock_tcp_send(xprt, err < 0 ? (long)err : sent);
	if (err < 0 || sent != (xdr->len + sizeof(marker)))
		goto out_close;
+50 −0
Original line number Diff line number Diff line
@@ -164,6 +164,56 @@ xdr_free_bvec(struct xdr_buf *buf)
	buf->bvec = NULL;
}

/**
 * xdr_buf_to_bvec - Copy components of an xdr_buf into a bio_vec array
 * @bvec: bio_vec array to populate
 * @bvec_size: element count of @bvec
 * @xdr: xdr_buf to be copied
 *
 * Returns the number of entries consumed in @bvec. On overflow a
 * warning is logged once and only the entries that fit are counted.
 */
unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
			     const struct xdr_buf *xdr)
{
	const struct kvec *head = xdr->head;
	const struct kvec *tail = xdr->tail;
	unsigned int count = 0;

	/* Capacity is now checked *before* each element is written.
	 * The previous logic populated the slot first and tested
	 * "++count > bvec_size" afterwards, so one bio_vec past the
	 * end of @bvec was written before the overflow was detected,
	 * and the head element was never checked at all. */
	if (head->iov_len) {
		if (unlikely(count >= bvec_size))
			goto bvec_overflow;
		bvec_set_virt(bvec++, head->iov_base, head->iov_len);
		++count;
	}

	if (xdr->page_len) {
		unsigned int offset, len, remaining;
		struct page **pages = xdr->pages;

		/* The first page may start mid-page; subsequent pages
		 * always start at offset 0. */
		offset = offset_in_page(xdr->page_base);
		remaining = xdr->page_len;
		while (remaining > 0) {
			if (unlikely(count >= bvec_size))
				goto bvec_overflow;
			len = min_t(unsigned int, remaining,
				    PAGE_SIZE - offset);
			bvec_set_page(bvec++, *pages++, len, offset);
			remaining -= len;
			offset = 0;
			++count;
		}
	}

	if (tail->iov_len) {
		if (unlikely(count >= bvec_size))
			goto bvec_overflow;
		bvec_set_virt(bvec, tail->iov_base, tail->iov_len);
		++count;
	}

	return count;

bvec_overflow:
	pr_warn_once("%s: bio_vec array overflow\n", __func__);
	return count;
}

/**
 * xdr_inline_pages - Prepare receive buffer for a large reply
 * @xdr: xdr_buf into which reply will be placed