Commit 37d1e2e3 authored by Jens Axboe

io_uring: move SQPOLL thread io-wq forked worker

Don't use a kthread for SQPOLL, use a forked worker just like the io-wq
workers. With that done, we can drop the various context grabbing we do
for SQPOLL, it already has everything it needs.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 843bbfd4
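
From userspace, nothing about requesting SQPOLL changes; what changes is the kind of task that backs the ring. Below is a minimal sketch using the raw io_uring_setup(2) syscall (no liburing, error handling trimmed, values illustrative). With this patch applied, the poller shows up as a forked "iou-sqp-<pid>" task that already shares the creating task's mm, files, and creds, which is why the context-grabbing helpers in the diff can go away.

/* Sketch: create an SQPOLL ring; the kernel-side poller is now a
 * forked worker rather than a kthread.
 */
#include <linux/io_uring.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>

int main(void)
{
	struct io_uring_params p;
	int ring_fd;

	memset(&p, 0, sizeof(p));
	p.flags = IORING_SETUP_SQPOLL;
	p.sq_thread_idle = 1000;	/* poller idles after 1000 ms without work */

	ring_fd = syscall(__NR_io_uring_setup, 8, &p);	/* 8 SQ entries */
	if (ring_fd < 0)
		return 1;
	/* mmap the SQ/CQ rings and submit as usual; io_uring_enter() with
	 * IORING_ENTER_SQ_WAKEUP is only needed once the poller went idle. */
	close(ring_fd);
	return 0;
}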
+181 −292
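
The core of the change is a small hand-rolled park/stop protocol on two state bits, replacing kthread_park()/kthread_stop(), which only work on kthreads. The sketch below is condensed from the hunks that follow (same names and fields as the patch, but a reading aid rather than a drop-in copy): a parker sets IO_SQ_THREAD_SHOULD_PARK, wakes the thread, and waits on sqd->completion; the thread acks from io_sq_thread_parkme() and sleeps in TASK_PARKED until io_sq_thread_unpark() clears the bit and wakes it.

/*
 * Parker side: set the bit, wake the thread, wait for its ack.
 * Returns with sqd->lock held so sqd list changes stay serialized.
 */
static bool io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	if (sqd->thread == current)	/* the SQPOLL thread parking itself */
		return true;
	mutex_lock(&sqd->lock);
	if (!sqd->thread) {		/* thread already exited */
		mutex_unlock(&sqd->lock);
		return false;
	}
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	wake_up_process(sqd->thread);
	wait_for_completion(&sqd->completion);	/* acked from parkme() */
	return true;
}

/*
 * Thread side: ack the request, then sleep in TASK_PARKED until
 * io_sq_thread_unpark() clears the bit and does
 * wake_up_state(thread, TASK_PARKED).
 */
static void io_sq_thread_parkme(struct io_sq_data *sqd)
{
	for (;;) {
		set_special_state(TASK_PARKED);
		if (!test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state))
			break;
		preempt_disable();
		complete(&sqd->completion);
		schedule_preempt_disabled();
		preempt_enable();
	}
	__set_current_state(TASK_RUNNING);
}

Stopping works the same way with IO_SQ_THREAD_SHOULD_STOP: the main loop exits, the thread completes sqd->exited and calls do_exit(0) itself instead of being reaped by kthread_stop().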
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -57,7 +57,6 @@
 #include <linux/mman.h>
 #include <linux/percpu.h>
 #include <linux/slab.h>
-#include <linux/kthread.h>
 #include <linux/blkdev.h>
 #include <linux/bvec.h>
 #include <linux/net.h>
@@ -254,6 +253,11 @@ struct io_restriction {
 	bool registered;
 };
 
+enum {
+	IO_SQ_THREAD_SHOULD_STOP = 0,
+	IO_SQ_THREAD_SHOULD_PARK,
+};
+
 struct io_sq_data {
 	refcount_t		refs;
 	struct mutex		lock;
@@ -267,6 +271,13 @@ struct io_sq_data {
 	struct wait_queue_head	wait;
 
 	unsigned		sq_thread_idle;
+	int			sq_cpu;
+	pid_t			task_pid;
+
+	unsigned long		state;
+	struct completion	startup;
+	struct completion	completion;
+	struct completion	exited;
 };
 
 #define IO_IOPOLL_BATCH			8
@@ -367,18 +378,13 @@ struct io_ring_ctx {
 	struct io_rings	*rings;
 
 	/*
-	 * For SQPOLL usage - we hold a reference to the parent task, so we
-	 * have access to the ->files
+	 * For SQPOLL usage
 	 */
 	struct task_struct	*sqo_task;
 
 	/* Only used for accounting purposes */
 	struct mm_struct	*mm_account;
 
-#ifdef CONFIG_BLK_CGROUP
-	struct cgroup_subsys_state	*sqo_blkcg_css;
-#endif
-
 	struct io_sq_data	*sq_data;	/* if using sq thread polling */
 
 	struct wait_queue_head	sqo_sq_wait;
@@ -398,13 +404,6 @@ struct io_ring_ctx {

 	struct user_struct	*user;
 
-	const struct cred	*creds;
-
-#ifdef CONFIG_AUDIT
-	kuid_t			loginuid;
-	unsigned int		sessionid;
-#endif
-
 	struct completion	ref_comp;
 	struct completion	sq_thread_comp;

@@ -989,6 +988,7 @@ static const struct io_op_def io_op_defs[] = {
 static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
 					 struct task_struct *task,
 					 struct files_struct *files);
+static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx);
 static void destroy_fixed_rsrc_ref_node(struct fixed_rsrc_ref_node *ref_node);
 static struct fixed_rsrc_ref_node *alloc_fixed_rsrc_ref_node(
 			struct io_ring_ctx *ctx);
@@ -1100,118 +1100,6 @@ static bool io_match_task(struct io_kiocb *head,
 	return false;
 }
 
-static void io_sq_thread_drop_mm_files(void)
-{
-	struct files_struct *files = current->files;
-	struct mm_struct *mm = current->mm;
-
-	if (mm) {
-		kthread_unuse_mm(mm);
-		mmput(mm);
-		current->mm = NULL;
-	}
-	if (files) {
-		struct nsproxy *nsproxy = current->nsproxy;
-
-		task_lock(current);
-		current->files = NULL;
-		current->nsproxy = NULL;
-		task_unlock(current);
-		put_files_struct(files);
-		put_nsproxy(nsproxy);
-	}
-}
-
-static int __io_sq_thread_acquire_files(struct io_ring_ctx *ctx)
-{
-	if (!current->files) {
-		struct files_struct *files;
-		struct nsproxy *nsproxy;
-
-		task_lock(ctx->sqo_task);
-		files = ctx->sqo_task->files;
-		if (!files) {
-			task_unlock(ctx->sqo_task);
-			return -EOWNERDEAD;
-		}
-		atomic_inc(&files->count);
-		get_nsproxy(ctx->sqo_task->nsproxy);
-		nsproxy = ctx->sqo_task->nsproxy;
-		task_unlock(ctx->sqo_task);
-
-		task_lock(current);
-		current->files = files;
-		current->nsproxy = nsproxy;
-		task_unlock(current);
-	}
-	return 0;
-}
-
-static int __io_sq_thread_acquire_mm(struct io_ring_ctx *ctx)
-{
-	struct mm_struct *mm;
-
-	if (current->mm)
-		return 0;
-
-	task_lock(ctx->sqo_task);
-	mm = ctx->sqo_task->mm;
-	if (unlikely(!mm || !mmget_not_zero(mm)))
-		mm = NULL;
-	task_unlock(ctx->sqo_task);
-
-	if (mm) {
-		kthread_use_mm(mm);
-		return 0;
-	}
-
-	return -EFAULT;
-}
-
-static int __io_sq_thread_acquire_mm_files(struct io_ring_ctx *ctx,
-					   struct io_kiocb *req)
-{
-	int ret;
-
-	ret = __io_sq_thread_acquire_mm(ctx);
-	if (unlikely(ret))
-		return ret;
-
-	ret = __io_sq_thread_acquire_files(ctx);
-	if (unlikely(ret))
-		return ret;
-
-	return 0;
-}
-
-static inline int io_sq_thread_acquire_mm_files(struct io_ring_ctx *ctx,
-						struct io_kiocb *req)
-{
-	if (!(ctx->flags & IORING_SETUP_SQPOLL))
-		return 0;
-	return __io_sq_thread_acquire_mm_files(ctx, req);
-}
-
-static void io_sq_thread_associate_blkcg(struct io_ring_ctx *ctx,
-					 struct cgroup_subsys_state **cur_css)
-
-{
-#ifdef CONFIG_BLK_CGROUP
-	/* puts the old one when swapping */
-	if (*cur_css != ctx->sqo_blkcg_css) {
-		kthread_associate_blkcg(ctx->sqo_blkcg_css);
-		*cur_css = ctx->sqo_blkcg_css;
-	}
-#endif
-}
-
-static void io_sq_thread_unassociate_blkcg(void)
-{
-#ifdef CONFIG_BLK_CGROUP
-	kthread_associate_blkcg(NULL);
-#endif
-}
-
 static inline void req_set_fail_links(struct io_kiocb *req)
 {
 	if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK)
@@ -2132,15 +2020,11 @@ static void __io_req_task_submit(struct io_kiocb *req)

 	/* ctx stays valid until unlock, even if we drop all ours ctx->refs */
 	mutex_lock(&ctx->uring_lock);
-	if (!ctx->sqo_dead && !(current->flags & PF_EXITING) &&
-	    !io_sq_thread_acquire_mm_files(ctx, req))
+	if (!ctx->sqo_dead && !(current->flags & PF_EXITING))
 		__io_queue_sqe(req);
 	else
 		__io_req_task_cancel(req, -EFAULT);
 	mutex_unlock(&ctx->uring_lock);
-
-	if (ctx->flags & IORING_SETUP_SQPOLL)
-		io_sq_thread_drop_mm_files();
 }
 
 static void io_req_task_submit(struct callback_head *cb)
@@ -2604,7 +2488,6 @@ static bool io_rw_reissue(struct io_kiocb *req)
 {
 #ifdef CONFIG_BLOCK
 	umode_t mode = file_inode(req->file)->i_mode;
-	int ret;
 
 	if (!S_ISBLK(mode) && !S_ISREG(mode))
 		return false;
@@ -2613,9 +2496,7 @@ static bool io_rw_reissue(struct io_kiocb *req)

 	lockdep_assert_held(&req->ctx->uring_lock);
 
-	ret = io_sq_thread_acquire_mm_files(req->ctx, req);
-
-	if (!ret && io_resubmit_prep(req)) {
+	if (io_resubmit_prep(req)) {
 		refcount_inc(&req->refs);
 		io_queue_async_work(req);
 		return true;
@@ -6461,9 +6342,6 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	if (unlikely(req->opcode >= IORING_OP_LAST))
 		return -EINVAL;
 
-	if (unlikely(io_sq_thread_acquire_mm_files(ctx, req)))
-		return -EFAULT;
-
 	if (unlikely(!io_check_restriction(ctx, req, sqe_flags)))
 		return -EACCES;

@@ -6779,71 +6657,97 @@ static void io_sqd_init_new(struct io_sq_data *sqd)
 	io_sqd_update_thread_idle(sqd);
 }
 
+static bool io_sq_thread_should_stop(struct io_sq_data *sqd)
+{
+	return test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
+}
+
+static bool io_sq_thread_should_park(struct io_sq_data *sqd)
+{
+	return test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
+}
+
+static void io_sq_thread_parkme(struct io_sq_data *sqd)
+{
+	for (;;) {
+		/*
+		 * TASK_PARKED is a special state; we must serialize against
+		 * possible pending wakeups to avoid store-store collisions on
+		 * task->state.
+		 *
+		 * Such a collision might possibly result in the task state
+		 * changing from TASK_PARKED and us failing the
+		 * wait_task_inactive() in kthread_park().
+		 */
+		set_special_state(TASK_PARKED);
+		if (!test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state))
+			break;
+
+		/*
+		 * Thread is going to call schedule(), do not preempt it,
+		 * or the caller of kthread_park() may spend more time in
+		 * wait_task_inactive().
+		 */
+		preempt_disable();
+		complete(&sqd->completion);
+		schedule_preempt_disabled();
+		preempt_enable();
+	}
+	__set_current_state(TASK_RUNNING);
+}
+
 static int io_sq_thread(void *data)
 {
-	struct cgroup_subsys_state *cur_css = NULL;
-	struct files_struct *old_files = current->files;
-	struct nsproxy *old_nsproxy = current->nsproxy;
-	const struct cred *old_cred = NULL;
 	struct io_sq_data *sqd = data;
 	struct io_ring_ctx *ctx;
 	unsigned long timeout = 0;
+	char buf[TASK_COMM_LEN];
 	DEFINE_WAIT(wait);
 
-	task_lock(current);
-	current->files = NULL;
-	current->nsproxy = NULL;
-	task_unlock(current);
+	sprintf(buf, "iou-sqp-%d", sqd->task_pid);
+	set_task_comm(current, buf);
+	sqd->thread = current;
+	current->pf_io_worker = NULL;
+
+	if (sqd->sq_cpu != -1)
+		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
+	else
+		set_cpus_allowed_ptr(current, cpu_online_mask);
+	current->flags |= PF_NO_SETAFFINITY;
+
+	complete(&sqd->completion);
 
-	while (!kthread_should_stop()) {
+	wait_for_completion(&sqd->startup);
+
+	while (!io_sq_thread_should_stop(sqd)) {
 		int ret;
 		bool cap_entries, sqt_spin, needs_sched;
 
 		/*
 		 * Any changes to the sqd lists are synchronized through the
-		 * kthread parking. This synchronizes the thread vs users,
+		 * thread parking. This synchronizes the thread vs users,
 		 * the users are synchronized on the sqd->ctx_lock.
 		 */
-		if (kthread_should_park()) {
-			kthread_parkme();
-			/*
-			 * When sq thread is unparked, in case the previous park operation
-			 * comes from io_put_sq_data(), which means that sq thread is going
-			 * to be stopped, so here needs to have a check.
-			 */
-			if (kthread_should_stop())
-				break;
+		if (io_sq_thread_should_park(sqd)) {
+			io_sq_thread_parkme(sqd);
+			continue;
 		}
 
 		if (unlikely(!list_empty(&sqd->ctx_new_list))) {
 			io_sqd_init_new(sqd);
 			timeout = jiffies + sqd->sq_thread_idle;
 		}
 
+		if (fatal_signal_pending(current))
+			break;
 		sqt_spin = false;
 		cap_entries = !list_is_singular(&sqd->ctx_list);
 		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
-			if (current->cred != ctx->creds) {
-				if (old_cred)
-					revert_creds(old_cred);
-				old_cred = override_creds(ctx->creds);
-			}
-			io_sq_thread_associate_blkcg(ctx, &cur_css);
-#ifdef CONFIG_AUDIT
-			current->loginuid = ctx->loginuid;
-			current->sessionid = ctx->sessionid;
-#endif
-
 			ret = __io_sq_thread(ctx, cap_entries);
 			if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list)))
 				sqt_spin = true;
-
-			io_sq_thread_drop_mm_files();
 		}
 
 		if (sqt_spin || !time_after(jiffies, timeout)) {
 			io_run_task_work();
-			io_sq_thread_drop_mm_files();
 			cond_resched();
 			if (sqt_spin)
 				timeout = jiffies + sqd->sq_thread_idle;
@@ -6864,7 +6768,7 @@ static int io_sq_thread(void *data)
 			}
 		}
 
-		if (needs_sched && !kthread_should_park()) {
+		if (needs_sched && !io_sq_thread_should_park(sqd)) {
 			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
 				io_ring_set_wakeup_flag(ctx);

@@ -6877,22 +6781,21 @@ static int io_sq_thread(void *data)
 		timeout = jiffies + sqd->sq_thread_idle;
 	}
 
-	io_run_task_work();
-	io_sq_thread_drop_mm_files();
-
-	if (cur_css)
-		io_sq_thread_unassociate_blkcg();
-	if (old_cred)
-		revert_creds(old_cred);
+	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
+		io_uring_cancel_sqpoll(ctx);
 
-	task_lock(current);
-	current->files = old_files;
-	current->nsproxy = old_nsproxy;
-	task_unlock(current);
+	io_run_task_work();
 
-	kthread_parkme();
+	/*
+	 * Clear thread under lock so that concurrent parks work correctly
+	 */
+	complete_all(&sqd->completion);
+	mutex_lock(&sqd->lock);
+	sqd->thread = NULL;
+	mutex_unlock(&sqd->lock);
 
-	return 0;
+	complete(&sqd->exited);
+	do_exit(0);
 }
 
 struct io_wait_queue {
@@ -7182,20 +7085,73 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
 	return 0;
 }
 
+static void io_sq_thread_unpark(struct io_sq_data *sqd)
+	__releases(&sqd->lock)
+{
+	if (!sqd->thread)
+		return;
+	if (sqd->thread == current)
+		return;
+	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
+	wake_up_state(sqd->thread, TASK_PARKED);
+	mutex_unlock(&sqd->lock);
+}
+
+static bool io_sq_thread_park(struct io_sq_data *sqd)
+	__acquires(&sqd->lock)
+{
+	if (sqd->thread == current)
+		return true;
+	mutex_lock(&sqd->lock);
+	if (!sqd->thread) {
+		mutex_unlock(&sqd->lock);
+		return false;
+	}
+	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
+	wake_up_process(sqd->thread);
+	wait_for_completion(&sqd->completion);
+	return true;
+}
+
+static void io_sq_thread_stop(struct io_sq_data *sqd)
+{
+	if (!sqd->thread)
+		return;
+
+	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
+	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state));
+	wake_up_process(sqd->thread);
+	wait_for_completion(&sqd->exited);
+}
+
 static void io_put_sq_data(struct io_sq_data *sqd)
 {
 	if (refcount_dec_and_test(&sqd->refs)) {
-		/*
-		 * The park is a bit of a work-around, without it we get
-		 * warning spews on shutdown with SQPOLL set and affinity
-		 * set to a single CPU.
-		 */
+		io_sq_thread_stop(sqd);
+		kfree(sqd);
+	}
+}
+
+static void io_sq_thread_finish(struct io_ring_ctx *ctx)
+{
+	struct io_sq_data *sqd = ctx->sq_data;
+
+	if (sqd) {
 		if (sqd->thread) {
-			kthread_park(sqd->thread);
-			kthread_stop(sqd->thread);
+			wait_for_completion(&ctx->sq_thread_comp);
+			io_sq_thread_park(sqd);
 		}
 
-		kfree(sqd);
+		mutex_lock(&sqd->ctx_lock);
+		list_del(&ctx->sqd_list);
+		io_sqd_update_thread_idle(sqd);
+		mutex_unlock(&sqd->ctx_lock);
+
+		if (sqd->thread)
+			io_sq_thread_unpark(sqd);
+
+		io_put_sq_data(sqd);
+		ctx->sq_data = NULL;
 	}
 }

@@ -7242,58 +7198,12 @@ static struct io_sq_data *io_get_sq_data(struct io_uring_params *p)
 	mutex_init(&sqd->ctx_lock);
 	mutex_init(&sqd->lock);
 	init_waitqueue_head(&sqd->wait);
+	init_completion(&sqd->startup);
+	init_completion(&sqd->completion);
+	init_completion(&sqd->exited);
 	return sqd;
 }
-
-static void io_sq_thread_unpark(struct io_sq_data *sqd)
-	__releases(&sqd->lock)
-{
-	if (!sqd->thread)
-		return;
-	kthread_unpark(sqd->thread);
-	mutex_unlock(&sqd->lock);
-}
-
-static void io_sq_thread_park(struct io_sq_data *sqd)
-	__acquires(&sqd->lock)
-{
-	if (!sqd->thread)
-		return;
-	mutex_lock(&sqd->lock);
-	kthread_park(sqd->thread);
-}
-
-static void io_sq_thread_stop(struct io_ring_ctx *ctx)
-{
-	struct io_sq_data *sqd = ctx->sq_data;
-
-	if (sqd) {
-		if (sqd->thread) {
-			/*
-			 * We may arrive here from the error branch in
-			 * io_sq_offload_create() where the kthread is created
-			 * without being waked up, thus wake it up now to make
-			 * sure the wait will complete.
-			 */
-			wake_up_process(sqd->thread);
-			wait_for_completion(&ctx->sq_thread_comp);
-
-			io_sq_thread_park(sqd);
-		}
-
-		mutex_lock(&sqd->ctx_lock);
-		list_del(&ctx->sqd_list);
-		io_sqd_update_thread_idle(sqd);
-		mutex_unlock(&sqd->ctx_lock);
-
-		if (sqd->thread)
-			io_sq_thread_unpark(sqd);
-
-		io_put_sq_data(sqd);
-		ctx->sq_data = NULL;
-	}
-}
 
 #if defined(CONFIG_UNIX)
 /*
  * Ensure the UNIX gc is aware of our file set, so we are certain that
@@ -7969,17 +7879,20 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx,
 			if (!cpu_online(cpu))
 				goto err;
 
-			sqd->thread = kthread_create_on_cpu(io_sq_thread, sqd,
-							cpu, "io_uring-sq");
+			sqd->sq_cpu = cpu;
 		} else {
-			sqd->thread = kthread_create(io_sq_thread, sqd,
-							"io_uring-sq");
+			sqd->sq_cpu = -1;
 		}
-		if (IS_ERR(sqd->thread)) {
-			ret = PTR_ERR(sqd->thread);
+
+		sqd->task_pid = current->pid;
+		current->flags |= PF_IO_WORKER;
+		ret = io_wq_fork_thread(io_sq_thread, sqd);
+		current->flags &= ~PF_IO_WORKER;
+		if (ret < 0) {
 			sqd->thread = NULL;
 			goto err;
 		}
+		wait_for_completion(&sqd->completion);
 		ret = io_uring_alloc_task_context(sqd->thread, ctx);
 		if (ret)
 			goto err;
@@ -7991,7 +7904,7 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx,

 	return 0;
 err:
-	io_sq_thread_stop(ctx);
+	io_sq_thread_finish(ctx);
 	return ret;
 }

@@ -8000,7 +7913,7 @@ static void io_sq_offload_start(struct io_ring_ctx *ctx)
 	struct io_sq_data *sqd = ctx->sq_data;
 
 	if ((ctx->flags & IORING_SETUP_SQPOLL) && sqd->thread)
-		wake_up_process(sqd->thread);
+		complete(&sqd->startup);
 }
 
 static inline void __io_unaccount_mem(struct user_struct *user,
@@ -8466,21 +8379,14 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)
 	mutex_lock(&ctx->uring_lock);
 	mutex_unlock(&ctx->uring_lock);
 
-	io_sq_thread_stop(ctx);
+	io_sq_thread_finish(ctx);
 	io_sqe_buffers_unregister(ctx);
 
-	if (ctx->sqo_task) {
-		put_task_struct(ctx->sqo_task);
-		ctx->sqo_task = NULL;
+	if (ctx->mm_account) {
+		mmdrop(ctx->mm_account);
+		ctx->mm_account = NULL;
 	}
 
-#ifdef CONFIG_BLK_CGROUP
-	if (ctx->sqo_blkcg_css)
-		css_put(ctx->sqo_blkcg_css);
-#endif
-
 	mutex_lock(&ctx->uring_lock);
 	io_sqe_files_unregister(ctx);
 	mutex_unlock(&ctx->uring_lock);
@@ -8500,7 +8406,6 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)

 	percpu_ref_exit(&ctx->refs);
 	free_uid(ctx->user);
-	put_cred(ctx->creds);
 	io_req_caches_free(ctx, NULL);
 	kfree(ctx->cancel_hash);
 	kfree(ctx);
@@ -8793,12 +8698,15 @@ static void io_uring_cancel_task_requests(struct io_ring_ctx *ctx,
 					  struct files_struct *files)
 {
 	struct task_struct *task = current;
+	bool did_park = false;
 
 	if ((ctx->flags & IORING_SETUP_SQPOLL) && ctx->sq_data) {
 		io_disable_sqo_submit(ctx);
-		task = ctx->sq_data->thread;
-		atomic_inc(&task->io_uring->in_idle);
-		io_sq_thread_park(ctx->sq_data);
+		did_park = io_sq_thread_park(ctx->sq_data);
+		if (did_park) {
+			task = ctx->sq_data->thread;
+			atomic_inc(&task->io_uring->in_idle);
+		}
 	}
 
 	io_cancel_defer_files(ctx, task, files);
@@ -8807,7 +8715,7 @@ static void io_uring_cancel_task_requests(struct io_ring_ctx *ctx,
 	if (!files)
 		io_uring_try_cancel_requests(ctx, task, NULL);
 
-	if ((ctx->flags & IORING_SETUP_SQPOLL) && ctx->sq_data) {
+	if (did_park) {
 		atomic_dec(&task->io_uring->in_idle);
 		io_sq_thread_unpark(ctx->sq_data);
 	}
@@ -8907,14 +8815,17 @@ static s64 tctx_inflight(struct io_uring_task *tctx)

 static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx)
 {
+	struct io_sq_data *sqd = ctx->sq_data;
 	struct io_uring_task *tctx;
 	s64 inflight;
 	DEFINE_WAIT(wait);
 
-	if (!ctx->sq_data)
+	if (!sqd)
 		return;
-	tctx = ctx->sq_data->thread->io_uring;
 	io_disable_sqo_submit(ctx);
+	if (!io_sq_thread_park(sqd))
+		return;
+	tctx = ctx->sq_data->thread->io_uring;
 
 	atomic_inc(&tctx->in_idle);
 	do {
@@ -8935,6 +8846,7 @@ static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx)
 		finish_wait(&tctx->wait, &wait);
 	} while (1);
 	atomic_dec(&tctx->in_idle);
+	io_sq_thread_unpark(sqd);
 }
 
 /*
@@ -9511,12 +9423,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p,
 	ctx->compat = in_compat_syscall();
 	ctx->limit_mem = !capable(CAP_IPC_LOCK);
 	ctx->user = user;
-	ctx->creds = get_current_cred();
-#ifdef CONFIG_AUDIT
-	ctx->loginuid = current->loginuid;
-	ctx->sessionid = current->sessionid;
-#endif
-	ctx->sqo_task = get_task_struct(current);
+	ctx->sqo_task = current;
 
 	/*
 	 * This is just grabbed for accounting purposes. When a process exits,
@@ -9527,24 +9434,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p,
 	mmgrab(current->mm);
 	ctx->mm_account = current->mm;
 
-#ifdef CONFIG_BLK_CGROUP
-	/*
-	 * The sq thread will belong to the original cgroup it was inited in.
-	 * If the cgroup goes offline (e.g. disabling the io controller), then
-	 * issued bios will be associated with the closest cgroup later in the
-	 * block layer.
-	 */
-	rcu_read_lock();
-	ctx->sqo_blkcg_css = blkcg_css();
-	ret = css_tryget_online(ctx->sqo_blkcg_css);
-	rcu_read_unlock();
-	if (!ret) {
-		/* don't init against a dying cgroup, have the user try again */
-		ctx->sqo_blkcg_css = NULL;
-		ret = -ENODEV;
-		goto err;
-	}
-#endif
 	ret = io_allocate_scq_urings(ctx, p);
 	if (ret)
 		goto err;