Commit b48c312b authored by Pavel Begunkov, committed by Jens Axboe

io_uring/net: simplify zerocopy send user API



Following user feedback, this patch simplifies the zerocopy send API. One
of the main complaints is that the current API is hard to use: userspace
has to manage notification slots, and send retries with error handling
make it even worse.

Instead of keeping notification slots, change it to a per-request
notification model: for each request, a completion CQE and a notification
CQE are posted when any data has been sent, and only a single CQE if it
fails. All notification CQEs have IORING_CQE_F_NOTIF set, and
IORING_CQE_F_MORE in the completion CQE indicates whether to wait for a
notification or not.
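
For illustration only (not part of the patch): a minimal liburing-style
sketch of how userspace might reap the two CQEs for a single zerocopy
send under this model. It assumes an already-submitted IORING_OP_SEND_ZC
request on an initialized ring; the reap_sendzc() name and the error
handling are hypothetical.

  #include <liburing.h>
  #include <stdbool.h>

  /* Wait for the send completion CQE and, if flagged, the notification CQE. */
  static int reap_sendzc(struct io_uring *ring)
  {
  	struct io_uring_cqe *cqe;
  	bool wait_notif;
  	int ret;

  	/* First CQE: the send completion; cqe->res is bytes sent or -errno. */
  	ret = io_uring_wait_cqe(ring, &cqe);
  	if (ret)
  		return ret;
  	wait_notif = cqe->flags & IORING_CQE_F_MORE;
  	io_uring_cqe_seen(ring, cqe);

  	if (wait_notif) {
  		/*
  		 * Second CQE: the notification, flagged IORING_CQE_F_NOTIF and
  		 * carrying the same user_data as the send. Only after it
  		 * arrives may the send buffer be reused.
  		 */
  		ret = io_uring_wait_cqe(ring, &cqe);
  		if (ret)
  			return ret;
  		io_uring_cqe_seen(ring, cqe);
  	}
  	return 0;
  }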

IOSQE_CQE_SKIP_SUCCESS is disallowed with zerocopy sends for now.

This is less flexible, but it greatly simplifies the user API as well as
the kernel implementation. We reuse the notif helpers in this patch, but
in the future there won't be a need to keep two requests.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/95287640ab98fc9417370afb16e310677c63e6ce.1662027856.git.asml.silence@gmail.com


Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 57f33224
include/uapi/linux/io_uring.h +5 −2
@@ -71,8 +71,8 @@ struct io_uring_sqe {
 		__s32	splice_fd_in;
 		__u32	file_index;
 		struct {
-			__u16	notification_idx;
 			__u16	addr_len;
+			__u16	__pad3[1];
 		};
 	};
 	union {
@@ -205,7 +205,7 @@ enum io_uring_op {
 	IORING_OP_GETXATTR,
 	IORING_OP_SOCKET,
 	IORING_OP_URING_CMD,
-	IORING_OP_SENDZC_NOTIF,
+	IORING_OP_SEND_ZC,
 
 	/* this goes last, obviously */
 	IORING_OP_LAST,
@@ -326,10 +326,13 @@ struct io_uring_cqe {
  * IORING_CQE_F_BUFFER	If set, the upper 16 bits are the buffer ID
  * IORING_CQE_F_MORE	If set, parent SQE will generate more CQE entries
  * IORING_CQE_F_SOCK_NONEMPTY	If set, more data to read after socket recv
+ * IORING_CQE_F_NOTIF	Set for notification CQEs. Can be used to distinct
+ * 			them from sends.
  */
 #define IORING_CQE_F_BUFFER		(1U << 0)
 #define IORING_CQE_F_MORE		(1U << 1)
 #define IORING_CQE_F_SOCK_NONEMPTY	(1U << 2)
+#define IORING_CQE_F_NOTIF		(1U << 3)
 
 enum {
 	IORING_CQE_BUFFER_SHIFT		= 16,
io_uring/io_uring.c +2 −2
@@ -3923,8 +3923,8 @@ static int __init io_uring_init(void)
 	BUILD_BUG_SQE_ELEM(42, __u16,  personality);
 	BUILD_BUG_SQE_ELEM(44, __s32,  splice_fd_in);
 	BUILD_BUG_SQE_ELEM(44, __u32,  file_index);
-	BUILD_BUG_SQE_ELEM(44, __u16,  notification_idx);
-	BUILD_BUG_SQE_ELEM(46, __u16,  addr_len);
+	BUILD_BUG_SQE_ELEM(44, __u16,  addr_len);
+	BUILD_BUG_SQE_ELEM(46, __u16,  __pad3[0]);
 	BUILD_BUG_SQE_ELEM(48, __u64,  addr3);
 	BUILD_BUG_SQE_ELEM_SIZE(48, 0, cmd);
 	BUILD_BUG_SQE_ELEM(56, __u64,  __pad2);
io_uring/net.c +33 −20
@@ -65,12 +65,12 @@ struct io_sendzc {
 	struct file			*file;
 	void __user			*buf;
 	size_t				len;
-	u16				slot_idx;
 	unsigned			msg_flags;
 	unsigned			flags;
 	unsigned			addr_len;
 	void __user			*addr;
 	size_t				done_io;
+	struct io_kiocb 		*notif;
 };
 
 #define IO_APOLL_MULTI_POLLED (REQ_F_APOLL_MULTISHOT | REQ_F_POLLED)
@@ -879,12 +879,26 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 	return ret;
 }
 
+void io_sendzc_cleanup(struct io_kiocb *req)
+{
+	struct io_sendzc *zc = io_kiocb_to_cmd(req, struct io_sendzc);
+
+	zc->notif->flags |= REQ_F_CQE_SKIP;
+	io_notif_flush(zc->notif);
+	zc->notif = NULL;
+}
+
 int io_sendzc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 {
 	struct io_sendzc *zc = io_kiocb_to_cmd(req, struct io_sendzc);
 	struct io_ring_ctx *ctx = req->ctx;
+	struct io_kiocb *notif;
 
-	if (READ_ONCE(sqe->__pad2[0]) || READ_ONCE(sqe->addr3))
+	if (READ_ONCE(sqe->__pad2[0]) || READ_ONCE(sqe->addr3) ||
+	    READ_ONCE(sqe->__pad3[0]))
 		return -EINVAL;
+	/* we don't support IOSQE_CQE_SKIP_SUCCESS just yet */
+	if (req->flags & REQ_F_CQE_SKIP)
+		return -EINVAL;
 
 	zc->flags = READ_ONCE(sqe->ioprio);
@@ -900,11 +914,17 @@ int io_sendzc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 		req->imu = READ_ONCE(ctx->user_bufs[idx]);
 		io_req_set_rsrc_node(req, ctx, 0);
 	}
+	notif = zc->notif = io_alloc_notif(ctx);
+	if (!notif)
+		return -ENOMEM;
+	notif->cqe.user_data = req->cqe.user_data;
+	notif->cqe.res = 0;
+	notif->cqe.flags = IORING_CQE_F_NOTIF;
+	req->flags |= REQ_F_NEED_CLEANUP;
 
 	zc->buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
 	zc->len = READ_ONCE(sqe->len);
 	zc->msg_flags = READ_ONCE(sqe->msg_flags) | MSG_NOSIGNAL;
-	zc->slot_idx = READ_ONCE(sqe->notification_idx);
 	if (zc->msg_flags & MSG_DONTWAIT)
 		req->flags |= REQ_F_NOWAIT;
 
@@ -976,33 +996,20 @@ static int io_sg_from_iter(struct sock *sk, struct sk_buff *skb,
 int io_sendzc(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct sockaddr_storage __address, *addr = NULL;
-	struct io_ring_ctx *ctx = req->ctx;
 	struct io_sendzc *zc = io_kiocb_to_cmd(req, struct io_sendzc);
-	struct io_notif_slot *notif_slot;
-	struct io_kiocb *notif;
 	struct msghdr msg;
 	struct iovec iov;
 	struct socket *sock;
-	unsigned msg_flags;
+	unsigned msg_flags, cflags;
 	int ret, min_ret = 0;
 
 	if (!(req->flags & REQ_F_POLLED) &&
 	    (zc->flags & IORING_RECVSEND_POLL_FIRST))
 		return -EAGAIN;
-
-	if (issue_flags & IO_URING_F_UNLOCKED)
-		return -EAGAIN;
 	sock = sock_from_file(req->file);
 	if (unlikely(!sock))
 		return -ENOTSOCK;
 
-	notif_slot = io_get_notif_slot(ctx, zc->slot_idx);
-	if (!notif_slot)
-		return -EINVAL;
-	notif = io_get_notif(ctx, notif_slot);
-	if (!notif)
-		return -ENOMEM;
-
 	msg.msg_name = NULL;
 	msg.msg_control = NULL;
 	msg.msg_controllen = 0;
@@ -1033,7 +1040,7 @@ int io_sendzc(struct io_kiocb *req, unsigned int issue_flags)
 					  &msg.msg_iter);
 		if (unlikely(ret))
 			return ret;
-		ret = io_notif_account_mem(notif, zc->len);
+		ret = io_notif_account_mem(zc->notif, zc->len);
 		if (unlikely(ret))
 			return ret;
 	}
@@ -1045,7 +1052,7 @@ int io_sendzc(struct io_kiocb *req, unsigned int issue_flags)
 		min_ret = iov_iter_count(&msg.msg_iter);
 
 	msg.msg_flags = msg_flags;
-	msg.msg_ubuf = &io_notif_to_data(notif)->uarg;
+	msg.msg_ubuf = &io_notif_to_data(zc->notif)->uarg;
 	msg.sg_from_iter = io_sg_from_iter;
 	ret = sock_sendmsg(sock, &msg);
 
@@ -1060,6 +1067,8 @@ int io_sendzc(struct io_kiocb *req, unsigned int issue_flags)
 			req->flags |= REQ_F_PARTIAL_IO;
 			return io_setup_async_addr(req, addr, issue_flags);
 		}
+		if (ret < 0 && !zc->done_io)
+			zc->notif->flags |= REQ_F_CQE_SKIP;
 		if (ret == -ERESTARTSYS)
 			ret = -EINTR;
 		req_set_fail(req);
@@ -1069,7 +1078,11 @@ int io_sendzc(struct io_kiocb *req, unsigned int issue_flags)
 		ret += zc->done_io;
 	else if (zc->done_io)
 		ret = zc->done_io;
-	io_req_set_res(req, ret, 0);
+
+	io_notif_flush(zc->notif);
+	req->flags &= ~REQ_F_NEED_CLEANUP;
+	cflags = ret >= 0 ? IORING_CQE_F_MORE : 0;
+	io_req_set_res(req, ret, cflags);
 	return IOU_OK;
 }
 
io_uring/net.h +1 −0
@@ -55,6 +55,7 @@ int io_connect(struct io_kiocb *req, unsigned int issue_flags);

 int io_sendzc(struct io_kiocb *req, unsigned int issue_flags);
 int io_sendzc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+void io_sendzc_cleanup(struct io_kiocb *req);
 
 void io_netmsg_cache_free(struct io_cache_entry *entry);
 #else
io_uring/notif.c +2 −10
@@ -42,8 +42,7 @@ static void io_uring_tx_zerocopy_callback(struct sk_buff *skb,
 	}
 }
 
-struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx,
-				struct io_notif_slot *slot)
+struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx)
 	__must_hold(&ctx->uring_lock)
 {
 	struct io_kiocb *notif;
@@ -59,27 +58,20 @@ struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx,
 	io_get_task_refs(1);
 	notif->rsrc_node = NULL;
 	io_req_set_rsrc_node(notif, ctx, 0);
-	notif->cqe.user_data = slot->tag;
-	notif->cqe.flags = slot->seq++;
-	notif->cqe.res = 0;
 
 	nd = io_notif_to_data(notif);
 	nd->account_pages = 0;
 	nd->uarg.flags = SKBFL_ZEROCOPY_FRAG | SKBFL_DONT_ORPHAN;
 	nd->uarg.callback = io_uring_tx_zerocopy_callback;
-	/* master ref owned by io_notif_slot, will be dropped on flush */
 	refcount_set(&nd->uarg.refcnt, 1);
 	return notif;
 }
 
-void io_notif_slot_flush(struct io_notif_slot *slot)
+void io_notif_flush(struct io_kiocb *notif)
 	__must_hold(&slot->notif->ctx->uring_lock)
 {
-	struct io_kiocb *notif = slot->notif;
 	struct io_notif_data *nd = io_notif_to_data(notif);
 
-	slot->notif = NULL;
-
 	/* drop slot's master ref */
 	if (refcount_dec_and_test(&nd->uarg.refcnt)) {
 		notif->io_task_work.func = __io_notif_complete_tw;