Commit d363f833 authored by Paul E. McKenney's avatar Paul E. McKenney
Browse files

rcu-tasks: Use workqueues for multiple rcu_tasks_invoke_cbs() invocations



If there is a flood of callbacks, it is necessary to put multiple
CPUs to work invoking those callbacks.  This commit therefore uses a
workqueue-flooding approach to parallelize RCU Tasks callback execution.

Reported-by: default avatarMartin Lau <kafai@fb.com>
Cc: Neeraj Upadhyay <neeraj.iitr10@gmail.com>
Signed-off-by: default avatarPaul E. McKenney <paulmck@kernel.org>
parent 57881863
Loading
Loading
Loading
Loading
+52 −23
Original line number Diff line number Diff line
@@ -24,10 +24,14 @@ typedef void (*postgp_func_t)(struct rcu_tasks *rtp);
 * struct rcu_tasks_percpu - Per-CPU component of definition for a Tasks-RCU-like mechanism.
 * @cblist: Callback list.
 * @lock: Lock protecting per-CPU callback list.
 * @rtp_work: Work queue for invoking callbacks.
 */
struct rcu_tasks_percpu {
	struct rcu_segcblist cblist;
	raw_spinlock_t __private lock;
	struct work_struct rtp_work;
	int cpu;
	struct rcu_tasks *rtpp;
};

/**
@@ -146,6 +150,8 @@ static const char * const rcu_tasks_gp_state_names[] = {
//
// Generic code.

static void rcu_tasks_invoke_cbs_wq(struct work_struct *wp);

/* Record grace-period phase and time. */
static void set_tasks_gp_state(struct rcu_tasks *rtp, int newstate)
{
@@ -185,6 +191,9 @@ static void cblist_init_generic(struct rcu_tasks *rtp)
		raw_spin_lock_rcu_node(rtpcp); // irqs already disabled.
		if (rcu_segcblist_empty(&rtpcp->cblist))
			rcu_segcblist_init(&rtpcp->cblist);
		INIT_WORK(&rtpcp->rtp_work, rcu_tasks_invoke_cbs_wq);
		rtpcp->cpu = cpu;
		rtpcp->rtpp = rtp;
		raw_spin_unlock_rcu_node(rtpcp); // irqs remain disabled.
	}
	raw_spin_unlock_irqrestore(&rtp->cbs_gbl_lock, flags);
@@ -256,19 +265,30 @@ static int rcu_tasks_need_gpcb(struct rcu_tasks *rtp)
}

// Advance callbacks and invoke any that are ready.
static void rcu_tasks_invoke_cbs(struct rcu_tasks *rtp)
static void rcu_tasks_invoke_cbs(struct rcu_tasks *rtp, struct rcu_tasks_percpu *rtpcp)
{
	int cpu;
	int cpunext;
	unsigned long flags;
	int len;
	struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
	struct rcu_head *rhp;
	struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
	struct rcu_tasks_percpu *rtpcp_next;

	for (cpu = 0; cpu < rtp->percpu_enqueue_lim; cpu++) {
		struct rcu_tasks_percpu *rtpcp = per_cpu_ptr(rtp->rtpcpu, cpu);
	cpu = rtpcp->cpu;
	cpunext = cpu * 2 + 1;
	if (cpunext < rtp->percpu_enqueue_lim) {
		rtpcp_next = per_cpu_ptr(rtp->rtpcpu, cpunext);
		queue_work_on(cpunext, system_wq, &rtpcp_next->rtp_work);
		cpunext++;
		if (cpunext < rtp->percpu_enqueue_lim) {
			rtpcp_next = per_cpu_ptr(rtp->rtpcpu, cpunext);
			queue_work_on(cpunext, system_wq, &rtpcp_next->rtp_work);
		}
	}

	if (rcu_segcblist_empty(&rtpcp->cblist))
			continue;
		return;
	raw_spin_lock_irqsave_rcu_node(rtpcp, flags);
	rcu_segcblist_advance(&rtpcp->cblist, rcu_seq_current(&rtp->tasks_gp_seq));
	rcu_segcblist_extract_done_cbs(&rtpcp->cblist, &rcl);
@@ -285,6 +305,15 @@ static void rcu_tasks_invoke_cbs(struct rcu_tasks *rtp)
	(void)rcu_segcblist_accelerate(&rtpcp->cblist, rcu_seq_snap(&rtp->tasks_gp_seq));
	raw_spin_unlock_irqrestore_rcu_node(rtpcp, flags);
}

// Workqueue flood to advance callbacks and invoke any that are ready.
static void rcu_tasks_invoke_cbs_wq(struct work_struct *wp)
{
	struct rcu_tasks *rtp;
	struct rcu_tasks_percpu *rtpcp = container_of(wp, struct rcu_tasks_percpu, rtp_work);

	rtp = rtpcp->rtpp;
	rcu_tasks_invoke_cbs(rtp, rtpcp);
}

/* RCU-tasks kthread that detects grace periods and invokes callbacks. */
@@ -320,7 +349,7 @@ static int __noreturn rcu_tasks_kthread(void *arg)

		/* Invoke callbacks. */
		set_tasks_gp_state(rtp, RTGS_INVOKE_CBS);
		rcu_tasks_invoke_cbs(rtp);
		rcu_tasks_invoke_cbs(rtp, per_cpu_ptr(rtp->rtpcpu, 0));

		/* Paranoid sleep to keep this from entering a tight loop */
		schedule_timeout_idle(rtp->gp_sleep);