Commit fe33032d authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov
Browse files

ceph: add mount option to limit caps count



If the number of caps exceeds the limit, ceph_trim_dentries() also trims
dentries with valid leases. Trimming a dentry releases its reference to the
associated inode, which may evict the inode and release its caps.

By default, there is no limit on the caps count.

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Reviewed-by: Jeff Layton <jlayton@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
parent 37c4efc1
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -118,6 +118,10 @@ Mount Options
	of a non-responsive Ceph file system.  The default is 30
	seconds.

  caps_max=X
	Specify the maximum number of caps to hold. Unused caps are released
	when the number of caps exceeds the limit. The default is 0 (no limit).

  rbytes
	When stat() is called on a directory, set st_size to 'rbytes',
	the summation of file sizes over all files nested beneath that
+26 −7
Original line number Diff line number Diff line
@@ -148,11 +148,17 @@ void ceph_caps_finalize(struct ceph_mds_client *mdsc)
	spin_unlock(&mdsc->caps_list_lock);
}

/*
 * Derive the cap count bounds from the mount options while holding
 * caps_list_lock: caps_min_count follows max_readdir but is never below
 * 1024, and caps_use_max follows caps_max.  A non-zero caps_use_max that
 * would be smaller than caps_min_count is raised to caps_min_count; a
 * caps_use_max of 0 is left untouched.
 */
void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
			      struct ceph_mount_options *fsopt)
{
	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_min_count += delta;
	BUG_ON(mdsc->caps_min_count < 0);
	mdsc->caps_min_count = fsopt->max_readdir;
	if (mdsc->caps_min_count < 1024)
		mdsc->caps_min_count = 1024;
	mdsc->caps_use_max = fsopt->caps_max;
	if (mdsc->caps_use_max > 0 &&
	    mdsc->caps_use_max < mdsc->caps_min_count)
		mdsc->caps_use_max = mdsc->caps_min_count;
	spin_unlock(&mdsc->caps_list_lock);
}

@@ -272,6 +278,7 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
	if (!err) {
		BUG_ON(have + alloc != need);
		ctx->count = need;
		ctx->used = 0;
	}

	spin_lock(&mdsc->caps_list_lock);
@@ -297,11 +304,22 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
/*
 * Give back whatever is still reserved in @ctx and, if a cap limit is
 * configured and currently exceeded, report the caps this context used
 * to the reclaim accounting.
 *
 * @mdsc: mds client owning the cap counters
 * @ctx:  reservation to release; nothing happens when ctx->count is 0
 */
void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
			 struct ceph_cap_reservation *ctx)
{
	bool over_limit;

	if (ctx->count == 0)
		return;

	dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);

	spin_lock(&mdsc->caps_list_lock);
	__ceph_unreserve_caps(mdsc, ctx->count);
	ctx->count = 0;
	/* a caps_use_max of 0 never counts as exceeded */
	over_limit = mdsc->caps_use_max > 0 &&
		     mdsc->caps_use_count > mdsc->caps_use_max;
	spin_unlock(&mdsc->caps_list_lock);

	/* the reclaim call is made only after the lock is dropped */
	if (!over_limit)
		return;
	ceph_reclaim_caps_nr(mdsc, ctx->used);
}

struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
@@ -346,6 +364,7 @@ struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
	BUG_ON(list_empty(&mdsc->caps_list));

	ctx->count--;
	ctx->used++;
	mdsc->caps_reserve_count--;
	mdsc->caps_use_count++;

@@ -500,12 +519,12 @@ static void __insert_cap_node(struct ceph_inode_info *ci,
/*
 * Set ci->i_hold_caps_min and ci->i_hold_caps_max to round_jiffies() of
 * the current jiffies plus the mount options' caps_wanted_delay_min and
 * caps_wanted_delay_max (each multiplied by HZ), then log both values as
 * offsets from the current jiffies.
 */
static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
			       struct ceph_inode_info *ci)
{
	struct ceph_mount_options *ma = mdsc->fsc->mount_options;
	struct ceph_mount_options *opt = mdsc->fsc->mount_options;

