Commit 50be9417 authored by Linus Torvalds's avatar Linus Torvalds
Browse files

Merge tag 'io_uring-5.14-2021-07-09' of git://git.kernel.dk/linux-block

Pull io_uring fixes from Jens Axboe:
 "A few fixes that should go into this merge.

  One fixes a regression introduced in this release, others are just
  generic fixes, mostly related to handling fallback task_work"

* tag 'io_uring-5.14-2021-07-09' of git://git.kernel.dk/linux-block:
  io_uring: remove dead non-zero 'poll' check
  io_uring: mitigate unlikely iopoll lag
  io_uring: fix drain alloc fail return code
  io_uring: fix exiting io_req_task_work_add leaks
  io_uring: simplify task_work func
  io_uring: fix stuck fallback reqs
parents a022f7d5 9ce85ef2
Loading
Loading
Loading
Loading
+68 −123
Original line number Diff line number Diff line
@@ -465,7 +465,8 @@ struct io_ring_ctx {
		struct mm_struct		*mm_account;

		/* ctx exit and cancelation */
		struct callback_head		*exit_task_work;
		struct llist_head		fallback_llist;
		struct delayed_work		fallback_work;
		struct work_struct		exit_work;
		struct list_head		tctx_list;
		struct completion		ref_comp;
@@ -784,9 +785,14 @@ struct async_poll {
	struct io_poll_iocb	*double_poll;
};

typedef void (*io_req_tw_func_t)(struct io_kiocb *req);

struct io_task_work {
	union {
		struct io_wq_work_node	node;
	task_work_func_t	func;
		struct llist_node	fallback_node;
	};
	io_req_tw_func_t		func;
};

enum {
@@ -849,10 +855,7 @@ struct io_kiocb {

	/* used with ctx->iopoll_list with reads/writes */
	struct list_head		inflight_entry;
	union {
	struct io_task_work		io_task_work;
		struct callback_head	task_work;
	};
	/* for polled requests, i.e. IORING_OP_POLL_ADD and async armed poll */
	struct hlist_node		hash_node;
	struct async_poll		*apoll;
@@ -1071,6 +1074,8 @@ static void io_submit_flush_completions(struct io_ring_ctx *ctx);
static bool io_poll_remove_waitqs(struct io_kiocb *req);
static int io_req_prep_async(struct io_kiocb *req);

static void io_fallback_req_func(struct work_struct *unused);

static struct kmem_cache *req_cachep;

static const struct file_operations io_uring_fops;
@@ -1202,6 +1207,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
	INIT_LIST_HEAD(&ctx->tctx_list);
	INIT_LIST_HEAD(&ctx->submit_state.comp.free_list);
	INIT_LIST_HEAD(&ctx->locked_free_list);
	INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
	return ctx;
err:
	kfree(ctx->dummy_ubuf);
@@ -1929,7 +1935,7 @@ static void tctx_task_work(struct callback_head *cb)
				ctx = req->ctx;
				percpu_ref_get(&ctx->refs);
			}
			req->task_work.func(&req->task_work);
			req->io_task_work.func(req);
			node = next;
		}
		if (wq_list_empty(&tctx->task_list)) {
@@ -1946,17 +1952,13 @@ static void tctx_task_work(struct callback_head *cb)
	ctx_flush_and_put(ctx);
}

static int io_req_task_work_add(struct io_kiocb *req)
static void io_req_task_work_add(struct io_kiocb *req)
{
	struct task_struct *tsk = req->task;
	struct io_uring_task *tctx = tsk->io_uring;
	enum task_work_notify_mode notify;
	struct io_wq_work_node *node, *prev;
	struct io_wq_work_node *node;
	unsigned long flags;
	int ret = 0;

	if (unlikely(tsk->flags & PF_EXITING))
		return -ESRCH;

	WARN_ON_ONCE(!tctx);

@@ -1967,7 +1969,9 @@ static int io_req_task_work_add(struct io_kiocb *req)
	/* task_work already pending, we're done */
	if (test_bit(0, &tctx->task_state) ||
	    test_and_set_bit(0, &tctx->task_state))
		return 0;
		return;
	if (unlikely(tsk->flags & PF_EXITING))
		goto fail;

	/*
	 * SQPOLL kernel thread doesn't need notification, just a wakeup. For
@@ -1976,72 +1980,28 @@ static int io_req_task_work_add(struct io_kiocb *req)
	 * will do the job.
	 */
	notify = (req->ctx->flags & IORING_SETUP_SQPOLL) ? TWA_NONE : TWA_SIGNAL;

	if (!task_work_add(tsk, &tctx->task_work, notify)) {
		wake_up_process(tsk);
		return 0;
		return;
	}

	/*
	 * Slow path - we failed, find and delete work. if the work is not
	 * in the list, it got run and we're fine.
	 */
fail:
	clear_bit(0, &tctx->task_state);
	spin_lock_irqsave(&tctx->task_lock, flags);
	wq_list_for_each(node, prev, &tctx->task_list) {
		if (&req->io_task_work.node == node) {
			wq_list_del(&tctx->task_list, node, prev);
			ret = 1;
			break;
		}
	}
	node = tctx->task_list.first;
	INIT_WQ_LIST(&tctx->task_list);
	spin_unlock_irqrestore(&tctx->task_lock, flags);
	clear_bit(0, &tctx->task_state);
	return ret;
}

static bool io_run_task_work_head(struct callback_head **work_head)
{
	struct callback_head *work, *next;
	bool executed = false;

	do {
		work = xchg(work_head, NULL);
		if (!work)
			break;

		do {
			next = work->next;
			work->func(work);
			work = next;
			cond_resched();
		} while (work);
		executed = true;
	} while (1);

	return executed;
}

static void io_task_work_add_head(struct callback_head **work_head,
				  struct callback_head *task_work)
{
	struct callback_head *head;

	do {
		head = READ_ONCE(*work_head);
		task_work->next = head;
	} while (cmpxchg(work_head, head, task_work) != head);
	while (node) {
		req = container_of(node, struct io_kiocb, io_task_work.node);
		node = node->next;
		if (llist_add(&req->io_task_work.fallback_node,
			      &req->ctx->fallback_llist))
			schedule_delayed_work(&req->ctx->fallback_work, 1);
	}

static void io_req_task_work_add_fallback(struct io_kiocb *req,
					  task_work_func_t cb)
{
	init_task_work(&req->task_work, cb);
	io_task_work_add_head(&req->ctx->exit_task_work, &req->task_work);
}

static void io_req_task_cancel(struct callback_head *cb)
static void io_req_task_cancel(struct io_kiocb *req)
{
	struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
	struct io_ring_ctx *ctx = req->ctx;

	/* ctx is guaranteed to stay alive while we hold uring_lock */
@@ -2050,7 +2010,7 @@ static void io_req_task_cancel(struct callback_head *cb)
	mutex_unlock(&ctx->uring_lock);
}

static void __io_req_task_submit(struct io_kiocb *req)
static void io_req_task_submit(struct io_kiocb *req)
{
	struct io_ring_ctx *ctx = req->ctx;

@@ -2063,28 +2023,17 @@ static void __io_req_task_submit(struct io_kiocb *req)
	mutex_unlock(&ctx->uring_lock);
}

static void io_req_task_submit(struct callback_head *cb)
{
	struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);

	__io_req_task_submit(req);
}

static void io_req_task_queue_fail(struct io_kiocb *req, int ret)
{
	req->result = ret;
	req->task_work.func = io_req_task_cancel;

	if (unlikely(io_req_task_work_add(req)))
		io_req_task_work_add_fallback(req, io_req_task_cancel);
	req->io_task_work.func = io_req_task_cancel;
	io_req_task_work_add(req);
}

static void io_req_task_queue(struct io_kiocb *req)
{
	req->task_work.func = io_req_task_submit;

	if (unlikely(io_req_task_work_add(req)))
		io_req_task_queue_fail(req, -ECANCELED);
	req->io_task_work.func = io_req_task_submit;
	io_req_task_work_add(req);
}

static inline void io_queue_next(struct io_kiocb *req)
@@ -2195,18 +2144,10 @@ static inline void io_put_req(struct io_kiocb *req)
		io_free_req(req);
}

