Commit 2ca836b1 authored by Uladzislau Rezki (Sony), committed by Paul E. McKenney

rcu/kvfree: Split ready for reclaim objects from a batch

This patch splits the lists of objects so as to avoid sending any
through RCU that have already been queued for more than one grace
period.  These long-term-resident objects are immediately freed.
The remaining short-term-resident objects are queued for later freeing
using queue_rcu_work().
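
[Editor's sketch] The split relies on RCU's polled grace-period API: a cookie is taken with get_state_synchronize_rcu() when an object is queued, and the drain step immediately frees anything whose cookie poll_state_synchronize_rcu() reports as expired. The sketch below is illustrative only; my_obj, my_enqueue(), and my_drain() are made-up names, not the per-CPU krcp machinery this patch actually modifies:

#include <linux/list.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>

/* Hypothetical object type; the real code tracks pages of pointers. */
struct my_obj {
	struct list_head list;
	unsigned long gp_snap;	/* cookie from get_state_synchronize_rcu() */
};

static LIST_HEAD(my_queue);

static void my_enqueue(struct my_obj *obj)
{
	/* Snapshot the RCU state at queuing time. */
	obj->gp_snap = get_state_synchronize_rcu();
	list_add(&obj->list, &my_queue);
}

static void my_drain(void)
{
	struct my_obj *obj, *n;

	list_for_each_entry_safe(obj, n, &my_queue, list) {
		/* A full grace period has already elapsed for this
		 * object, so it is ready for reclaim right now. */
		if (poll_state_synchronize_rcu(obj->gp_snap)) {
			list_del(&obj->list);
			kfree(obj);
		}
	}
	/* Whatever remains must wait for one more grace period,
	 * e.g. by handing it to queue_rcu_work() as this patch does. */
}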

This change avoids delaying workqueue handlers with synchronize_rcu()
invocations.  Yes, workqueue handlers are designed to handle blocking,
but avoiding blocking when unnecessary improves performance during
low-memory situations.
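
[Editor's sketch] The switch from a bare work_struct to rcu_work is what removes the blocking: queue_rcu_work() invokes the handler only after a grace period has already elapsed, so the handler itself never needs synchronize_rcu(). A hedged sketch of that usage, where my_reclaim_fn(), my_rwork, and my_kick_reclaim() are hypothetical names (the real handler in this patch is kfree_rcu_work()):

#include <linux/workqueue.h>

static void my_reclaim_fn(struct work_struct *work)
{
	struct rcu_work *rwork = to_rcu_work(work);

	/*
	 * A grace period has already passed by the time this runs, so
	 * the batch recovered via container_of(rwork, ...) can be freed
	 * without blocking in synchronize_rcu().
	 */
	(void)rwork;
}

static struct rcu_work my_rwork;

static void my_kick_reclaim(void)
{
	INIT_RCU_WORK(&my_rwork, my_reclaim_fn);
	queue_rcu_work(system_wq, &my_rwork);
}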

Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
parent 4c33464a
+54 −33
@@ -2900,15 +2900,13 @@ struct kvfree_rcu_bulk_data {
 /**
  * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
  * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
  * @head_free: List of kfree_rcu() objects waiting for a grace period
- * @head_free_gp_snap: Snapshot of RCU state for objects placed to "@head_free"
  * @bulk_head_free: Bulk-List of kvfree_rcu() objects waiting for a grace period
  * @krcp: Pointer to @kfree_rcu_cpu structure
  */
 struct kfree_rcu_cpu_work {
-	struct work_struct rcu_work;
+	struct rcu_work rcu_work;
 	struct rcu_head *head_free;
-	unsigned long head_free_gp_snap;
 	struct list_head bulk_head_free[FREE_N_CHANNELS];
 	struct kfree_rcu_cpu *krcp;
 };
@@ -2916,6 +2914,7 @@ struct kfree_rcu_cpu_work {
 /**
  * struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
  * @head: List of kfree_rcu() objects not yet waiting for a grace period
+ * @head_gp_snap: Snapshot of RCU state for objects placed to "@head"
  * @bulk_head: Bulk-List of kvfree_rcu() objects not yet waiting for a grace period
  * @krw_arr: Array of batches of kfree_rcu() objects waiting for a grace period
  * @lock: Synchronize access to this structure
@@ -2943,6 +2942,7 @@ struct kfree_rcu_cpu {
 	// Objects queued on a linked list
 	// through their rcu_head structures.
 	struct rcu_head *head;
+	unsigned long head_gp_snap;
 	atomic_t head_count;
 
 	// Objects queued on a bulk-list.
@@ -3111,10 +3111,9 @@ static void kfree_rcu_work(struct work_struct *work)
 	struct rcu_head *head;
 	struct kfree_rcu_cpu *krcp;
 	struct kfree_rcu_cpu_work *krwp;
-	unsigned long head_free_gp_snap;
 	int i;
 
-	krwp = container_of(work,
+	krwp = container_of(to_rcu_work(work),
 		struct kfree_rcu_cpu_work, rcu_work);
 	krcp = krwp->krcp;
 
@@ -3126,26 +3125,11 @@ static void kfree_rcu_work(struct work_struct *work)
 	// Channel 3.
 	head = krwp->head_free;
 	krwp->head_free = NULL;
-	head_free_gp_snap = krwp->head_free_gp_snap;
 	raw_spin_unlock_irqrestore(&krcp->lock, flags);
 
 	// Handle the first two channels.
 	for (i = 0; i < FREE_N_CHANNELS; i++) {
 		// Start from the tail page, so a GP is likely passed for it.
-		list_for_each_entry_safe_reverse(bnode, n, &bulk_head[i], list) {
-			// Not yet ready? Bail out since we need one more GP.
-			if (!poll_state_synchronize_rcu(bnode->gp_snap))
-				break;
-
-			list_del_init(&bnode->list);
-			kvfree_rcu_bulk(krcp, bnode, i);
-		}
-
-		// Please note a request for one more extra GP can
-		// occur only once for all objects in this batch.
-		if (!list_empty(&bulk_head[i]))
-			synchronize_rcu();
-
 		list_for_each_entry_safe(bnode, n, &bulk_head[i], list)
 			kvfree_rcu_bulk(krcp, bnode, i);
 	}
@@ -3157,11 +3141,8 @@ static void kfree_rcu_work(struct work_struct *work)
 	 * queued on a linked list through their rcu_head structures.
 	 * This list is named "Channel 3".
 	 */
-	if (head) {
-		cond_synchronize_rcu(head_free_gp_snap);
-		kvfree_rcu_list(head);
-	}
+	kvfree_rcu_list(head);
 }
 
 static bool
 need_offload_krc(struct kfree_rcu_cpu *krcp)
@@ -3201,6 +3182,44 @@ schedule_delayed_monitor_work(struct kfree_rcu_cpu *krcp)
 	queue_delayed_work(system_wq, &krcp->monitor_work, delay);
 }
 
+static void
+kvfree_rcu_drain_ready(struct kfree_rcu_cpu *krcp)
+{
+	struct list_head bulk_ready[FREE_N_CHANNELS];
+	struct kvfree_rcu_bulk_data *bnode, *n;
+	struct rcu_head *head_ready = NULL;
+	unsigned long flags;
+	int i;
+
+	raw_spin_lock_irqsave(&krcp->lock, flags);
+	for (i = 0; i < FREE_N_CHANNELS; i++) {
+		INIT_LIST_HEAD(&bulk_ready[i]);
+
+		list_for_each_entry_safe_reverse(bnode, n, &krcp->bulk_head[i], list) {
+			if (!poll_state_synchronize_rcu(bnode->gp_snap))
+				break;
+
+			atomic_sub(bnode->nr_records, &krcp->bulk_count[i]);
+			list_move(&bnode->list, &bulk_ready[i]);
+		}
+	}
+
+	if (krcp->head && poll_state_synchronize_rcu(krcp->head_gp_snap)) {
+		head_ready = krcp->head;
+		atomic_set(&krcp->head_count, 0);
+		WRITE_ONCE(krcp->head, NULL);
+	}
+	raw_spin_unlock_irqrestore(&krcp->lock, flags);
+
+	for (i = 0; i < FREE_N_CHANNELS; i++) {
+		list_for_each_entry_safe(bnode, n, &bulk_ready[i], list)
+			kvfree_rcu_bulk(krcp, bnode, i);
+	}
+
+	if (head_ready)
+		kvfree_rcu_list(head_ready);
+}
+
 /*
  * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
  */
@@ -3211,6 +3230,9 @@ static void kfree_rcu_monitor(struct work_struct *work)
 	unsigned long flags;
 	int i, j;
 
+	// Drain ready for reclaim.
+	kvfree_rcu_drain_ready(krcp);
+
 	raw_spin_lock_irqsave(&krcp->lock, flags);
 
 	// Attempt to start a new batch.
@@ -3230,8 +3252,9 @@ static void kfree_rcu_monitor(struct work_struct *work)
 			// Channel 2 corresponds to vmalloc-pointer bulk path.
 			for (j = 0; j < FREE_N_CHANNELS; j++) {
 				if (list_empty(&krwp->bulk_head_free[j])) {
-					list_replace_init(&krcp->bulk_head[j], &krwp->bulk_head_free[j]);
 					atomic_set(&krcp->bulk_count[j], 0);
+					list_replace_init(&krcp->bulk_head[j],
+						&krwp->bulk_head_free[j]);
 				}
 			}
 
@@ -3239,13 +3262,8 @@ static void kfree_rcu_monitor(struct work_struct *work)
 			// objects queued on the linked list.
 			if (!krwp->head_free) {
 				krwp->head_free = krcp->head;
-				WRITE_ONCE(krcp->head, NULL);
 				atomic_set(&krcp->head_count, 0);
-
-				// Take a snapshot for this krwp. Please note no more
-				// any objects can be added to attached head_free channel
-				// therefore fixate a GP for it here.
-				krwp->head_free_gp_snap = get_state_synchronize_rcu();
+				WRITE_ONCE(krcp->head, NULL);
 			}
 
 			// One work is per one batch, so there are three
@@ -3253,7 +3271,7 @@ static void kfree_rcu_monitor(struct work_struct *work)
 			// be that the work is in the pending state when
 			// channels have been detached following by each
 			// other.
-			queue_work(system_wq, &krwp->rcu_work);
+			queue_rcu_work(system_wq, &krwp->rcu_work);
 		}
 	}
 
@@ -3440,6 +3458,9 @@ void kvfree_call_rcu(struct rcu_head *head, void *ptr)
 		head->next = krcp->head;
 		WRITE_ONCE(krcp->head, head);
 		atomic_inc(&krcp->head_count);
+
+		// Take a snapshot for this krcp.
+		krcp->head_gp_snap = get_state_synchronize_rcu();
 		success = true;
 	}
 
@@ -4834,7 +4855,7 @@ static void __init kfree_rcu_batch_init(void)
 		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
 
 		for (i = 0; i < KFREE_N_BATCHES; i++) {
-			INIT_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
+			INIT_RCU_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
 			krcp->krw_arr[i].krcp = krcp;
 
 			for (j = 0; j < FREE_N_CHANNELS; j++)