Commit 09899b19 authored by Pavel Begunkov, committed by Jens Axboe

io_uring: cache task struct refs

tctx in the submission path is always synchronised because it is executed
from the task's own context, so we can batch-allocate tctx/task references
and keep them across syscall boundaries. This saves a fair number of
per-submission operations, including an atomic for taking a task reference
and a percpu_counter_add() call, which still falls back to a spinlock for
larger batches (roughly >= 32). It should help SQPOLL submitting in small
portions and, at some point, BPF submissions.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/14b327b973410a3eec1f702ecf650e100513aca9.1623634181.git.asml.silence@gmail.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 2d091d62
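
To make the caching pattern concrete before reading the diff, here is a minimal userspace sketch of the same idea. It is not the kernel code: the atomic counter task_refs stands in for both the task reference count and the tctx->inflight percpu counter, cached stands in for tctx->cached_refs, and the names get_refs, put_ref and drop_cached_refs are illustrative assumptions.

	/* Userspace sketch of per-task reference caching; hypothetical names. */
	#include <stdatomic.h>
	#include <stdio.h>

	#define REFS_CACHE_NR	(1U << 10)	/* mirrors IO_TCTX_REFS_CACHE_NR */

	static atomic_uint task_refs;	/* shared counter: every touch is an atomic op */
	static int cached;		/* task-local cache: plain, unsynchronised arithmetic */

	/* charge 'nr' references for one submission, refilling the cache in one batch */
	static void get_refs(unsigned int nr)
	{
		cached -= (int)nr;
		if (cached < 0) {
			unsigned int refill = (unsigned int)-cached + REFS_CACHE_NR;

			atomic_fetch_add(&task_refs, refill);	/* one atomic per ~1024 charges */
			cached += (int)refill;
		}
	}

	/* completion side: each finished request drops its reference directly */
	static void put_ref(void)
	{
		atomic_fetch_sub(&task_refs, 1u);
	}

	/* teardown: hand back whatever is still cached, cf. io_uring_drop_tctx_refs() */
	static void drop_cached_refs(void)
	{
		atomic_fetch_sub(&task_refs, (unsigned int)cached);
		cached = 0;
	}

	int main(void)
	{
		get_refs(8);	/* first call refills: a single atomic add of 8 + 1024 */
		get_refs(8);	/* later small submissions only touch 'cached' */
		get_refs(8);
		for (int i = 0; i < 24; i++)
			put_ref();	/* the 24 consumed refs drop as requests complete */
		drop_cached_refs();	/* return the 1008 still-cached refs */
		printf("outstanding refs: %u\n", atomic_load(&task_refs));	/* prints 0 */
		return 0;
	}

The point of the pattern is visible in get_refs(): the common case is a single subtraction on a task-local integer, and the shared atomic is touched once per REFS_CACHE_NR-sized batch.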
+28 −9
@@ -110,6 +110,8 @@
 				IOSQE_IO_HARDLINK | IOSQE_ASYNC | \
 				IOSQE_BUFFER_SELECT)
 
+#define IO_TCTX_REFS_CACHE_NR	(1U << 10)
+
 struct io_uring {
 	u32 head ____cacheline_aligned_in_smp;
 	u32 tail ____cacheline_aligned_in_smp;
@@ -472,6 +474,7 @@ struct io_ring_ctx {
 
 struct io_uring_task {
 	/* submission side */
+	int			cached_refs;
 	struct xarray		xa;
 	struct wait_queue_head	wait;
 	const struct io_ring_ctx *last;
@@ -6707,16 +6710,23 @@ static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx)
 
 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
 {
+	struct io_uring_task *tctx;
 	int submitted = 0;
 
 	/* make sure SQ entry isn't read before tail */
 	nr = min3(nr, ctx->sq_entries, io_sqring_entries(ctx));
-
 	if (!percpu_ref_tryget_many(&ctx->refs, nr))
 		return -EAGAIN;
 
-	percpu_counter_add(&current->io_uring->inflight, nr);
-	refcount_add(nr, &current->usage);
+	tctx = current->io_uring;
+	tctx->cached_refs -= nr;
+	if (unlikely(tctx->cached_refs < 0)) {
+		unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;
+
+		percpu_counter_add(&tctx->inflight, refill);
+		refcount_add(refill, &current->usage);
+		tctx->cached_refs += refill;
+	}
 	io_submit_state_start(&ctx->submit_state, nr);
 
 	while (submitted < nr) {
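
For concreteness, the arithmetic of the refill path above: if a task enters io_submit_sqes() with tctx->cached_refs == 5 and nr == 40, the subtraction leaves cached_refs == -35, so refill == 35 + IO_TCTX_REFS_CACHE_NR == 1059. A single percpu_counter_add() and refcount_add() charge all 1059 references at once, cached_refs ends at 1024, and roughly the next 25 submissions of that size complete without touching either shared counter.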
@@ -6742,12 +6752,10 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
 
 	if (unlikely(submitted != nr)) {
 		int ref_used = (submitted == -EAGAIN) ? 0 : submitted;
-		struct io_uring_task *tctx = current->io_uring;
 		int unused = nr - ref_used;
 
+		current->io_uring->cached_refs += unused;
 		percpu_ref_put_many(&ctx->refs, unused);
-		percpu_counter_sub(&tctx->inflight, unused);
-		put_task_struct_many(current, unused);
 	}
 
 	io_submit_state_end(&ctx->submit_state, ctx);
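
Note the asymmetry this creates in the partial-submission path above: unused task references are no longer returned with percpu_counter_sub() and put_task_struct_many() but simply folded back into cached_refs with plain integer arithmetic; only the ctx references still need a real percpu_ref_put_many().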
@@ -7929,7 +7937,7 @@ static int io_uring_alloc_task_context(struct task_struct *task,
 	struct io_uring_task *tctx;
 	int ret;
 
-	tctx = kmalloc(sizeof(*tctx), GFP_KERNEL);
+	tctx = kzalloc(sizeof(*tctx), GFP_KERNEL);
 	if (unlikely(!tctx))
 		return -ENOMEM;
 
@@ -7949,13 +7957,11 @@ static int io_uring_alloc_task_context(struct task_struct *task,
 
 	xa_init(&tctx->xa);
 	init_waitqueue_head(&tctx->wait);
-	tctx->last = NULL;
 	atomic_set(&tctx->in_idle, 0);
 	atomic_set(&tctx->inflight_tracked, 0);
 	task->io_uring = tctx;
 	spin_lock_init(&tctx->task_lock);
 	INIT_WQ_LIST(&tctx->task_list);
-	tctx->task_state = 0;
 	init_task_work(&tctx->task_work, tctx_task_work);
 	return 0;
 }
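
The kmalloc() to kzalloc() switch above is what allows the explicit zeroing of tctx->last and tctx->task_state to be dropped, and it also means the new cached_refs field starts out at zero with no explicit initialisation: the first io_submit_sqes() call from a fresh task therefore always takes the refill path.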
@@ -7966,6 +7972,7 @@ void __io_uring_free(struct task_struct *tsk)
 
 	WARN_ON_ONCE(!xa_empty(&tctx->xa));
 	WARN_ON_ONCE(tctx->io_wq);
+	WARN_ON_ONCE(tctx->cached_refs);
 
 	percpu_counter_destroy(&tctx->inflight);
 	kfree(tctx);
@@ -9110,6 +9117,16 @@ static void io_uring_try_cancel(bool cancel_all)
 	}
 }
 
+static void io_uring_drop_tctx_refs(struct task_struct *task)
+{
+	struct io_uring_task *tctx = task->io_uring;
+	unsigned int refs = tctx->cached_refs;
+
+	tctx->cached_refs = 0;
+	percpu_counter_sub(&tctx->inflight, refs);
+	put_task_struct_many(task, refs);
+}
+
 /* should only be called by SQPOLL task */
 static void io_uring_cancel_sqpoll(struct io_sq_data *sqd)
 {
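
Because cached references are accounted in tctx->inflight and pin the task_struct, the cancelation paths below have to flush them with io_uring_drop_tctx_refs() before waiting for inflight to drain; otherwise the task's own cache would hold the counters above zero. The helper takes no locks, presumably relying on the same task-local synchronisation as the submission path.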
@@ -9125,6 +9142,7 @@ static void io_uring_cancel_sqpoll(struct io_sq_data *sqd)
 
 	WARN_ON_ONCE(!sqd || sqd->thread != current);
 
+	io_uring_drop_tctx_refs(current);
 	atomic_inc(&tctx->in_idle);
 	do {
 		/* read completions before cancelations */
@@ -9162,6 +9180,7 @@ void __io_uring_cancel(struct files_struct *files)
 		io_wq_exit_start(tctx->io_wq);
 
 	/* make sure overflow events are dropped */
+	io_uring_drop_tctx_refs(current);
 	atomic_inc(&tctx->in_idle);
 	do {
 		/* read completions before cancelations */