static void io_put_req_deferred_cb(struct callback_head *cb)
{
	struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);

	io_free_req(req);
}

static void io_free_req_deferred(struct io_kiocb *req)
{
	req->task_work.func = io_put_req_deferred_cb;
	if (unlikely(io_req_task_work_add(req)))
		io_req_task_work_add_fallback(req, io_put_req_deferred_cb);
	req->io_task_work.func = io_free_req;
	io_req_task_work_add(req);
}

static inline void io_put_req_deferred(struct io_kiocb *req, int refs)
@@ -2415,11 +2356,15 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
		 * very same mutex.
		 */
		if (list_empty(&ctx->iopoll_list)) {
			u32 tail = ctx->cached_cq_tail;

			mutex_unlock(&ctx->uring_lock);
			io_run_task_work();
			mutex_lock(&ctx->uring_lock);

			if (list_empty(&ctx->iopoll_list))
			/* some requests don't go through iopoll_list */
			if (tail != ctx->cached_cq_tail ||
			    list_empty(&ctx->iopoll_list))
				break;
		}
		ret = io_do_iopoll(ctx, &nr_events, min);
@@ -2485,6 +2430,17 @@ static bool io_rw_should_reissue(struct io_kiocb *req)
}
#endif

static void io_fallback_req_func(struct work_struct *work)
{
	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
						fallback_work.work);
	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
	struct io_kiocb *req, *tmp;

	llist_for_each_entry_safe(req, tmp, node, io_task_work.fallback_node)
		req->io_task_work.func(req);
}

