Commit c547d89a authored by Linus Torvalds
Browse files

Merge tag 'for-5.15/io_uring-2021-08-30' of git://git.kernel.dk/linux-block

Pull io_uring updates from Jens Axboe:

 - cancellation cleanups (Hao, Pavel)

 - io-wq accounting cleanup (Hao)

 - io_uring submit locking fix (Hao)

 - io_uring link handling fixes (Hao)

 - fixed file improvements (wangyangbo, Pavel)

 - allow updates of linked timeouts like regular timeouts (Pavel)

 - IOPOLL fix (Pavel)

 - remove batched file get optimization (Pavel)

 - improve reference handling (Pavel)

 - IRQ task_work batching (Pavel)

 - allow pure fixed file, and add support for open/accept (Pavel)

 - GFP_ATOMIC RT kernel fix

 - multiple CQ ring waiter improvement

 - funnel IRQ completions through task_work

 - add support for limiting async workers explicitly

 - add different clocksource support for timeouts

 - io-wq wakeup race fix

 - lots of cleanups and improvements (Pavel et al)

* tag 'for-5.15/io_uring-2021-08-30' of git://git.kernel.dk/linux-block: (87 commits)
  io-wq: fix wakeup race when adding new work
  io-wq: wqe and worker locks no longer need to be IRQ safe
  io-wq: check max_worker limits if a worker transitions bound state
  io_uring: allow updating linked timeouts
  io_uring: keep ltimeouts in a list
  io_uring: support CLOCK_BOOTTIME/REALTIME for timeouts
  io-wq: provide a way to limit max number of workers
  io_uring: add build check for buf_index overflows
  io_uring: clarify io_req_task_cancel() locking
  io_uring: add task-refs-get helper
  io_uring: fix failed linkchain code logic
  io_uring: remove redundant req_set_fail()
  io_uring: don't free request to slab
  io_uring: accept directly into fixed file table
  io_uring: hand code io_accept() fd installing
  io_uring: openat directly into fixed fd table
  net: add accept helper not installing fd
  io_uring: fix io_try_cancel_userdata race for iowq
  io_uring: IRQ rw completion batching
  io_uring: batch task work locking
  ...
parents 44d7d3b0 87df7fb9
Loading
Loading
Loading
Loading
+135 −73
Original line number Diff line number Diff line
@@ -51,6 +51,10 @@ struct io_worker {

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int create_index;

	struct rcu_head rcu;
};

@@ -174,7 +178,7 @@ static void io_worker_exit(struct io_worker *worker)
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock_irq(&wqe->lock);
	raw_spin_lock(&wqe->lock);
	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
@@ -184,7 +188,7 @@ static void io_worker_exit(struct io_worker *worker)
	worker->flags = 0;
	current->flags &= ~PF_IO_WORKER;
	preempt_enable();
	raw_spin_unlock_irq(&wqe->lock);
	raw_spin_unlock(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
@@ -250,20 +254,21 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
	if (!ret) {
		bool do_create = false, first = false;

		raw_spin_lock_irq(&wqe->lock);
		raw_spin_lock(&wqe->lock);
		if (acct->nr_workers < acct->max_workers) {
			atomic_inc(&acct->nr_running);
			atomic_inc(&wqe->wq->worker_refs);
			if (!acct->nr_workers)
				first = true;
			acct->nr_workers++;
			do_create = true;
		}
		raw_spin_unlock_irq(&wqe->lock);
		if (do_create)
		raw_spin_unlock(&wqe->lock);
		if (do_create) {
			atomic_inc(&acct->nr_running);
			atomic_inc(&wqe->wq->worker_refs);
			create_io_worker(wqe->wq, wqe, acct->index, first);
		}
	}
}

static void io_wqe_inc_running(struct io_worker *worker)
{
@@ -272,60 +277,63 @@ static void io_wqe_inc_running(struct io_worker *worker)
	atomic_inc(&acct->nr_running);
}

struct create_worker_data {
	struct callback_head work;
	struct io_wqe *wqe;
	int index;
};

static void create_worker_cb(struct callback_head *cb)
{
	struct create_worker_data *cwd;
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wqe *wqe;
	struct io_wqe_acct *acct;
	bool do_create = false, first = false;

	cwd = container_of(cb, struct create_worker_data, work);
	wqe = cwd->wqe;
	worker = container_of(cb, struct io_worker, create_work);
	wqe = worker->wqe;
	wq = wqe->wq;
	acct = &wqe->acct[cwd->index];
	raw_spin_lock_irq(&wqe->lock);
	acct = &wqe->acct[worker->create_index];
	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
		if (!acct->nr_workers)
			first = true;
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock_irq(&wqe->lock);
	raw_spin_unlock(&wqe->lock);
	if (do_create) {
		create_io_worker(wq, wqe, cwd->index, first);
		create_io_worker(wq, wqe, worker->create_index, first);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	kfree(cwd);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
static void io_queue_worker_create(struct io_wqe *wqe, struct io_worker *worker,
				   struct io_wqe_acct *acct)
{
	struct create_worker_data *cwd;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
	if (cwd) {
		init_task_work(&cwd->work, create_worker_cb);
		cwd->wqe = wqe;
		cwd->index = acct->index;
		if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
	init_task_work(&worker->create_work, create_worker_cb);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL))
		return;

		kfree(cwd);
	}
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
@@ -343,7 +351,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(wqe, acct);
		io_queue_worker_create(wqe, worker, acct);
	}
}

