Commit 60f8fbaa authored by Linus Torvalds

Merge tag 'for-5.15/io_uring-2021-09-04' of git://git.kernel.dk/linux-block

Pull io_uring fixes from Jens Axboe:
 "As sometimes happens, two reports came in around the merge window open
  that led to some fixes. Hence this one is a bit bigger than usual
  followup fixes, but most of it will be going towards stable, outside
  of the fixes that are addressing regressions from this merge window.

  In detail:

   - postgres is a heavy user of signals between tasks, and if we're
     unlucky this can interfere with io-wq worker creation. Make sure
     we're resilient against unrelated signal handling. This set of
     changes also includes hardening against allocation failures, which
     could previously have led to stalls.

   - Some use cases that end up with a mix of bounded and unbounded
     work would see starvation issues as a result. Split the pending
     work lists to handle that better (a schematic sketch of the split
     follows this list).

   - Fix the io_uring_complete() trace argument type after its
     int -> unsigned -> long churn

   - Fix issue with REGISTER_IOWQ_MAX_WORKERS and SQPOLL

   - Fix regression with hash wait lock in this merge window

   - Fix retry of short reads issued on block devices (Ming)

   - Fix regression with links in this merge window (Pavel)

   - Fix race with multi-shot poll and completions (Xiaoguang)

   - Ensure regular file IO doesn't inadvertently skip completion
     batching (Pavel)

   - Ensure submissions are flushed after running task_work (Pavel)"
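
To make the bounded/unbounded split concrete: previously a single
io_wqe-wide work list (and a single STALLED flag) served both classes of
work, so one class could starve behind a stall in the other. A minimal
schematic of the new shape, using the field names from the io-wq.c diff
below (an illustration, not the kernel code itself):

	enum {
		IO_WQ_ACCT_BOUND,
		IO_WQ_ACCT_UNBOUND,
		IO_WQ_ACCT_NR,
	};

	struct io_wqe_acct {
		unsigned nr_workers;
		unsigned max_workers;
		int index;
		atomic_t nr_running;
		struct io_wq_work_list work_list;	/* per-class pending work */
		unsigned long flags;			/* IO_ACCT_STALLED_BIT */
	};

	struct io_wqe {
		raw_spinlock_t lock;	/* one lock still covers both accts */
		struct io_wqe_acct acct[IO_WQ_ACCT_NR];
		/* ... free_list, all_list, hash_tail[], etc. ... */
	};

With this layout a hashed-work stall sets only the affected acct's
IO_ACCT_STALLED_BIT, and the other class keeps making progress.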

* tag 'for-5.15/io_uring-2021-09-04' of git://git.kernel.dk/linux-block:
  io_uring: io_uring_complete() trace should take an integer
  io_uring: fix possible poll event lost in multi shot mode
  io_uring: prolong tctx_task_work() with flushing
  io_uring: don't disable kiocb_done() CQE batching
  io_uring: ensure IORING_REGISTER_IOWQ_MAX_WORKERS works with SQPOLL
  io-wq: make worker creation resilient against signals
  io-wq: get rid of FIXED worker flag
  io-wq: only exit on fatal signals
  io-wq: split bounded and unbounded work into separate lists
  io-wq: fix queue stalling race
  io_uring: don't submit half-prepared drain request
  io_uring: fix queueing half-created requests
  io-wq: ensure that hash wait lock is IRQ disabling
  io_uring: retry in case of short read on block device
  io_uring: IORING_OP_WRITE needs hash_reg_file set
  io-wq: fix race between adding work and activating a free worker
parents 20fbb11f 2fc2a7a6
fs/io-wq.c: +254 −170
@@ -23,8 +23,7 @@ enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
};

enum {
@@ -32,7 +31,7 @@ enum {
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
@@ -55,7 +54,10 @@ struct io_worker {
	struct callback_head create_work;
	int create_index;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
@@ -71,25 +73,24 @@ struct io_wqe_acct {
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
	raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned flags;
	} ____cacheline_aligned_in_smp;
	struct io_wqe_acct acct[2];

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;
@@ -133,8 +134,11 @@ struct io_cb_cancel_data {
	bool cancel_all;
};

static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first);
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match);

static bool io_worker_get(struct io_worker *worker)
{
@@ -195,11 +199,10 @@ static void io_worker_exit(struct io_worker *worker)
	do_exit(0);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
	if (!wq_list_empty(&acct->work_list) &&
	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
		return true;
	return false;
}
@@ -208,7 +211,8 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
					struct io_wqe_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
@@ -222,6 +226,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wqe_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		if (wake_up_process(worker->task)) {
			io_worker_release(worker);
			return true;
@@ -236,9 +244,9 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;
	bool do_create = false;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
@@ -247,17 +255,8 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret) {
		bool do_create = false, first = false;

	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
			if (!acct->nr_workers)
				first = true;
		acct->nr_workers++;
		do_create = true;
	}
@@ -265,9 +264,10 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
	if (do_create) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
			create_io_worker(wqe->wq, wqe, acct->index, first);
		}
		return create_io_worker(wqe->wq, wqe, acct->index);
	}

	return true;
}

static void io_wqe_inc_running(struct io_worker *worker)
@@ -283,7 +283,7 @@ static void create_worker_cb(struct callback_head *cb)
	struct io_wq *wq;
	struct io_wqe *wqe;
	struct io_wqe_acct *acct;
	bool do_create = false, first = false;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wqe = worker->wqe;
@@ -291,14 +291,12 @@ static void create_worker_cb(struct callback_head *cb)
	acct = &wqe->acct[worker->create_index];
	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
		if (!acct->nr_workers)
			first = true;
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wqe->lock);
	if (do_create) {
		create_io_worker(wq, wqe, worker->create_index, first);
		create_io_worker(wq, wqe, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
@@ -307,9 +305,11 @@ static void create_worker_cb(struct callback_head *cb)
	io_worker_release(worker);
}

static void io_queue_worker_create(struct io_wqe *wqe, struct io_worker *worker,
				   struct io_wqe_acct *acct)
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wqe_acct *acct,
				   task_work_func_t func)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
@@ -327,16 +327,17 @@ static void io_queue_worker_create(struct io_wqe *wqe, struct io_worker *worker,
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	init_task_work(&worker->create_work, create_worker_cb);
	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL))
		return;
		return true;
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

static void io_wqe_dec_running(struct io_worker *worker)
@@ -348,10 +349,10 @@ static void io_wqe_dec_running(struct io_worker *worker)
	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(wqe, worker, acct);
		io_queue_worker_create(worker, acct, create_worker_cb);
	}
}

@@ -363,29 +364,10 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
		io_wqe_dec_running(worker);
		worker->flags ^= IO_WORKER_F_BOUND;
		wqe->acct[index].nr_workers--;
		wqe->acct[index ^ 1].nr_workers++;
		io_wqe_inc_running(worker);
	 }
}

/*
@@ -413,7 +395,7 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

	spin_lock(&wq->hash->wait.lock);
	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
@@ -421,48 +403,26 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
			list_del_init(&wqe->wait.entry);
		}
	}
	spin_unlock(&wq->hash->wait.lock);
}

/*
 * We can always run the work if the worker is currently the same type as
 * the work (eg both are bound, or both are unbound). If they are not the
 * same, only allow it if incrementing the worker count would be allowed.
 */
static bool io_worker_can_run_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	struct io_wqe_acct *acct;

	if (!(worker->flags & IO_WORKER_F_BOUND) !=
	    !(work->flags & IO_WQ_WORK_UNBOUND))
		return true;

	/* not the same type, check if we'd go over the limit */
	acct = io_work_get_acct(worker->wqe, work);
	return acct->nr_workers < acct->max_workers;
	spin_unlock_irq(&wq->hash->wait.lock);
}

static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
					   struct io_worker *worker,
					   bool *stalled)
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
					   struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wqe *wqe = worker->wqe;

	wq_list_for_each(node, prev, &wqe->work_list) {
	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		if (!io_worker_can_run_work(worker, work))
			break;

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

@@ -473,7 +433,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
@@ -483,10 +443,14 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
	}

	if (stall_hash != -1U) {
		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
		*stalled = true;
	}

	return NULL;
@@ -520,13 +484,13 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
		bool stalled;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
@@ -535,12 +499,9 @@ static void io_worker_handle_work(struct io_worker *worker)
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		stalled = false;
		work = io_get_next_work(wqe, worker, &stalled);
		work = io_get_next_work(acct, worker);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (stalled)
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock(&wqe->lock);
		if (!work)
@@ -572,10 +533,10 @@ static void io_worker_handle_work(struct io_worker *worker)

			if (hash != -1U && !next_hashed) {
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock(&wqe->lock);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
@@ -590,8 +551,10 @@ static void io_worker_handle_work(struct io_worker *worker)
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool last_timeout = false;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
@@ -605,10 +568,17 @@ static int io_wqe_worker(void *data)
		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
		if (io_acct_run_queue(acct)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		/* timed out, exit unless we're the last worker */
		if (last_timeout && acct->nr_workers > 1) {
			raw_spin_unlock(&wqe->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wqe, worker);
		raw_spin_unlock(&wqe->lock);
		if (io_flush_signals())
@@ -619,13 +589,11 @@ static int io_wqe_worker(void *data)

			if (!get_signal(&ksig))
				continue;
			if (fatal_signal_pending(current))
				break;
		}
		if (ret)
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (!(worker->flags & IO_WORKER_F_FIXED))
			break;
		}
		last_timeout = !ret;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
@@ -676,7 +644,91 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
	raw_spin_unlock(&worker->wqe->lock);
}

static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first)
static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	raw_spin_unlock(&wqe->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(long err)
{
	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wqe *wqe;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wqe = worker->wqe;
	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wqe_acct *acct = io_wqe_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			while (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wqe->wq);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont)) {
		clear_bit_unlock(0, &worker->create_state);
		io_worker_release(worker);
	}
}

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
@@ -685,42 +737,35 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bo
	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		goto fail;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (IS_ERR(tsk)) {
		kfree(worker);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wq);
		return;
		return false;
	}

	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;
	refcount_set(&worker->ref, 1);
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	raw_spin_lock(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (first && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	raw_spin_unlock(&wqe->lock);
	wake_up_new_task(tsk);

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
@@ -755,11 +800,6 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
	return false;
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;
@@ -773,12 +813,13 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)

static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &wqe->work_list);
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

@@ -788,13 +829,14 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	bool do_wake;
	unsigned work_flags = work->flags;
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
@@ -802,19 +844,36 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
run_cancel:
		io_run_cancel(work, wqe);
		return;
	}

	raw_spin_lock(&wqe->lock);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	do_wake = (work->flags & IO_WQ_WORK_CONCURRENT) ||
			!atomic_read(&acct->nr_running);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);

	rcu_read_lock();
	do_create = !io_wqe_activate_free_worker(wqe, acct);
	rcu_read_unlock();

	raw_spin_unlock(&wqe->lock);

	if (do_wake)
		io_wqe_wake_worker(wqe, acct);
	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wqe_create_worker(wqe, acct);
		if (unlikely(!did_create)) {
			raw_spin_lock(&wqe->lock);
			/* fatal condition, failed to create the first worker */
			if (!acct->nr_workers) {
				raw_spin_unlock(&wqe->lock);
				goto run_cancel;
			}
			raw_spin_unlock(&wqe->lock);
		}
	}
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
@@ -859,6 +918,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

@@ -870,18 +930,18 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&wqe->work_list, &work->list, prev);
	wq_list_del(&acct->work_list, &work->list, prev);
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match)
	__releases(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

retry:
	raw_spin_lock(&wqe->lock);
	wq_list_for_each(node, prev, &wqe->work_list) {
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
@@ -889,11 +949,27 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
		raw_spin_unlock(&wqe->lock);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		return true;
	}

	return false;
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	int i;
retry:
	raw_spin_lock(&wqe->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);

		if (io_acct_cancel_pending_work(wqe, acct, match)) {
			if (match->cancel_all)
				goto retry;
			return;
		}
	}
	raw_spin_unlock(&wqe->lock);
}
@@ -954,18 +1030,24 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	io_wqe_activate_free_worker(wqe);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = &wqe->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wqe_activate_free_worker(wqe, acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node;
	int ret, node, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
@@ -1000,18 +1082,20 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wait.func = io_wqe_hash_wake;
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wait.func = io_wqe_hash_wake;
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			struct io_wqe_acct *acct = &wqe->acct[i];

			acct->index = i;
			atomic_set(&acct->nr_running, 0);
			INIT_WQ_LIST(&acct->work_list);
		}
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}
@@ -1038,7 +1122,7 @@ static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wqe->wq == data;
@@ -1193,7 +1277,7 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
	for_each_node(node) {
		struct io_wqe_acct *acct;

		for (i = 0; i < 2; i++) {
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			acct = &wq->wqes[node]->acct[i];
			prev = max_t(int, acct->max_workers, prev);
			if (new_count[i])
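
Taken together, the io-wq hunks above make worker creation survive
transient failures: create_io_thread() can fail with -ERESTART* errors
when an unrelated signal is pending, and that used to be treated as
fatal. The retry decision, reduced to a skeleton (function names are
from the diff above; alloc_worker() is a hypothetical stand-in for the
kzalloc_node() and init lines, and the unwind paths are elided):

	/* schematic only -- see create_io_worker() above for the real code */
	static bool try_create_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
	{
		struct io_worker *worker = alloc_worker(wqe);	/* hypothetical helper */
		struct task_struct *tsk;

		if (!worker)
			return false;	/* unwind nr_workers/nr_running, drop wq ref */

		tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
		if (!IS_ERR(tsk)) {
			io_init_new_worker(wqe, worker, tsk);	/* success */
		} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
			return false;	/* genuine failure: unwind as above */
		} else {
			/* transient (-EAGAIN/-ERESTART*): bounce through a
			 * workqueue and retry via create_worker_cont() */
			INIT_WORK(&worker->work, io_workqueue_create);
			schedule_work(&worker->work);
		}
		return true;
	}

The allocation-failure path also cancels pending work when the last
worker of an acct cannot be created (io_acct_cancel_pending_work()),
instead of leaving that work queued with nobody to run it -- the stall
hardening mentioned in the pull message.
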
fs/io_uring.c: +66 −10
@@ -1021,6 +1021,7 @@ static const struct io_op_def io_op_defs[] = {
	},
	[IORING_OP_WRITE] = {
		.needs_file		= 1,
		.hash_reg_file		= 1,
		.unbound_nonreg_file	= 1,
		.pollout		= 1,
		.plug			= 1,
@@ -1851,6 +1852,17 @@ static void io_req_complete_failed(struct io_kiocb *req, long res)
	io_req_complete_post(req, res, 0);
}

static void io_req_complete_fail_submit(struct io_kiocb *req)
{
	/*
	 * We don't submit, fail them all, for that replace hardlinks with
	 * normal links. Extra REQ_F_LINK is tolerated.
	 */
	req->flags &= ~REQ_F_HARDLINK;
	req->flags |= REQ_F_LINK;
	io_req_complete_failed(req, req->result);
}

/*
 * Don't initialise the fields below on every allocation, but do that in
 * advance and keep them valid across allocations.
@@ -2119,6 +2131,9 @@ static void tctx_task_work(struct callback_head *cb)
	while (1) {
		struct io_wq_work_node *node;

		if (!tctx->task_list.first && locked && ctx->submit_state.compl_nr)
			io_submit_flush_completions(ctx);

		spin_lock_irq(&tctx->task_lock);
		node = tctx->task_list.first;
		INIT_WQ_LIST(&tctx->task_list);
@@ -2673,7 +2688,7 @@ static void __io_complete_rw(struct io_kiocb *req, long res, long res2,
{
	if (__io_complete_rw_common(req, res))
		return;
	__io_req_complete(req, 0, req->result, io_put_rw_kbuf(req));
	__io_req_complete(req, issue_flags, req->result, io_put_rw_kbuf(req));
}

static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
@@ -3410,6 +3425,12 @@ static inline int io_iter_do_read(struct io_kiocb *req, struct iov_iter *iter)
		return -EINVAL;
}

static bool need_read_all(struct io_kiocb *req)
{
	return req->flags & REQ_F_ISREG ||
		S_ISBLK(file_inode(req->file)->i_mode);
}

static int io_read(struct io_kiocb *req, unsigned int issue_flags)
{
	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
@@ -3464,7 +3485,7 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags)
	} else if (ret == -EIOCBQUEUED) {
		goto out_free;
	} else if (ret <= 0 || ret == io_size || !force_nonblock ||
		   (req->flags & REQ_F_NOWAIT) || !(req->flags & REQ_F_ISREG)) {
		   (req->flags & REQ_F_NOWAIT) || !need_read_all(req)) {
		/* read all, failed, already did sync or don't want to retry */
		goto done;
	}
@@ -5249,7 +5270,7 @@ static void io_poll_remove_double(struct io_kiocb *req)
	}
}

static bool io_poll_complete(struct io_kiocb *req, __poll_t mask)
static bool __io_poll_complete(struct io_kiocb *req, __poll_t mask)
	__must_hold(&req->ctx->completion_lock)
{
	struct io_ring_ctx *ctx = req->ctx;
@@ -5271,10 +5292,19 @@ static bool io_poll_complete(struct io_kiocb *req, __poll_t mask)
	if (flags & IORING_CQE_F_MORE)
		ctx->cq_extra++;

	io_commit_cqring(ctx);
	return !(flags & IORING_CQE_F_MORE);
}

static inline bool io_poll_complete(struct io_kiocb *req, __poll_t mask)
	__must_hold(&req->ctx->completion_lock)
{
	bool done;

	done = __io_poll_complete(req, mask);
	io_commit_cqring(req->ctx);
	return done;
}

static void io_poll_task_func(struct io_kiocb *req, bool *locked)
{
	struct io_ring_ctx *ctx = req->ctx;
@@ -5285,7 +5315,7 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked)
	} else {
		bool done;

		done = io_poll_complete(req, req->result);
		done = __io_poll_complete(req, req->result);
		if (done) {
			io_poll_remove_double(req);
			hash_del(&req->hash_node);
@@ -5293,6 +5323,7 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked)
			req->result = 0;
			add_wait_queue(req->poll.head, &req->poll.wait);
		}
		io_commit_cqring(ctx);
		spin_unlock(&ctx->completion_lock);
		io_cqring_ev_posted(ctx);

@@ -6398,6 +6429,11 @@ static bool io_drain_req(struct io_kiocb *req)
	int ret;
	u32 seq;

	if (req->flags & REQ_F_FAIL) {
		io_req_complete_fail_submit(req);
		return true;
	}

	/*
	 * If we need to drain a request in the middle of a link, drain the
	 * head request and the next request/link after the current link.
@@ -6914,7 +6950,7 @@ static inline void io_queue_sqe(struct io_kiocb *req)
	if (likely(!(req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL)))) {
		__io_queue_sqe(req);
	} else if (req->flags & REQ_F_FAIL) {
		io_req_complete_failed(req, req->result);
		io_req_complete_fail_submit(req);
	} else {
		int ret = io_req_prep_async(req);

@@ -10498,26 +10534,46 @@ static int io_unregister_iowq_aff(struct io_ring_ctx *ctx)
static int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
					void __user *arg)
{
	struct io_uring_task *tctx = current->io_uring;
	struct io_uring_task *tctx = NULL;
	struct io_sq_data *sqd = NULL;
	__u32 new_count[2];
	int i, ret;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;
	if (copy_from_user(new_count, arg, sizeof(new_count)))
		return -EFAULT;
	for (i = 0; i < ARRAY_SIZE(new_count); i++)
		if (new_count[i] > INT_MAX)
			return -EINVAL;

	if (ctx->flags & IORING_SETUP_SQPOLL) {
		sqd = ctx->sq_data;
		if (sqd) {
			mutex_lock(&sqd->lock);
			tctx = sqd->thread->io_uring;
		}
	} else {
		tctx = current->io_uring;
	}

	ret = -EINVAL;
	if (!tctx || !tctx->io_wq)
		goto err;

	ret = io_wq_max_workers(tctx->io_wq, new_count);
	if (ret)
		return ret;
		goto err;

	if (sqd)
		mutex_unlock(&sqd->lock);

	if (copy_to_user(arg, new_count, sizeof(new_count)))
		return -EFAULT;

	return 0;
err:
	if (sqd)
		mutex_unlock(&sqd->lock);
	return ret;
}

static bool io_register_op_must_quiesce(int op)
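
For userspace, the IORING_REGISTER_IOWQ_MAX_WORKERS change above means
the limits now apply to the SQPOLL thread's io-wq when the ring was set
up with IORING_SETUP_SQPOLL; previously the call consulted the
submitting task, whose io-wq is not the one issuing the work (and may
not exist at all). A hedged usage sketch, assuming liburing's
io_uring_register_iowq_max_workers() wrapper for this opcode (a zero
entry leaves that limit unchanged; previous values are written back
into the array):

	#include <liburing.h>
	#include <stdio.h>

	int main(void)
	{
		struct io_uring ring;
		/* [0] = bounded (e.g. regular file I/O), [1] = unbounded */
		unsigned int vals[2] = { 4, 8 };

		if (io_uring_queue_init(8, &ring, IORING_SETUP_SQPOLL) < 0)
			return 1;	/* SQPOLL may need privileges on older kernels */
		if (io_uring_register_iowq_max_workers(&ring, vals) == 0)
			printf("previous limits: bounded=%u unbounded=%u\n",
			       vals[0], vals[1]);
		io_uring_queue_exit(&ring);
		return 0;
	}
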
include/trace/events/io_uring.h: +3 −3 (diff collapsed in preview)