Commit eb42cebb authored by Pavel Begunkov's avatar Pavel Begunkov Committed by Jens Axboe
Browse files

io_uring: add zc notification infrastructure



Add internal part of send zerocopy notifications. There are two main
structures, the first one is struct io_notif, which carries inside
struct ubuf_info and maps 1:1 to it. io_uring will be binding a number
of zerocopy send requests to it and ask to complete (aka flush) it. When
flushed and all attached requests and skbs complete, it'll generate one
and only one CQE. There are intended to be passed into the network layer
as struct msghdr::msg_ubuf.

The second concept is notification slots. The userspace will be able to
register an array of slots and subsequently addressing them by the index
in the array. Slots are independent of each other. Each slot can have
only one notifier at a time (called active notifier) but many notifiers
during the lifetime. When active, a notifier not going to post any
completion but the userspace can attach requests to it by specifying
the corresponding slot while issueing send zc requests. Eventually, the
userspace will want to "flush" the notifier losing any way to attach
new requests to it, however it can use the next atomatically added
notifier of this slot or of any other slot.

When the network layer is done with all enqueued skbs attached to a
notifier and doesn't need the specified in them user data, the flushed
notifier will post a CQE.

Signed-off-by: default avatarPavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/3ecf54c31a85762bf679b0a432c9f43ecf7e61cc.1657643355.git.asml.silence@gmail.com


Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent e70cb608
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -34,6 +34,9 @@ struct io_file_table {
	unsigned int alloc_hint;
};

struct io_notif;
struct io_notif_slot;

struct io_hash_bucket {
	spinlock_t		lock;
	struct hlist_head	list;
@@ -237,6 +240,8 @@ struct io_ring_ctx {
		unsigned		nr_user_files;
		unsigned		nr_user_bufs;
		struct io_mapped_ubuf	**user_bufs;
		struct io_notif_slot	*notif_slots;
		unsigned		nr_notif_slots;

		struct io_submit_state	submit_state;

+1 −1
Original line number Diff line number Diff line
@@ -7,5 +7,5 @@ obj-$(CONFIG_IO_URING) += io_uring.o xattr.o nop.o fs.o splice.o \
					openclose.o uring_cmd.o epoll.o \
					statx.o net.o msg_ring.o timeout.o \
					sqpoll.o fdinfo.o tctx.o poll.o \
					cancel.o kbuf.o rsrc.o rw.o opdef.o
					cancel.o kbuf.o rsrc.o rw.o opdef.o notif.o
obj-$(CONFIG_IO_WQ)		+= io-wq.o
+5 −3
Original line number Diff line number Diff line
@@ -90,6 +90,7 @@
#include "rsrc.h"
#include "cancel.h"
#include "net.h"
#include "notif.h"

#include "timeout.h"
#include "poll.h"
@@ -732,8 +733,7 @@ struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx)
	return &rings->cqes[off];
}

static bool io_fill_cqe_aux(struct io_ring_ctx *ctx,
			    u64 user_data, s32 res, u32 cflags,
bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
		     bool allow_overflow)
{
	struct io_uring_cqe *cqe;
@@ -2491,6 +2491,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
	}
#endif
	WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));
	WARN_ON_ONCE(ctx->notif_slots || ctx->nr_notif_slots);

	io_mem_free(ctx->rings);
	io_mem_free(ctx->sq_sqes);
@@ -2667,6 +2668,7 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
		io_unregister_personality(ctx, index);
	if (ctx->rings)
		io_poll_remove_all(ctx, NULL, true);
	io_notif_unregister(ctx);
	mutex_unlock(&ctx->uring_lock);

	/* failed during ring init, it couldn't have issued any requests */
+2 −0
Original line number Diff line number Diff line
@@ -33,6 +33,8 @@ void io_req_complete_post(struct io_kiocb *req);
void __io_req_complete_post(struct io_kiocb *req);
bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
		     bool allow_overflow);
bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
		     bool allow_overflow);
void __io_commit_cqring_flush(struct io_ring_ctx *ctx);

struct page **io_pin_pages(unsigned long ubuf, unsigned long len, int *npages);

io_uring/notif.c

0 → 100644
+102 −0
Original line number Diff line number Diff line
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/slab.h>
#include <linux/net.h>
#include <linux/io_uring.h>

#include "io_uring.h"
#include "notif.h"

static void __io_notif_complete_tw(struct callback_head *cb)
{
	struct io_notif *notif = container_of(cb, struct io_notif, task_work);
	struct io_ring_ctx *ctx = notif->ctx;

	io_cq_lock(ctx);
	io_fill_cqe_aux(ctx, notif->tag, 0, notif->seq, true);
	io_cq_unlock_post(ctx);

	percpu_ref_put(&ctx->refs);
	kfree(notif);
}

static inline void io_notif_complete(struct io_notif *notif)
{
	__io_notif_complete_tw(&notif->task_work);
}

static void io_notif_complete_wq(struct work_struct *work)
{
	struct io_notif *notif = container_of(work, struct io_notif, commit_work);

	io_notif_complete(notif);
}

static void io_uring_tx_zerocopy_callback(struct sk_buff *skb,
					  struct ubuf_info *uarg,
					  bool success)
{
	struct io_notif *notif = container_of(uarg, struct io_notif, uarg);

	if (!refcount_dec_and_test(&uarg->refcnt))
		return;
	INIT_WORK(&notif->commit_work, io_notif_complete_wq);
	queue_work(system_unbound_wq, &notif->commit_work);
}

struct io_notif *io_alloc_notif(struct io_ring_ctx *ctx,
				struct io_notif_slot *slot)
	__must_hold(&ctx->uring_lock)
{
	struct io_notif *notif;

	notif = kzalloc(sizeof(*notif), GFP_ATOMIC | __GFP_ACCOUNT);
	if (!notif)
		return NULL;

	notif->seq = slot->seq++;
	notif->tag = slot->tag;
	notif->ctx = ctx;
	notif->uarg.flags = SKBFL_ZEROCOPY_FRAG | SKBFL_DONT_ORPHAN;
	notif->uarg.callback = io_uring_tx_zerocopy_callback;
	/* master ref owned by io_notif_slot, will be dropped on flush */
	refcount_set(&notif->uarg.refcnt, 1);
	percpu_ref_get(&ctx->refs);
	return notif;
}

static void io_notif_slot_flush(struct io_notif_slot *slot)
	__must_hold(&ctx->uring_lock)
{
	struct io_notif *notif = slot->notif;

	slot->notif = NULL;

	if (WARN_ON_ONCE(in_interrupt()))
		return;
	/* drop slot's master ref */
	if (refcount_dec_and_test(&notif->uarg.refcnt))
		io_notif_complete(notif);
}

__cold int io_notif_unregister(struct io_ring_ctx *ctx)
	__must_hold(&ctx->uring_lock)
{
	int i;

	if (!ctx->notif_slots)
		return -ENXIO;

	for (i = 0; i < ctx->nr_notif_slots; i++) {
		struct io_notif_slot *slot = &ctx->notif_slots[i];

		if (slot->notif)
			io_notif_slot_flush(slot);
	}

	kvfree(ctx->notif_slots);
	ctx->notif_slots = NULL;
	ctx->nr_notif_slots = 0;
	return 0;
}
 No newline at end of file
Loading