Commit 7cbf1722 authored by Jens Axboe

io_uring: provide FIFO ordering for task_work



task_work is a LIFO list, due to how it's implemented as a lockless
list. For long chains of task_work, this can be problematic, as the
first entry added is the last one processed. Reversing the list before
running it would likewise waste a lot of CPU cycles.

Wrap the task_work so that each task has a single task_work entry, and
use it to run the queued work in the order it was added.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 1b4c351f
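
To make the ordering problem concrete, here is a minimal userspace sketch of the splice-and-run pattern described above. It is not the kernel code: all names are invented for the example, a pthread mutex stands in for the spinlock, and a plain cast replaces container_of() (valid here because the node is the struct's first member). Entries are appended to a tail-tracked list under the lock, and one dispatch pass splices the whole list out and runs it oldest-first, where popping a lockless LIFO stack would hand the entries back newest-first.

#include <pthread.h>
#include <stdio.h>

struct work_node {
	struct work_node *next;
};

struct work_list {
	struct work_node *first;
	struct work_node *last;
};

struct task_ctx {
	pthread_mutex_t lock;
	struct work_list list;
};

struct request {
	struct work_node node;	/* first member, so a cast recovers the request */
	void (*func)(struct request *req);
	int id;
};

/* Append in FIFO order: new entries go on the tail. */
static void work_add_tail(struct task_ctx *ctx, struct request *req)
{
	pthread_mutex_lock(&ctx->lock);
	req->node.next = NULL;
	if (ctx->list.last)
		ctx->list.last->next = &req->node;
	else
		ctx->list.first = &req->node;
	ctx->list.last = &req->node;
	pthread_mutex_unlock(&ctx->lock);
}

/* One dispatch pass drains everything queued so far, oldest first. */
static void run_work(struct task_ctx *ctx)
{
	struct work_node *node;

	pthread_mutex_lock(&ctx->lock);
	node = ctx->list.first;
	ctx->list.first = ctx->list.last = NULL;
	pthread_mutex_unlock(&ctx->lock);

	while (node) {
		/* the callback may free its request, so read next first */
		struct work_node *next = node->next;
		struct request *req = (struct request *)node;

		req->func(req);
		node = next;
	}
}

static void print_req(struct request *req)
{
	printf("running request %d\n", req->id);
}

int main(void)
{
	struct task_ctx ctx = { .lock = PTHREAD_MUTEX_INITIALIZER };
	struct request reqs[3];

	for (int i = 0; i < 3; i++) {
		reqs[i] = (struct request){ .func = print_req, .id = i };
		work_add_tail(&ctx, &reqs[i]);
	}
	run_work(&ctx);
	return 0;
}

Built with cc -pthread, this prints requests 0, 1, 2 in submission order; draining a LIFO push list without reversing it would print 2, 1, 0.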
fs/io-wq.h +0 −9
@@ -27,15 +27,6 @@ enum io_wq_cancel {
 	IO_WQ_CANCEL_NOTFOUND,	/* work not found */
 };
 
-struct io_wq_work_node {
-	struct io_wq_work_node *next;
-};
-
-struct io_wq_work_list {
-	struct io_wq_work_node *first;
-	struct io_wq_work_node *last;
-};
-
 static inline void wq_list_add_after(struct io_wq_work_node *node,
 				     struct io_wq_work_node *pos,
 				     struct io_wq_work_list *list)
fs/io_uring.c +94 −7
@@ -721,6 +721,11 @@ struct async_poll {
 	struct io_poll_iocb	*double_poll;
 };
 
+struct io_task_work {
+	struct io_wq_work_node	node;
+	task_work_func_t	func;
+};
+
 /*
  * NOTE! Each of the iocb union members has the file pointer
  * as the first entry in their struct definition. So you can
@@ -779,7 +784,10 @@ struct io_kiocb {
 	 * 2. to track reqs with ->files (see io_op_def::file_table)
 	 */
 	struct list_head		inflight_entry;
-	struct callback_head		task_work;
+	union {
+		struct io_task_work	io_task_work;
+		struct callback_head	task_work;
+	};
 	/* for polled requests, i.e. IORING_OP_POLL_ADD and async armed poll */
 	struct hlist_node		hash_node;
 	struct async_poll		*apoll;
@@ -2129,6 +2137,81 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
 	return __io_req_find_next(req);
 }
 
+static bool __tctx_task_work(struct io_uring_task *tctx)
+{
+	struct io_wq_work_list list;
+	struct io_wq_work_node *node;
+
+	if (wq_list_empty(&tctx->task_list))
+		return false;
+
+	spin_lock(&tctx->task_lock);
+	list = tctx->task_list;
+	INIT_WQ_LIST(&tctx->task_list);
+	spin_unlock(&tctx->task_lock);
+
+	node = list.first;
+	while (node) {
+		struct io_wq_work_node *next = node->next;
+		struct io_kiocb *req;
+
+		req = container_of(node, struct io_kiocb, io_task_work.node);
+		req->task_work.func(&req->task_work);
+		node = next;
+	}
+
+	return list.first != NULL;
+}
+
+static void tctx_task_work(struct callback_head *cb)
+{
+	struct io_uring_task *tctx = container_of(cb, struct io_uring_task, task_work);
+
+	while (__tctx_task_work(tctx))
+		cond_resched();
+
+	clear_bit(0, &tctx->task_state);
+}
+
+static int io_task_work_add(struct task_struct *tsk, struct io_kiocb *req,
+			    enum task_work_notify_mode notify)
+{
+	struct io_uring_task *tctx = tsk->io_uring;
+	struct io_wq_work_node *node, *prev;
+	int ret;
+
+	WARN_ON_ONCE(!tctx);
+
+	spin_lock(&tctx->task_lock);
+	wq_list_add_tail(&req->io_task_work.node, &tctx->task_list);
+	spin_unlock(&tctx->task_lock);
+
+	/* task_work already pending, we're done */
+	if (test_bit(0, &tctx->task_state) ||
+	    test_and_set_bit(0, &tctx->task_state))
+		return 0;
+
+	if (!task_work_add(tsk, &tctx->task_work, notify))
+		return 0;
+
+	/*
+	 * Slow path - we failed, find and delete work. if the work is not
+	 * in the list, it got run and we're fine.
+	 */
+	ret = 0;
+	spin_lock(&tctx->task_lock);
+	wq_list_for_each(node, prev, &tctx->task_list) {
+		if (&req->io_task_work.node == node) {
+			wq_list_del(&tctx->task_list, node, prev);
+			ret = 1;
+			break;
+		}
+	}
+	spin_unlock(&tctx->task_lock);
+	clear_bit(0, &tctx->task_state);
+	return ret;
+}
+
 static int io_req_task_work_add(struct io_kiocb *req)
 {
 	struct task_struct *tsk = req->task;
@@ -2149,7 +2232,7 @@ static int io_req_task_work_add(struct io_kiocb *req)
 	if (!(ctx->flags & IORING_SETUP_SQPOLL))
 		notify = TWA_SIGNAL;
 
-	ret = task_work_add(tsk, &req->task_work, notify);
+	ret = io_task_work_add(tsk, req, notify);
 	if (!ret)
 		wake_up_process(tsk);
 
@@ -2157,7 +2240,7 @@ static int io_req_task_work_add(struct io_kiocb *req)
 }
 
 static void io_req_task_work_add_fallback(struct io_kiocb *req,
-					  void (*cb)(struct callback_head *))
+					  task_work_func_t cb)
 {
 	struct task_struct *tsk = io_wq_get_task(req->ctx->io_wq);
 
@@ -2216,7 +2299,7 @@ static void io_req_task_queue(struct io_kiocb *req)
 {
 	int ret;
 
-	init_task_work(&req->task_work, io_req_task_submit);
+	req->task_work.func = io_req_task_submit;
 	percpu_ref_get(&req->ctx->refs);
 
 	ret = io_req_task_work_add(req);
@@ -2347,7 +2430,7 @@ static void io_free_req_deferred(struct io_kiocb *req)
 {
 	int ret;
 
-	init_task_work(&req->task_work, io_put_req_deferred_cb);
+	req->task_work.func = io_put_req_deferred_cb;
 	ret = io_req_task_work_add(req);
 	if (unlikely(ret))
 		io_req_task_work_add_fallback(req, io_put_req_deferred_cb);
@@ -3392,7 +3475,7 @@ static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
 	req->rw.kiocb.ki_flags &= ~IOCB_WAITQ;
 	list_del_init(&wait->entry);
 
-	init_task_work(&req->task_work, io_req_task_submit);
+	req->task_work.func = io_req_task_submit;
 	percpu_ref_get(&req->ctx->refs);
 
 	/* submit ref gets dropped, acquire a new one */
@@ -5083,7 +5166,7 @@ static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
 	list_del_init(&poll->wait.entry);
 
 	req->result = mask;
-	init_task_work(&req->task_work, func);
+	req->task_work.func = func;
 	percpu_ref_get(&req->ctx->refs);
 
 	/*
@@ -8086,6 +8169,10 @@ static int io_uring_alloc_task_context(struct task_struct *task)
 	io_init_identity(&tctx->__identity);
 	tctx->identity = &tctx->__identity;
 	task->io_uring = tctx;
+	spin_lock_init(&tctx->task_lock);
+	INIT_WQ_LIST(&tctx->task_list);
+	tctx->task_state = 0;
+	init_task_work(&tctx->task_work, tctx_task_work);
 	return 0;
 }
 
include/linux/io_uring.h +14 −0
@@ -22,6 +22,15 @@ struct io_identity {
 	refcount_t			count;
 };
 
+struct io_wq_work_node {
+	struct io_wq_work_node *next;
+};
+
+struct io_wq_work_list {
+	struct io_wq_work_node *first;
+	struct io_wq_work_node *last;
+};
+
 struct io_uring_task {
 	/* submission side */
 	struct xarray		xa;
@@ -32,6 +41,11 @@ struct io_uring_task {
 	struct io_identity	*identity;
 	atomic_t		in_idle;
 	bool			sqpoll;
+
+	spinlock_t		task_lock;
+	struct io_wq_work_list	task_list;
+	unsigned long		task_state;
+	struct callback_head	task_work;
 };
 
 #if defined(CONFIG_IO_URING)
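
One detail worth calling out from io_task_work_add() above is the task_state bit: many submitters may queue work for a task, but only the caller that flips bit 0 from clear to set schedules the dispatcher, and the dispatcher clears the bit once the list is drained. Below is a hedged, standalone sketch of that only-notify-once handshake; C11 atomics stand in for the kernel's test_bit()/test_and_set_bit(), and the names are invented for the example.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_bool pending;	/* plays the role of tctx->task_state bit 0 */

/* Mirrors: if (test_bit(0, ...) || test_and_set_bit(0, ...)) return 0; */
static bool queue_work(int id)
{
	printf("queued work %d\n", id);
	if (atomic_load(&pending) || atomic_exchange(&pending, true))
		return false;	/* a notification is already in flight */
	return true;		/* caller won the bit and must schedule */
}

static void dispatcher(void)
{
	printf("draining list\n");	/* splice + run, as in tctx_task_work() */
	atomic_store(&pending, false);	/* re-arm for the next batch */
}

int main(void)
{
	for (int i = 0; i < 3; i++) {
		if (queue_work(i))
			printf("  -> scheduling dispatcher for work %d\n", i);
	}
	dispatcher();
	return 0;
}

Only the first add schedules the dispatcher here; the plain load before the exchange (like test_bit() before test_and_set_bit()) skips the atomic read-modify-write when a notification is already pending.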