Commit 3986f9a4 authored by Ilya Dryomov

libceph: multiple workspaces for CRUSH computations

Replace a global map->crush_workspace (protected by a global mutex)
with a list of workspaces, up to the number of CPUs + 1.

This is based on a patch from Robin Geuze <robing@nl.team.blue>.
Robin and his team have observed a 10-20% IOPS increase across all
queue depths, along with lower CPU usage, on a high-end all-NVMe
100GbE cluster.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
parent 1c30c907
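Before the diff, a hypothetical user-space sketch of the pooling scheme the
patch introduces (not part of the patch): a pthread mutex and condition
variable stand in for the kernel's spinlock and waitqueue, and all names
(ws_manager, ws_get, ws_put) are invented for illustration. The kernel
version below additionally frees excess idle workspaces in put_workspace().

/* Hypothetical illustration only -- not part of the patch. */
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>

struct ws {
	struct ws *next;	/* idle-list linkage */
	/* per-computation scratch space would live here */
};

struct ws_manager {
	struct ws *idle;	/* idle workspaces (LIFO) */
	int total;		/* workspaces allocated overall */
	int max;		/* cap: number of CPUs + 1 */
	pthread_mutex_t lock;
	pthread_cond_t wait;
};

static void ws_manager_init(struct ws_manager *m)
{
	m->idle = NULL;
	m->total = 0;
	m->max = (int)sysconf(_SC_NPROCESSORS_ONLN) + 1;
	pthread_mutex_init(&m->lock, NULL);
	pthread_cond_init(&m->wait, NULL);
}

/* Take an idle workspace, allocate one if under the cap, else sleep. */
static struct ws *ws_get(struct ws_manager *m)
{
	struct ws *w;

	for (;;) {
		pthread_mutex_lock(&m->lock);
		while (!m->idle && m->total >= m->max)
			pthread_cond_wait(&m->wait, &m->lock);
		if (m->idle) {
			w = m->idle;
			m->idle = w->next;
			pthread_mutex_unlock(&m->lock);
			return w;
		}
		m->total++;	/* reserve a slot before allocating */
		pthread_mutex_unlock(&m->lock);

		w = calloc(1, sizeof(*w));
		if (w)
			return w;

		/* as in the patch: back out and retry/wait, don't fail */
		pthread_mutex_lock(&m->lock);
		m->total--;
		pthread_cond_signal(&m->wait);
		pthread_mutex_unlock(&m->lock);
	}
}

/* Return a workspace to the idle list and wake one waiter. */
static void ws_put(struct ws_manager *m, struct ws *w)
{
	pthread_mutex_lock(&m->lock);
	w->next = m->idle;
	m->idle = w;
	pthread_cond_signal(&m->wait);
	pthread_mutex_unlock(&m->lock);
}

A consumer brackets its computation with ws_get()/ws_put(), exactly as
do_crush() does with get_workspace()/put_workspace() in the last hunk.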
include/linux/ceph/osdmap.h +12 −2
@@ -137,6 +137,17 @@ int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
 		     const char *fmt, ...);
 void ceph_oid_destroy(struct ceph_object_id *oid);
 
+struct workspace_manager {
+	struct list_head idle_ws;
+	spinlock_t ws_lock;
+	/* Number of free workspaces */
+	int free_ws;
+	/* Total number of allocated workspaces */
+	atomic_t total_ws;
+	/* Waiters for a free workspace */
+	wait_queue_head_t ws_wait;
+};
+
 struct ceph_pg_mapping {
 	struct rb_node node;
 	struct ceph_pg pgid;
@@ -184,8 +195,7 @@ struct ceph_osdmap {
 	 * the list of osds that store+replicate them. */
 	struct crush_map *crush;
 
-	struct mutex crush_workspace_mutex;
-	void *crush_workspace;
+	struct workspace_manager crush_wsm;
 };
 
 static inline bool ceph_osd_exists(struct ceph_osdmap *map, int osd)
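Note the split locking scheme in workspace_manager: free_ws (the idle count)
is only read and written under ws_lock, while total_ws is an atomic_t so the
allocation-failure path in get_workspace() can adjust and re-check it without
holding the lock.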
include/linux/crush/crush.h +3 −0
@@ -346,6 +346,9 @@ struct crush_work_bucket {
 
 struct crush_work {
 	struct crush_work_bucket **work; /* Per-bucket working store */
+#ifdef __KERNEL__
+	struct list_head item;
+#endif
 };
 
 #ifdef __KERNEL__
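The new list_head is guarded by #ifdef __KERNEL__, presumably because this
header is shared with the userspace CRUSH implementation, which has no use
for the kernel's workspace list.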
net/ceph/osdmap.c +151 −15
@@ -964,6 +964,143 @@ static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
 	return -EINVAL;
 }
 
+/*
+ * CRUSH workspaces
+ *
+ * workspace_manager framework borrowed from fs/btrfs/compression.c.
+ * Two simplifications: there is only one type of workspace and there
+ * is always at least one workspace.
+ */
+static struct crush_work *alloc_workspace(const struct crush_map *c)
+{
+	struct crush_work *work;
+	size_t work_size;
+
+	WARN_ON(!c->working_size);
+	work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
+	dout("%s work_size %zu bytes\n", __func__, work_size);
+
+	work = ceph_kvmalloc(work_size, GFP_NOIO);
+	if (!work)
+		return NULL;
+
+	INIT_LIST_HEAD(&work->item);
+	crush_init_workspace(c, work);
+	return work;
+}
+
+static void free_workspace(struct crush_work *work)
+{
+	WARN_ON(!list_empty(&work->item));
+	kvfree(work);
+}
+
+static void init_workspace_manager(struct workspace_manager *wsm)
+{
+	INIT_LIST_HEAD(&wsm->idle_ws);
+	spin_lock_init(&wsm->ws_lock);
+	atomic_set(&wsm->total_ws, 0);
+	wsm->free_ws = 0;
+	init_waitqueue_head(&wsm->ws_wait);
+}
+
+static void add_initial_workspace(struct workspace_manager *wsm,
+				  struct crush_work *work)
+{
+	WARN_ON(!list_empty(&wsm->idle_ws));
+
+	list_add(&work->item, &wsm->idle_ws);
+	atomic_set(&wsm->total_ws, 1);
+	wsm->free_ws = 1;
+}
+
+static void cleanup_workspace_manager(struct workspace_manager *wsm)
+{
+	struct crush_work *work;
+
+	while (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);
+		free_workspace(work);
+	}
+	atomic_set(&wsm->total_ws, 0);
+	wsm->free_ws = 0;
+}
+
+/*
+ * Finds an available workspace or allocates a new one.  If it's not
+ * possible to allocate a new one, waits until there is one.
+ */
+static struct crush_work *get_workspace(struct workspace_manager *wsm,
+					const struct crush_map *c)
+{
+	struct crush_work *work;
+	int cpus = num_online_cpus();
+
+again:
+	spin_lock(&wsm->ws_lock);
+	if (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);
+		wsm->free_ws--;
+		spin_unlock(&wsm->ws_lock);
+		return work;
+
+	}
+	if (atomic_read(&wsm->total_ws) > cpus) {
+		DEFINE_WAIT(wait);
+
+		spin_unlock(&wsm->ws_lock);
+		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
+		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
+			schedule();
+		finish_wait(&wsm->ws_wait, &wait);
+		goto again;
+	}
+	atomic_inc(&wsm->total_ws);
+	spin_unlock(&wsm->ws_lock);
+
+	work = alloc_workspace(c);
+	if (!work) {
+		atomic_dec(&wsm->total_ws);
+		wake_up(&wsm->ws_wait);
+
+		/*
+		 * Do not return the error but go back to waiting.  We
+		 * have the initial workspace and the CRUSH computation
+		 * time is bounded so we will get it eventually.
+		 */
+		WARN_ON(atomic_read(&wsm->total_ws) < 1);
+		goto again;
+	}
+	return work;
+}
+
+/*
+ * Puts a workspace back on the list or frees it if we have enough
+ * idle ones sitting around.
+ */
+static void put_workspace(struct workspace_manager *wsm,
+			  struct crush_work *work)
+{
+	spin_lock(&wsm->ws_lock);
+	if (wsm->free_ws <= num_online_cpus()) {
+		list_add(&work->item, &wsm->idle_ws);
+		wsm->free_ws++;
+		spin_unlock(&wsm->ws_lock);
+		goto wake;
+	}
+	spin_unlock(&wsm->ws_lock);
+
+	free_workspace(work);
+	atomic_dec(&wsm->total_ws);
+wake:
+	if (wq_has_sleeper(&wsm->ws_wait))
+		wake_up(&wsm->ws_wait);
+}
+
 /*
  * osd map
  */
@@ -981,7 +1118,8 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
 	map->primary_temp = RB_ROOT;
 	map->pg_upmap = RB_ROOT;
 	map->pg_upmap_items = RB_ROOT;
-	mutex_init(&map->crush_workspace_mutex);
+
+	init_workspace_manager(&map->crush_wsm);
 
 	return map;
 }
@@ -989,8 +1127,11 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
 void ceph_osdmap_destroy(struct ceph_osdmap *map)
 {
 	dout("osdmap_destroy %p\n", map);
+
 	if (map->crush)
 		crush_destroy(map->crush);
+	cleanup_workspace_manager(&map->crush_wsm);
+
 	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
 		struct ceph_pg_mapping *pg =
 			rb_entry(rb_first(&map->pg_temp),
@@ -1029,7 +1170,6 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
 	kvfree(map->osd_weight);
 	kvfree(map->osd_addr);
 	kvfree(map->osd_primary_affinity);
-	kvfree(map->crush_workspace);
 	kfree(map);
 }
 
@@ -1104,26 +1244,22 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
 
 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
 {
-	void *workspace;
-	size_t work_size;
+	struct crush_work *work;
 
 	if (IS_ERR(crush))
 		return PTR_ERR(crush);
 
-	work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
-	dout("%s work_size %zu bytes\n", __func__, work_size);
-	workspace = ceph_kvmalloc(work_size, GFP_NOIO);
-	if (!workspace) {
+	work = alloc_workspace(crush);
+	if (!work) {
 		crush_destroy(crush);
 		return -ENOMEM;
 	}
-	crush_init_workspace(crush, workspace);
 
 	if (map->crush)
 		crush_destroy(map->crush);
-	kvfree(map->crush_workspace);
+	cleanup_workspace_manager(&map->crush_wsm);
 	map->crush = crush;
-	map->crush_workspace = workspace;
+	add_initial_workspace(&map->crush_wsm, work);
 	return 0;
 }
 
@@ -2322,6 +2458,7 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
 		    s64 choose_args_index)
 {
 	struct crush_choose_arg_map *arg_map;
+	struct crush_work *work;
 	int r;
 
 	BUG_ON(result_max > CEPH_PG_MAX_SIZE);
@@ -2332,12 +2469,11 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
 		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
 						CEPH_DEFAULT_CHOOSE_ARGS);
 
-	mutex_lock(&map->crush_workspace_mutex);
+	work = get_workspace(&map->crush_wsm, map->crush);
 	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
-			  weight, weight_max, map->crush_workspace,
+			  weight, weight_max, work,
 			  arg_map ? arg_map->args : NULL);
-	mutex_unlock(&map->crush_workspace_mutex);
-
+	put_workspace(&map->crush_wsm, work);
 	return r;
 }
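Taken together: ceph_osdmap_alloc() initializes the manager,
osdmap_set_crush() seeds it with a single workspace via alloc_workspace()
and add_initial_workspace(), do_crush() brackets each crush_do_rule() call
with get_workspace()/put_workspace(), and ceph_osdmap_destroy() frees
everything through cleanup_workspace_manager(). Because the initial
workspace always exists and CRUSH computation time is bounded,
get_workspace() can afford to wait out allocation failures instead of
returning an error.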