static void __io_complete_rw(struct io_kiocb *req, long res, long res2,
			     unsigned int issue_flags)
{
@@ -4850,10 +4806,8 @@ struct io_poll_table {
};

static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
			   __poll_t mask, task_work_func_t func)
			   __poll_t mask, io_req_tw_func_t func)
{
	int ret;

	/* for instances that support it check for an event match first: */
	if (mask && !(mask & poll->events))
		return 0;
@@ -4863,7 +4817,7 @@ static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
	list_del_init(&poll->wait.entry);

	req->result = mask;
	req->task_work.func = func;
	req->io_task_work.func = func;

	/*
	 * If this fails, then the task is exiting. When a task exits, the
@@ -4871,11 +4825,7 @@ static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
	 * of executing it. We can't safely execute it anyway, as we may not
	 * have the needed state needed for it anyway.
	 */
	ret = io_req_task_work_add(req);
	if (unlikely(ret)) {
		WRITE_ONCE(poll->canceled, true);
		io_req_task_work_add_fallback(req, func);
	}
	io_req_task_work_add(req);
	return 1;
}

@@ -4884,6 +4834,9 @@ static bool io_poll_rewait(struct io_kiocb *req, struct io_poll_iocb *poll)
{
	struct io_ring_ctx *ctx = req->ctx;

	if (unlikely(req->task->flags & PF_EXITING))
		WRITE_ONCE(poll->canceled, true);

	if (!req->result && !READ_ONCE(poll->canceled)) {
		struct poll_table_struct pt = { ._key = poll->events };

@@ -4960,9 +4913,8 @@ static bool io_poll_complete(struct io_kiocb *req, __poll_t mask)
	return !(flags & IORING_CQE_F_MORE);
}

static void io_poll_task_func(struct callback_head *cb)
static void io_poll_task_func(struct io_kiocb *req)
{
	struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
	struct io_ring_ctx *ctx = req->ctx;
	struct io_kiocb *nxt;

@@ -4984,7 +4936,7 @@ static void io_poll_task_func(struct callback_head *cb)
		if (done) {
			nxt = io_put_req_find_next(req);
			if (nxt)
				__io_req_task_submit(nxt);
				io_req_task_submit(nxt);
		}
	}
}
@@ -5004,7 +4956,7 @@ static int io_poll_double_wake(struct wait_queue_entry *wait, unsigned mode,

	list_del_init(&wait->entry);

	if (poll && poll->head) {
	if (poll->head) {
		bool done;

		spin_lock(&poll->head->lock);
@@ -5093,9 +5045,8 @@ static void io_async_queue_proc(struct file *file, struct wait_queue_head *head,
	__io_queue_proc(&apoll->poll, pt, head, &apoll->double_poll);
}

static void io_async_task_func(struct callback_head *cb)
static void io_async_task_func(struct io_kiocb *req)
{
	struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
	struct async_poll *apoll = req->apoll;
	struct io_ring_ctx *ctx = req->ctx;

@@ -5111,7 +5062,7 @@ static void io_async_task_func(struct callback_head *cb)
	spin_unlock_irq(&ctx->completion_lock);

	if (!READ_ONCE(apoll->poll.canceled))
		__io_req_task_submit(req);
		io_req_task_submit(req);
	else
		io_req_complete_failed(req, -ECANCELED);
}
@@ -6072,7 +6023,7 @@ static bool io_drain_req(struct io_kiocb *req)
	io_prep_async_link(req);
	de = kmalloc(sizeof(*de), GFP_KERNEL);
	if (!de) {
		io_req_complete_failed(req, ret);
		io_req_complete_failed(req, -ENOMEM);
		return true;
	}

@@ -8767,11 +8718,6 @@ static int io_unregister_personality(struct io_ring_ctx *ctx, unsigned id)
	return -EINVAL;
}

static inline bool io_run_ctx_fallback(struct io_ring_ctx *ctx)
{
	return io_run_task_work_head(&ctx->exit_task_work);
}

struct io_tctx_exit {
	struct callback_head		task_work;
	struct completion		completion;
@@ -8837,7 +8783,7 @@ static void io_ring_exit_work(struct work_struct *work)
	/*
	 * Some may use context even when all refs and requests have been put,
	 * and they are free to do so while still holding uring_lock or
	 * completion_lock, see __io_req_task_submit(). Apart from other work,
	 * completion_lock, see io_req_task_submit(). Apart from other work,
	 * this lock/unlock section also waits them to finish.
	 */
	mutex_lock(&ctx->uring_lock);
@@ -9036,7 +8982,6 @@ static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
		ret |= io_kill_timeouts(ctx, task, cancel_all);
		if (task)
			ret |= io_run_task_work();
		ret |= io_run_ctx_fallback(ctx);
		if (!ret)
			break;
		cond_resched();