	ci->i_hold_caps_min = round_jiffies(jiffies +
					    ma->caps_wanted_delay_min * HZ);
					    opt->caps_wanted_delay_min * HZ);
	ci->i_hold_caps_max = round_jiffies(jiffies +
					    ma->caps_wanted_delay_max * HZ);
					    opt->caps_wanted_delay_max * HZ);
	dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
	     ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
}
+19 −1
Original line number Diff line number Diff line
@@ -1224,6 +1224,7 @@ enum {

/*
 * Control block handed to __dentry_leases_walk() and, through it, to the
 * per-dentry check callback.
 */
struct ceph_lease_walk_control {
	/* false for the __dentry_lease_check walk, true for the
	 * __dir_lease_check walk */
	bool dir_lease;
	/* when set, __dir_lease_check invalidates the dir lease of a dentry
	 * that is neither held (d_lockref.count > 0) nor flagged
	 * CEPH_DENTRY_REFERENCED and deletes it instead of touching it;
	 * ceph_trim_dentries sets it when fewer dentries were freed than
	 * the number of caps over the limit */
	bool expire_dir_lease;
	/* scan budget for one walk; ceph_trim_dentries treats a value of 0
	 * after the walk as "more to check" */
	unsigned long nr_to_scan;
	/* set by ceph_trim_dentries to caps_wanted_delay_max * HZ */
	unsigned long dir_lease_ttl;
};
@@ -1345,7 +1346,13 @@ static int __dir_lease_check(struct dentry *dentry, void *arg)
		/* Move dentry to tail of dir lease list if we don't want
		 * to delete it. So dentries in the list are checked in a
		 * round robin manner */
		if (!lwc->expire_dir_lease)
			return TOUCH;
		if (dentry->d_lockref.count > 0 ||
		    (di->flags & CEPH_DENTRY_REFERENCED))
			return TOUCH;
		/* invalidate dir lease */
		di->lease_shared_gen = 0;
	}
	return DELETE;
}
@@ -1353,8 +1360,17 @@ static int __dir_lease_check(struct dentry *dentry, void *arg)
int ceph_trim_dentries(struct ceph_mds_client *mdsc)
{
	struct ceph_lease_walk_control lwc;
	unsigned long count;
	unsigned long freed;

	spin_lock(&mdsc->caps_list_lock);
        if (mdsc->caps_use_max > 0 &&
            mdsc->caps_use_count > mdsc->caps_use_max)
		count = mdsc->caps_use_count - mdsc->caps_use_max;
	else
		count = 0;
        spin_unlock(&mdsc->caps_list_lock);

	lwc.dir_lease = false;
	lwc.nr_to_scan  = CEPH_CAPS_PER_RELEASE * 2;
	freed = __dentry_leases_walk(mdsc, &lwc, __dentry_lease_check);
@@ -1365,6 +1381,8 @@ int ceph_trim_dentries(struct ceph_mds_client *mdsc)
		lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE;

	lwc.dir_lease = true;
	lwc.expire_dir_lease = freed < count;
	lwc.dir_lease_ttl = mdsc->fsc->mount_options->caps_wanted_delay_max * HZ;
	freed +=__dentry_leases_walk(mdsc, &lwc, __dir_lease_check);
	if (!lwc.nr_to_scan) /* more to check */
		return -EAGAIN;
+26 −8
Original line number Diff line number Diff line
@@ -1965,6 +1965,18 @@ void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
        }
}

/*
 * Account @nr more caps as pending reclaim and queue the cap reclaim work
 * whenever the pending total reaches or crosses a multiple of
 * CEPH_CAPS_PER_RELEASE.
 *
 * @mdsc: mds client whose cap_reclaim_pending counter is updated
 * @nr:   number of caps to add to the pending count; 0 is a no-op
 *
 * The counter grows by arbitrary amounts, so it usually steps over a
 * multiple of CEPH_CAPS_PER_RELEASE instead of landing on it.  Testing for
 * a remainder of exactly 0 would miss those cases and leave reclaim
 * unqueued.  After the add, a remainder smaller than @nr means this call
 * hit or crossed a multiple, so that is the trigger condition.
 */
void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
{
	int val;

	if (!nr)
		return;
	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
		atomic_set(&mdsc->cap_reclaim_pending, 0);
		ceph_queue_cap_reclaim_work(mdsc);
	}
}

/*
 * requests
 */
@@ -2878,7 +2890,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
				    req->r_op == CEPH_MDS_OP_LSSNAP))
			ceph_readdir_prepopulate(req, req->r_session);
		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	}
	current->journal_info = NULL;
	mutex_unlock(&req->r_fill_mutex);
@@ -2887,13 +2898,19 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
	if (realm)
		ceph_put_snap_realm(mdsc, realm);

	if (err == 0 && req->r_target_inode &&
	if (err == 0) {
		if (req->r_target_inode &&
		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
			struct ceph_inode_info *ci =
				ceph_inode(req->r_target_inode);
			spin_lock(&ci->i_unsafe_lock);
		list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
			list_add_tail(&req->r_unsafe_target_item,
				      &ci->i_unsafe_iops);
			spin_unlock(&ci->i_unsafe_lock);
		}

		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	}
out_err:
	mutex_lock(&mdsc->mutex);
	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
@@ -4083,13 +4100,14 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
	spin_lock_init(&mdsc->cap_dirty_lock);
	init_waitqueue_head(&mdsc->cap_flushing_wq);
	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
	atomic_set(&mdsc->cap_reclaim_pending, 0);

	spin_lock_init(&mdsc->dentry_list_lock);
	INIT_LIST_HEAD(&mdsc->dentry_leases);
	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);

	ceph_caps_init(mdsc);
	ceph_adjust_min_caps(mdsc, fsc->min_caps);
	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);

	spin_lock_init(&mdsc->snapid_map_lock);
	mdsc->snapid_map_tree = RB_ROOT;
+3 −0
Original line number Diff line number Diff line
@@ -379,6 +379,7 @@ struct ceph_mds_client {
	wait_queue_head_t cap_flushing_wq;

	struct work_struct cap_reclaim_work;
	atomic_t	   cap_reclaim_pending;

	/*
	 * Cap reservations
@@ -396,6 +397,7 @@ struct ceph_mds_client {
						unreserved) */
	int		caps_total_count;    /* total caps allocated */
	int		caps_use_count;      /* in use */
	int		caps_use_max;	     /* max used caps */
	int		caps_reserve_count;  /* unused, reserved */
	int		caps_avail_count;    /* unused, unreserved */
	int		caps_min_count;      /* keep at least this many
@@ -465,6 +467,7 @@ extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
				    struct ceph_mds_session *session);
extern void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc);
extern void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr);
extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);

extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
Loading