Commit 9b4b7c1f authored by Bob Pearson's avatar Bob Pearson Committed by Jason Gunthorpe
Browse files

RDMA/rxe: Add workqueue support for rxe tasks

Replace tasklets by work queues for the three main rxe tasklets:
rxe_requester, rxe_completer and rxe_responder.

work queues are a more modern way to process work from an IRQ and provide
more control over how that work is run for future patches.

Link: https://lore.kernel.org/r/20230428171321.5774-1-rpearsonhpe@gmail.com


Signed-off-by: default avatarIan Ziemba <ian.ziemba@hpe.com>
Signed-off-by: default avatarBob Pearson <rpearsonhpe@gmail.com>
Reviewed-by: default avatarDaisuke Matsuda <matsuda-daisuke@fujitsu.com>
Tested-by: default avatarDaisuke Matsuda <matsuda-daisuke@fujitsu.com>
Signed-off-by: default avatarJason Gunthorpe <jgg@nvidia.com>
parent f1fcbaa1
Loading
Loading
Loading
Loading
+8 −1
Original line number Diff line number Diff line
@@ -212,10 +212,16 @@ static int __init rxe_module_init(void)
{
	int err;

	err = rxe_net_init();
	err = rxe_alloc_wq();
	if (err)
		return err;

	err = rxe_net_init();
	if (err) {
		rxe_destroy_wq();
		return err;
	}

	rdma_link_register(&rxe_link_ops);
	pr_info("loaded\n");
	return 0;
@@ -226,6 +232,7 @@ static void __exit rxe_module_exit(void)
	rdma_link_unregister(&rxe_link_ops);
	ib_unregister_driver(RDMA_DRIVER_RXE);
	rxe_net_exit();
	rxe_destroy_wq();

	pr_info("unloaded\n");
}
+63 −47
Original line number Diff line number Diff line
@@ -6,8 +6,24 @@

#include "rxe.h"

static struct workqueue_struct *rxe_wq;

int rxe_alloc_wq(void)
{
	rxe_wq = alloc_workqueue("rxe_wq", WQ_UNBOUND, WQ_MAX_ACTIVE);
	if (!rxe_wq)
		return -ENOMEM;

	return 0;
}

void rxe_destroy_wq(void)
{
	destroy_workqueue(rxe_wq);
}

/* Check if task is idle i.e. not running, not scheduled in
 * tasklet queue and not draining. If so move to busy to
 * work queue and not draining. If so move to busy to
 * reserve a slot in do_task() by setting to busy and taking
 * a qp reference to cover the gap from now until the task finishes.
 * state will move out of busy if task returns a non zero value
@@ -21,9 +37,6 @@ static bool __reserve_if_idle(struct rxe_task *task)
{
	WARN_ON(rxe_read(task->qp) <= 0);

	if (task->tasklet.state & BIT(TASKLET_STATE_SCHED))
		return false;

	if (task->state == TASK_STATE_IDLE) {
		rxe_get(task->qp);
		task->state = TASK_STATE_BUSY;
@@ -38,7 +51,7 @@ static bool __reserve_if_idle(struct rxe_task *task)
}

/* check if task is idle or drained and not currently
 * scheduled in the tasklet queue. This routine is
 * scheduled in the work queue. This routine is
 * called by rxe_cleanup_task or rxe_disable_task to
 * see if the queue is empty.
 * Context: caller should hold task->lock.
@@ -46,7 +59,7 @@ static bool __reserve_if_idle(struct rxe_task *task)
 */