@@ -416,7 +424,28 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
	spin_unlock(&wq->hash->wait.lock);
}

static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
/*
 * Decide whether @worker may pick up @work.
 *
 * A worker whose bound/unbound type matches the work item can always run
 * it. On a type mismatch, running the work is only permitted while the
 * work's accounting class still has room below its max_workers limit.
 */
static bool io_worker_can_run_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	const bool worker_bound = worker->flags & IO_WORKER_F_BOUND;
	const bool work_unbound = work->flags & IO_WQ_WORK_UNBOUND;
	struct io_wqe_acct *target;

	/*
	 * "bound worker" and "unbound work" differing means the two are of
	 * the same type (bound/bound or unbound/unbound).
	 */
	if (worker_bound != work_unbound)
		return true;

	/* type mismatch: only OK if the work's class is under its limit */
	target = io_work_get_acct(worker->wqe, work);
	return target->nr_workers < target->max_workers;
}

static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
					   struct io_worker *worker,
					   bool *stalled)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
@@ -428,6 +457,9 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)

		work = container_of(node, struct io_wq_work, list);

		if (!io_worker_can_run_work(worker, work))
			break;

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
@@ -454,6 +486,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
		*stalled = true;
	}

	return NULL;
@@ -477,9 +510,9 @@ static void io_assign_current_work(struct io_worker *worker,
		cond_resched();
	}

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
	spin_unlock(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
@@ -493,6 +526,7 @@ static void io_worker_handle_work(struct io_worker *worker)

	do {
		struct io_wq_work *work;
		bool stalled;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
@@ -501,13 +535,14 @@ static void io_worker_handle_work(struct io_worker *worker)
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		stalled = false;
		work = io_get_next_work(wqe, worker, &stalled);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
		else if (stalled)
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		raw_spin_unlock(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
@@ -539,16 +574,16 @@ static void io_worker_handle_work(struct io_worker *worker)
				clear_bit(hash, &wq->hash->map);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock_irq(&wqe->lock);
				raw_spin_lock(&wqe->lock);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
				raw_spin_unlock(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
		raw_spin_lock(&wqe->lock);
	} while (1);
}

@@ -569,13 +604,13 @@ static int io_wqe_worker(void *data)

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		raw_spin_lock(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		__io_worker_idle(wqe, worker);
		raw_spin_unlock_irq(&wqe->lock);
		raw_spin_unlock(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
@@ -594,7 +629,7 @@ static int io_wqe_worker(void *data)
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		raw_spin_lock(&wqe->lock);
		io_worker_handle_work(worker);
	}

@@ -636,9 +671,9 @@ void io_wq_worker_sleeping(struct task_struct *tsk)

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&worker->wqe->lock);
	raw_spin_lock(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock_irq(&worker->wqe->lock);
	raw_spin_unlock(&worker->wqe->lock);
}

static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first)
@@ -664,9 +699,9 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bo
		kfree(worker);
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock_irq(&wqe->lock);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock_irq(&wqe->lock);
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wq);
		return;
	}
@@ -676,7 +711,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bo
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock_irq(&wqe->lock);
	raw_spin_lock(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
@@ -684,7 +719,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bo
		worker->flags |= IO_WORKER_F_BOUND;
	if (first && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	raw_spin_unlock_irq(&wqe->lock);
	raw_spin_unlock(&wqe->lock);
	wake_up_new_task(tsk);
}

@@ -759,8 +794,7 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;
	bool do_wake;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
@@ -772,14 +806,14 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
		return;
	}

	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	raw_spin_lock(&wqe->lock);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
	do_wake = (work->flags & IO_WQ_WORK_CONCURRENT) ||
			!atomic_read(&acct->nr_running);
	raw_spin_unlock(&wqe->lock);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
	if (do_wake)
		io_wqe_wake_worker(wqe, acct);
}

@@ -805,19 +839,18 @@ void io_wq_hash_work(struct io_wq_work *work, void *val)
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	spin_lock(&worker->lock);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);
	spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}
@@ -845,16 +878,15 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	raw_spin_lock(&wqe->lock);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		raw_spin_unlock(&wqe->lock);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
@@ -863,7 +895,7 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
	raw_spin_unlock(&wqe->lock);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
@@ -1004,12 +1036,12 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct create_worker_data *cwd;
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	cwd = container_of(cb, struct create_worker_data, work);
	return cwd->wqe->wq == data;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
@@ -1026,12 +1058,13 @@ static void io_wq_exit_workers(struct io_wq *wq)
		return;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct create_worker_data *cwd;
		struct io_worker *worker;

		cwd = container_of(cb, struct create_worker_data, work);
		atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
		worker = container_of(cb, struct io_worker, create_work);
		atomic_dec(&worker->wqe->acct[worker->create_index].nr_running);
		io_worker_ref_put(wq);
		kfree(cwd);
		clear_bit_unlock(0, &worker->create_state);
		io_worker_release(worker);
	}

	rcu_read_lock();
@@ -1143,6 +1176,35 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
	return 0;
}

/*
 * Set the max number of workers for each accounting class of @wq.
 *
 * @new_count has two entries, indexed the same way as wqe->acct[]. On
 * entry each entry holds the requested limit, clamped here to the caller's
 * RLIMIT_NPROC; an entry of 0 leaves that class unchanged. On return each
 * entry holds the previous limit for that class (the largest value found
 * across all nodes).
 *
 * Always returns 0.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	/* old limits are tracked per class so the two never mix */
	int prev[2] = { 0, 0 };
	int i, node;

	for (i = 0; i < 2; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		/* max_workers is read under wqe->lock elsewhere in this file */
		raw_spin_lock(&wqe->lock);
		for (i = 0; i < 2; i++) {
			struct io_wqe_acct *acct = &wqe->acct[i];

			prev[i] = max_t(int, acct->max_workers, prev[i]);
			if (new_count[i])
				acct->max_workers = new_count[i];
		}
		raw_spin_unlock(&wqe->lock);
	}
	rcu_read_unlock();

	/*
	 * Report the old values only after every node has been updated;
	 * overwriting new_count[] inside the node loop would make all nodes
	 * after the first get the old limit instead of the requested one.
	 */
	for (i = 0; i < 2; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;
+2 −1
Original line number Diff line number Diff line
@@ -44,6 +44,7 @@ static inline void wq_list_add_after(struct io_wq_work_node *node,
static inline void wq_list_add_tail(struct io_wq_work_node *node,
				    struct io_wq_work_list *list)
{
	node->next = NULL;
	if (!list->first) {
		list->last = node;
		WRITE_ONCE(list->first, node);
@@ -51,7 +52,6 @@ static inline void wq_list_add_tail(struct io_wq_work_node *node,
		list->last->next = node;
		list->last = node;
	}
	node->next = NULL;
}

static inline void wq_list_cut(struct io_wq_work_list *list,
@@ -128,6 +128,7 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work);
void io_wq_hash_work(struct io_wq_work *work, void *val);

int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask);
int io_wq_max_workers(struct io_wq *wq, int *new_count);

static inline bool io_wq_is_hashed(struct io_wq_work *work)
{
+1029 −738

File changed.

Preview size limit exceeded, changes collapsed.

+6 −5
Original line number Diff line number Diff line
@@ -7,17 +7,18 @@

#if defined(CONFIG_IO_URING)
struct sock *io_uring_get_socket(struct file *file);
void __io_uring_cancel(struct files_struct *files);
void __io_uring_cancel(bool cancel_all);
void __io_uring_free(struct task_struct *tsk);

static inline void io_uring_files_cancel(struct files_struct *files)
static inline void io_uring_files_cancel(void)
{
	if (current->io_uring)
		__io_uring_cancel(files);
		__io_uring_cancel(false);
}
static inline void io_uring_task_cancel(void)
{
	return io_uring_files_cancel(NULL);
	if (current->io_uring)
		__io_uring_cancel(true);
}
static inline void io_uring_free(struct task_struct *tsk)
{
@@ -32,7 +33,7 @@ static inline struct sock *io_uring_get_socket(struct file *file)
static inline void io_uring_task_cancel(void)
{
}
static inline void io_uring_files_cancel(struct files_struct *files)
static inline void io_uring_files_cancel(void)
{
}
static inline void io_uring_free(struct task_struct *tsk)
+3 −0
Original line number Diff line number Diff line
@@ -421,6 +421,9 @@ extern int __sys_accept4_file(struct file *file, unsigned file_flags,
			struct sockaddr __user *upeer_sockaddr,
			 int __user *upeer_addrlen, int flags,
			 unsigned long nofile);
extern struct file *do_accept(struct file *file, unsigned file_flags,
			      struct sockaddr __user *upeer_sockaddr,
			      int __user *upeer_addrlen, int flags);
extern int __sys_accept4(int fd, struct sockaddr __user *upeer_sockaddr,
			 int __user *upeer_addrlen, int flags);
extern int __sys_socket(int family, int type, int protocol);
Loading