static bool __is_done(struct rxe_task *task)
{
	if (task->tasklet.state & BIT(TASKLET_STATE_SCHED))
	if (work_pending(&task->work))
		return false;

	if (task->state == TASK_STATE_IDLE ||
@@ -77,23 +90,23 @@ static bool is_done(struct rxe_task *task)
 * schedules the task. They must call __reserve_if_idle to
 * move the task to busy before calling or scheduling.
 * The task can also be moved to drained or invalid
 * by calls to rxe-cleanup_task or rxe_disable_task.
 * by calls to rxe_cleanup_task or rxe_disable_task.
 * In that case tasks which get here are not executed but
 * just flushed. The tasks are designed to look to see if
 * there is work to do and do part of it before returning
 * there is work to do and then do part of it before returning
 * here with a return value of zero until all the work
 * has been consumed then it retuens a non-zero value.
 * has been consumed then it returns a non-zero value.
 * The number of times the task can be run is limited by
 * max iterations so one task cannot hold the cpu forever.
 * If the limit is hit and work remains the task is rescheduled.
 */
static void do_task(struct tasklet_struct *t)
static void do_task(struct rxe_task *task)
{
	int cont;
	int ret;
	struct rxe_task *task = from_tasklet(task, t, tasklet);
	unsigned int iterations;
	unsigned long flags;
	int resched = 0;
	int cont;
	int ret;

	WARN_ON(rxe_read(task->qp) <= 0);

@@ -115,25 +128,22 @@ static void do_task(struct tasklet_struct *t)
		} while (ret == 0 && iterations-- > 0);

		spin_lock_irqsave(&task->lock, flags);
		switch (task->state) {
		case TASK_STATE_BUSY:
			if (ret) {
				task->state = TASK_STATE_IDLE;
			} else {
				/* This can happen if the client
				 * can add work faster than the
				 * tasklet can finish it.
				 * Reschedule the tasklet and exit
				 * the loop to give up the cpu
		/* we're not done yet but we ran out of iterations.
		 * yield the cpu and reschedule the task
		 */
		if (!ret) {
			task->state = TASK_STATE_IDLE;
			resched = 1;
			goto exit;
		}

		switch (task->state) {
		case TASK_STATE_BUSY:
			task->state = TASK_STATE_IDLE;
			break;

		/* someone tried to run the task since the last time we called
		 * func, so we will call one more time regardless of the
		 * return value
		/* someone tried to schedule the task while we
		 * were running, keep going
		 */
		case TASK_STATE_ARMED:
			task->state = TASK_STATE_BUSY;
@@ -141,21 +151,23 @@ static void do_task(struct tasklet_struct *t)
			break;

		case TASK_STATE_DRAINING:
			if (ret)
			task->state = TASK_STATE_DRAINED;
			else
				cont = 1;
			break;

		default:
			WARN_ON(1);
			rxe_info_qp(task->qp, "unexpected task state = %d", task->state);
			rxe_dbg_qp(task->qp, "unexpected task state = %d",
				   task->state);
			task->state = TASK_STATE_IDLE;
		}

exit:
		if (!cont) {
			task->num_done++;
			if (WARN_ON(task->num_done != task->num_sched))
				rxe_err_qp(task->qp, "%ld tasks scheduled, %ld tasks done",
				rxe_dbg_qp(
					task->qp,
					"%ld tasks scheduled, %ld tasks done",
					task->num_sched, task->num_done);
		}
		spin_unlock_irqrestore(&task->lock, flags);
@@ -169,6 +181,12 @@ static void do_task(struct tasklet_struct *t)
	rxe_put(task->qp);
}

/* wrapper around do_task to fix argument for work queue */
static void do_work(struct work_struct *work)
{
	do_task(container_of(work, struct rxe_task, work));
}

int rxe_init_task(struct rxe_task *task, struct rxe_qp *qp,
		  int (*func)(struct rxe_qp *))
{
@@ -176,11 +194,9 @@ int rxe_init_task(struct rxe_task *task, struct rxe_qp *qp,

	task->qp = qp;
	task->func = func;

	tasklet_setup(&task->tasklet, do_task);

	task->state = TASK_STATE_IDLE;
	spin_lock_init(&task->lock);
	INIT_WORK(&task->work, do_work);

	return 0;
}
@@ -213,8 +229,6 @@ void rxe_cleanup_task(struct rxe_task *task)
	while (!is_done(task))
		cond_resched();

	tasklet_kill(&task->tasklet);

	spin_lock_irqsave(&task->lock, flags);
	task->state = TASK_STATE_INVALID;
	spin_unlock_irqrestore(&task->lock, flags);
@@ -226,7 +240,7 @@ void rxe_cleanup_task(struct rxe_task *task)
void rxe_run_task(struct rxe_task *task)
{
	unsigned long flags;
	int run;
	bool run;

	WARN_ON(rxe_read(task->qp) <= 0);

@@ -235,11 +249,11 @@ void rxe_run_task(struct rxe_task *task)
	spin_unlock_irqrestore(&task->lock, flags);

	if (run)
		do_task(&task->tasklet);
		do_task(task);
}

/* schedule the task to run later as a tasklet.
 * the tasklet)schedule call can be called holding
/* schedule the task to run later as a work queue entry.
 * the queue_work call can be called holding
 * the lock.
 */
void rxe_sched_task(struct rxe_task *task)
@@ -250,7 +264,7 @@ void rxe_sched_task(struct rxe_task *task)

	spin_lock_irqsave(&task->lock, flags);
	if (__reserve_if_idle(task))
		tasklet_schedule(&task->tasklet);
		queue_work(rxe_wq, &task->work);
	spin_unlock_irqrestore(&task->lock, flags);
}

@@ -277,7 +291,9 @@ void rxe_disable_task(struct rxe_task *task)
	while (!is_done(task))
		cond_resched();

	tasklet_disable(&task->tasklet);
	spin_lock_irqsave(&task->lock, flags);
	task->state = TASK_STATE_DRAINED;
	spin_unlock_irqrestore(&task->lock, flags);
}

void rxe_enable_task(struct rxe_task *task)
@@ -291,7 +307,7 @@ void rxe_enable_task(struct rxe_task *task)
		spin_unlock_irqrestore(&task->lock, flags);
		return;
	}

	task->state = TASK_STATE_IDLE;
	tasklet_enable(&task->tasklet);
	spin_unlock_irqrestore(&task->lock, flags);
}
+5 −1
Original line number Diff line number Diff line
@@ -22,7 +22,7 @@ enum {
 * called again.
 */
struct rxe_task {
	struct tasklet_struct	tasklet;
	struct work_struct	work;
	int			state;
	spinlock_t		lock;
	struct rxe_qp		*qp;
@@ -32,6 +32,10 @@ struct rxe_task {
	long			num_done;
};

int rxe_alloc_wq(void);

void rxe_destroy_wq(void);

/*
 * init rxe_task structure
 *	qp  => parameter to pass to func