Commit 786da5da authored by Linus Torvalds's avatar Linus Torvalds
Browse files

Merge tag 'ceph-for-5.20-rc1' of https://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "We have a good pile of various fixes and cleanups from Xiubo, Jeff,
  Luis and others, almost exclusively in the filesystem.

  Several patches touch files outside of our normal purview to set the
  stage for bringing in Jeff's long awaited ceph+fscrypt series in the
  near future. All of them have appropriate acks and sat in linux-next
  for a while"

* tag 'ceph-for-5.20-rc1' of https://github.com/ceph/ceph-client: (27 commits)
  libceph: clean up ceph_osdc_start_request prototype
  libceph: fix ceph_pagelist_reserve() comment typo
  ceph: remove useless check for the folio
  ceph: don't truncate file in atomic_open
  ceph: make f_bsize always equal to f_frsize
  ceph: flush the dirty caps immediately when quota is approaching
  libceph: print fsid and epoch with osd id
  libceph: check pointer before assigned to "c->rules[]"
  ceph: don't get the inline data for new creating files
  ceph: update the auth cap when the async create req is forwarded
  ceph: make change_auth_cap_ses a global symbol
  ceph: fix incorrect old_size length in ceph_mds_request_args
  ceph: switch back to testing for NULL folio->private in ceph_dirty_folio
  ceph: call netfs_subreq_terminated with was_async == false
  ceph: convert to generic_file_llseek
  ceph: fix the incorrect comment for the ceph_mds_caps struct
  ceph: don't leak snap_rwsem in handle_cap_grant
  ceph: prevent a client from exceeding the MDS maximum xattr size
  ceph: choose auth MDS for getxattr with the Xs caps
  ceph: add session already open notify support
  ...
parents e18a9042 a8af0d68
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -1297,7 +1297,7 @@ static void rbd_osd_submit(struct ceph_osd_request *osd_req)
	dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
	     __func__, osd_req, obj_req, obj_req->ex.oe_objno,
	     obj_req->ex.oe_off, obj_req->ex.oe_len);
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
	ceph_osdc_start_request(osd_req->r_osdc, osd_req);
}

/*
@@ -2081,7 +2081,7 @@ static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
	if (ret)
		return ret;

	ceph_osdc_start_request(osdc, req, false);
	ceph_osdc_start_request(osdc, req);
	return 0;
}

@@ -4768,7 +4768,7 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
	if (ret)
		goto out_req;

	ceph_osdc_start_request(osdc, req, false);
	ceph_osdc_start_request(osdc, req);
	ret = ceph_osdc_wait_request(osdc, req);
	if (ret >= 0)
		ceph_copy_from_page_vector(pages, buf, 0, ret);
+24 −35
Original line number Diff line number Diff line
@@ -122,7 +122,7 @@ static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio)
	 * Reference snap context in folio->private.  Also set
	 * PagePrivate so that we get invalidate_folio callback.
	 */
	VM_BUG_ON_FOLIO(folio_test_private(folio), folio);
	VM_WARN_ON_FOLIO(folio->private, folio);
	folio_attach_private(folio, snapc);

	return ceph_fscache_dirty_folio(mapping, folio);
@@ -237,7 +237,7 @@ static void finish_netfs_read(struct ceph_osd_request *req)
	if (err >= 0 && err < subreq->len)
		__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);

	netfs_subreq_terminated(subreq, err, true);
	netfs_subreq_terminated(subreq, err, false);

	num_pages = calc_pages_for(osd_data->alignment, osd_data->length);
	ceph_put_page_vector(osd_data->pages, num_pages, false);
@@ -313,8 +313,7 @@ static void ceph_netfs_issue_read(struct netfs_io_subrequest *subreq)
	int err = 0;
	u64 len = subreq->len;

	if (ci->i_inline_version != CEPH_INLINE_NONE &&
	    ceph_netfs_issue_op_inline(subreq))
	if (ceph_has_inline_data(ci) && ceph_netfs_issue_op_inline(subreq))
		return;

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, subreq->start, &len,
@@ -338,6 +337,7 @@ static void ceph_netfs_issue_read(struct netfs_io_subrequest *subreq)
	/* should always give us a page-aligned read */
	WARN_ON_ONCE(page_off);
	len = err;
	err = 0;

	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
	req->r_callback = finish_netfs_read;
@@ -345,9 +345,7 @@ static void ceph_netfs_issue_read(struct netfs_io_subrequest *subreq)
	req->r_inode = inode;
	ihold(inode);

	err = ceph_osdc_start_request(req->r_osdc, req, false);
	if (err)
		iput(inode);
	ceph_osdc_start_request(req->r_osdc, req);
out:
	ceph_osdc_put_request(req);
	if (err)
@@ -621,8 +619,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
	dout("writepage %llu~%llu (%llu bytes)\n", page_off, len, len);

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(osdc, req, true);
	if (!err)
	ceph_osdc_start_request(osdc, req);
	err = ceph_osdc_wait_request(osdc, req);

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
@@ -1151,8 +1148,7 @@ static int ceph_writepages_start(struct address_space *mapping,
		}

		req->r_mtime = inode->i_mtime;
		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
		BUG_ON(rc);
		ceph_osdc_start_request(&fsc->client->osdc, req);
		req = NULL;

		wbc->nr_to_write -= i;
@@ -1327,16 +1323,13 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
	int r;

	r = netfs_write_begin(&ci->netfs, file, inode->i_mapping, pos, len, &folio, NULL);
	if (r == 0)
	if (r < 0)
		return r;

	folio_wait_fscache(folio);
	if (r < 0) {
		if (folio)
			folio_put(folio);
	} else {
	WARN_ON_ONCE(!folio_test_locked(folio));
	*pagep = &folio->page;
	}
	return r;
	return 0;
}

/*
@@ -1439,7 +1432,7 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
	     inode, off, ceph_cap_string(got));

	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
	    ci->i_inline_version == CEPH_INLINE_NONE) {
	    !ceph_has_inline_data(ci)) {
		CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
		ceph_add_rw_context(fi, &rw_ctx);
		ret = filemap_fault(vmf);
@@ -1696,8 +1689,7 @@ int ceph_uninline_data(struct file *file)
	}

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
	ceph_osdc_start_request(&fsc->client->osdc, req);
	err = ceph_osdc_wait_request(&fsc->client->osdc, req);
	ceph_osdc_put_request(req);
	if (err < 0)
@@ -1739,8 +1731,7 @@ int ceph_uninline_data(struct file *file)
	}

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
	ceph_osdc_start_request(&fsc->client->osdc, req);
	err = ceph_osdc_wait_request(&fsc->client->osdc, req);

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
@@ -1912,14 +1903,12 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci,

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
	ceph_osdc_start_request(&fsc->client->osdc, rd_req);

	wr_req->r_mtime = ci->netfs.inode.i_mtime;
	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
	ceph_osdc_start_request(&fsc->client->osdc, wr_req);

	if (!err)
	err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	if (!err2)
	err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	if (err >= 0 || err == -ENOENT)
+19 −19
Original line number Diff line number Diff line
@@ -602,7 +602,7 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
 * @ci: inode to be moved
 * @session: new auth caps session
 */
static void change_auth_cap_ses(struct ceph_inode_info *ci,
void change_auth_cap_ses(struct ceph_inode_info *ci,
			 struct ceph_mds_session *session)
{
	lockdep_assert_held(&ci->i_ceph_lock);
@@ -1978,14 +1978,15 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
	}

	dout("check_caps %llx.%llx file_want %s used %s dirty %s flushing %s"
	     " issued %s revoking %s retain %s %s%s\n", ceph_vinop(inode),
	     " issued %s revoking %s retain %s %s%s%s\n", ceph_vinop(inode),
	     ceph_cap_string(file_wanted),
	     ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
	     ceph_cap_string(ci->i_flushing_caps),
	     ceph_cap_string(issued), ceph_cap_string(revoking),
	     ceph_cap_string(retain),
	     (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
	     (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "");
	     (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "",
	     (flags & CHECK_CAPS_NOINVAL) ? " NOINVAL" : "");

	/*
	 * If we no longer need to hold onto old our caps, and we may
@@ -3005,7 +3006,7 @@ int ceph_get_caps(struct file *filp, int need, int want, loff_t endoff, int *got
		}

		if (S_ISREG(ci->netfs.inode.i_mode) &&
		    ci->i_inline_version != CEPH_INLINE_NONE &&
		    ceph_has_inline_data(ci) &&
		    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
		    i_size_read(inode) > 0) {
			struct page *page =
@@ -3578,8 +3579,8 @@ static void handle_cap_grant(struct inode *inode,
			fill_inline = true;
	}

	if (ci->i_auth_cap == cap &&
	    le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
		if (ci->i_auth_cap == cap) {
			if (newcaps & ~extra_info->issued)
				wake = true;

@@ -3591,11 +3592,10 @@ static void handle_cap_grant(struct inode *inode,
			}

			ceph_kick_flushing_inode_caps(session, ci);
		spin_unlock(&ci->i_ceph_lock);
		}
		up_read(&session->s_mdsc->snap_rwsem);
	} else {
		spin_unlock(&ci->i_ceph_lock);
	}
	spin_unlock(&ci->i_ceph_lock);

	if (fill_inline)
		ceph_fill_inline_data(inode, NULL, extra_info->inline_data,
+70 −9
Original line number Diff line number Diff line
@@ -856,6 +856,10 @@ static int ceph_mknod(struct user_namespace *mnt_userns, struct inode *dir,
	if (ceph_snap(dir) != CEPH_NOSNAP)
		return -EROFS;

	err = ceph_wait_on_conflict_unlink(dentry);
	if (err)
		return err;

	if (ceph_quota_is_max_files_exceeded(dir)) {
		err = -EDQUOT;
		goto out;
@@ -918,6 +922,10 @@ static int ceph_symlink(struct user_namespace *mnt_userns, struct inode *dir,
	if (ceph_snap(dir) != CEPH_NOSNAP)
		return -EROFS;

	err = ceph_wait_on_conflict_unlink(dentry);
	if (err)
		return err;

	if (ceph_quota_is_max_files_exceeded(dir)) {
		err = -EDQUOT;
		goto out;
@@ -968,9 +976,13 @@ static int ceph_mkdir(struct user_namespace *mnt_userns, struct inode *dir,
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
	struct ceph_mds_request *req;
	struct ceph_acl_sec_ctx as_ctx = {};
	int err = -EROFS;
	int err;
	int op;

	err = ceph_wait_on_conflict_unlink(dentry);
	if (err)
		return err;

	if (ceph_snap(dir) == CEPH_SNAPDIR) {
		/* mkdir .snap/foo is a MKSNAP */
		op = CEPH_MDS_OP_MKSNAP;
@@ -980,6 +992,7 @@ static int ceph_mkdir(struct user_namespace *mnt_userns, struct inode *dir,
		dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode);
		op = CEPH_MDS_OP_MKDIR;
	} else {
		err = -EROFS;
		goto out;
	}

@@ -1037,6 +1050,10 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
	struct ceph_mds_request *req;
	int err;

	err = ceph_wait_on_conflict_unlink(dentry);
	if (err)
		return err;

	if (ceph_snap(dir) != CEPH_NOSNAP)
		return -EROFS;

@@ -1071,9 +1088,27 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	struct dentry *dentry = req->r_dentry;
	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
	struct ceph_dentry_info *di = ceph_dentry(dentry);
	int result = req->r_err ? req->r_err :
			le32_to_cpu(req->r_reply_info.head->result);

	if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
		pr_warn("%s dentry %p:%pd async unlink bit is not set\n",
			__func__, dentry, dentry);

	spin_lock(&fsc->async_unlink_conflict_lock);
	hash_del_rcu(&di->hnode);
	spin_unlock(&fsc->async_unlink_conflict_lock);

	spin_lock(&dentry->d_lock);
	di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK;
	wake_up_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT);
	spin_unlock(&dentry->d_lock);

	synchronize_rcu();

	if (result == -EJUKEBOX)
		goto out;

@@ -1081,7 +1116,7 @@ static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
	if (result) {
		int pathlen = 0;
		u64 base = 0;
		char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
		char *path = ceph_mdsc_build_path(dentry, &pathlen,
						  &base, 0);

		/* mark error on parent + clear complete */
@@ -1089,13 +1124,13 @@ static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
		ceph_dir_clear_complete(req->r_parent);

		/* drop the dentry -- we don't know its status */
		if (!d_unhashed(req->r_dentry))
			d_drop(req->r_dentry);
		if (!d_unhashed(dentry))
			d_drop(dentry);

		/* mark inode itself for an error (since metadata is bogus) */
		mapping_set_error(req->r_old_inode->i_mapping, result);

		pr_warn("ceph: async unlink failure path=(%llx)%s result=%d!\n",
		pr_warn("async unlink failure path=(%llx)%s result=%d!\n",
			base, IS_ERR(path) ? "<<bad>>" : path, result);
		ceph_mdsc_free_path(path, pathlen);
	}
@@ -1180,6 +1215,8 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)

	if (try_async && op == CEPH_MDS_OP_UNLINK &&
	    (req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) {
		struct ceph_dentry_info *di = ceph_dentry(dentry);

		dout("async unlink on %llu/%.*s caps=%s", ceph_ino(dir),
		     dentry->d_name.len, dentry->d_name.name,
		     ceph_cap_string(req->r_dir_caps));
@@ -1187,6 +1224,16 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
		req->r_callback = ceph_async_unlink_cb;
		req->r_old_inode = d_inode(dentry);
		ihold(req->r_old_inode);

		spin_lock(&dentry->d_lock);
		di->flags |= CEPH_DENTRY_ASYNC_UNLINK;
		spin_unlock(&dentry->d_lock);

		spin_lock(&fsc->async_unlink_conflict_lock);
		hash_add_rcu(fsc->async_unlink_conflict, &di->hnode,
			     dentry->d_name.hash);
		spin_unlock(&fsc->async_unlink_conflict_lock);

		err = ceph_mdsc_submit_request(mdsc, dir, req);
		if (!err) {
			/*
@@ -1195,11 +1242,21 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
			 */
			drop_nlink(inode);
			d_delete(dentry);
		} else if (err == -EJUKEBOX) {
		} else {
			spin_lock(&fsc->async_unlink_conflict_lock);
			hash_del_rcu(&di->hnode);
			spin_unlock(&fsc->async_unlink_conflict_lock);

			spin_lock(&dentry->d_lock);
			di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK;
			spin_unlock(&dentry->d_lock);

			if (err == -EJUKEBOX) {
				try_async = false;
				ceph_mdsc_put_request(req);
				goto retry;
			}
		}
	} else {
		set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
		err = ceph_mdsc_do_request(mdsc, dir, req);
@@ -1237,6 +1294,10 @@ static int ceph_rename(struct user_namespace *mnt_userns, struct inode *old_dir,
	    (!ceph_quota_is_same_realm(old_dir, new_dir)))
		return -EXDEV;

	err = ceph_wait_on_conflict_unlink(new_dentry);
	if (err)
		return err;

	dout("rename dir %p dentry %p to dir %p dentry %p\n",
	     old_dir, old_dentry, new_dir, new_dentry);
	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
+48 −75
Original line number Diff line number Diff line
@@ -240,8 +240,7 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
	INIT_LIST_HEAD(&fi->rw_contexts);
	fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen);

	if ((file->f_mode & FMODE_WRITE) &&
	    ci->i_inline_version != CEPH_INLINE_NONE) {
	if ((file->f_mode & FMODE_WRITE) && ceph_has_inline_data(ci)) {
		ret = ceph_uninline_data(file);
		if (ret < 0)
			goto error;
@@ -568,7 +567,7 @@ static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
		char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
						  &base, 0);

		pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
		pr_warn("async create failure path=(%llx)%s result=%d!\n",
			base, IS_ERR(path) ? "<<bad>>" : path, result);
		ceph_mdsc_free_path(path, pathlen);

@@ -611,6 +610,7 @@ static int ceph_finish_async_create(struct inode *dir, struct dentry *dentry,
	struct ceph_mds_reply_inode in = { };
	struct ceph_mds_reply_info_in iinfo = { .in = &in };
	struct ceph_inode_info *ci = ceph_inode(dir);
	struct ceph_dentry_info *di = ceph_dentry(dentry);
	struct inode *inode;
	struct timespec64 now;
	struct ceph_string *pool_ns;
@@ -709,6 +709,12 @@ static int ceph_finish_async_create(struct inode *dir, struct dentry *dentry,
		file->f_mode |= FMODE_CREATED;
		ret = finish_open(file, dentry, ceph_open);
	}

	spin_lock(&dentry->d_lock);
	di->flags &= ~CEPH_DENTRY_ASYNC_CREATE;
	wake_up_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT);
	spin_unlock(&dentry->d_lock);

	return ret;
}

@@ -735,6 +741,15 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
	if (dentry->d_name.len > NAME_MAX)
		return -ENAMETOOLONG;

	err = ceph_wait_on_conflict_unlink(dentry);
	if (err)
		return err;
	/*
	 * Do not truncate the file, since atomic_open is called before the
	 * permission check. The caller will do the truncation afterward.
	 */
	flags &= ~O_TRUNC;

	if (flags & O_CREAT) {
		if (ceph_quota_is_max_files_exceeded(dir))
			return -EDQUOT;
@@ -781,9 +796,16 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
		    (req->r_dir_caps =
		      try_prep_async_create(dir, dentry, &lo,
					    &req->r_deleg_ino))) {
			struct ceph_dentry_info *di = ceph_dentry(dentry);

			set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
			req->r_args.open.flags |= cpu_to_le32(CEPH_O_EXCL);
			req->r_callback = ceph_async_create_cb;

			spin_lock(&dentry->d_lock);
			di->flags |= CEPH_DENTRY_ASYNC_CREATE;
			spin_unlock(&dentry->d_lock);

			err = ceph_mdsc_submit_request(mdsc, dir, req);
			if (!err) {
				err = ceph_finish_async_create(dir, dentry,
@@ -802,9 +824,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
	}

	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
	err = ceph_mdsc_do_request(mdsc,
				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
				   req);
	err = ceph_mdsc_do_request(mdsc, (flags & O_CREAT) ? dir : NULL, req);
	if (err == -ENOENT) {
		dentry = ceph_handle_snapdir(req, dentry);
		if (IS_ERR(dentry)) {
@@ -960,8 +980,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,

		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
						 false, false);
		ret = ceph_osdc_start_request(osdc, req, false);
		if (!ret)
		ceph_osdc_start_request(osdc, req);
		ret = ceph_osdc_wait_request(osdc, req);

		ceph_update_read_metrics(&fsc->mdsc->metric,
@@ -1225,7 +1244,7 @@ static void ceph_aio_retry_work(struct work_struct *work)
	req->r_inode = inode;
	req->r_priv = aio_req;

	ret = ceph_osdc_start_request(req->r_osdc, req, false);
	ceph_osdc_start_request(req->r_osdc, req);
out:
	if (ret < 0) {
		req->r_result = ret;
@@ -1362,8 +1381,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
			continue;
		}

		ret = ceph_osdc_start_request(req->r_osdc, req, false);
		if (!ret)
		ceph_osdc_start_request(req->r_osdc, req);
		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);

		if (write)
@@ -1427,8 +1445,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
					       r_private_item);
			list_del_init(&req->r_private_item);
			if (ret >= 0)
				ret = ceph_osdc_start_request(req->r_osdc,
							      req, false);
				ceph_osdc_start_request(req->r_osdc, req);
			if (ret < 0) {
				req->r_result = ret;
				ceph_aio_complete_req(req);
@@ -1541,8 +1558,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
						false, true);

		req->r_mtime = mtime;
		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
		if (!ret)
		ceph_osdc_start_request(&fsc->client->osdc, req);
		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);

		ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
@@ -1627,7 +1643,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
		     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
		     ceph_cap_string(got));

		if (ci->i_inline_version == CEPH_INLINE_NONE) {
		if (!ceph_has_inline_data(ci)) {
			if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
				ret = ceph_direct_read_write(iocb, to,
							     NULL, NULL);
@@ -1890,7 +1906,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
		if (dirty)
			__mark_inode_dirty(inode, dirty);
		if (ceph_quota_is_max_bytes_approaching(inode, iocb->ki_pos))
			ceph_check_caps(ci, 0, NULL);
			ceph_check_caps(ci, CHECK_CAPS_FLUSH, NULL);
	}

	dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
@@ -1930,58 +1946,16 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 */
static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
{
	struct inode *inode = file->f_mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	loff_t i_size;
	loff_t ret;

	inode_lock(inode);

	if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
		struct inode *inode = file_inode(file);
		int ret;

		ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
		if (ret < 0)
			goto out;
	}

	i_size = i_size_read(inode);
	switch (whence) {
	case SEEK_END:
		offset += i_size;
		break;
	case SEEK_CUR:
		/*
		 * Here we special-case the lseek(fd, 0, SEEK_CUR)
		 * position-querying operation.  Avoid rewriting the "same"
		 * f_pos value back to the file because a concurrent read(),
		 * write() or lseek() might have altered it
		 */
		if (offset == 0) {
			ret = file->f_pos;
			goto out;
		}
		offset += file->f_pos;
		break;
	case SEEK_DATA:
		if (offset < 0 || offset >= i_size) {
			ret = -ENXIO;
			goto out;
		}
		break;
	case SEEK_HOLE:
		if (offset < 0 || offset >= i_size) {
			ret = -ENXIO;
			goto out;
		}
		offset = i_size;
		break;
	}

	ret = vfs_setpos(file, offset, max(i_size, fsc->max_file_size));

out:
	inode_unlock(inode);
			return ret;
	}
	return generic_file_llseek(file, offset, whence);
}

static inline void ceph_zero_partial_page(
	struct inode *inode, loff_t offset, unsigned size)
@@ -2049,12 +2023,10 @@ static int ceph_zero_partial_object(struct inode *inode,
	}

	req->r_mtime = inode->i_mtime;
	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!ret) {
	ceph_osdc_start_request(&fsc->client->osdc, req);
	ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
	if (ret == -ENOENT)
		ret = 0;
	}
	ceph_osdc_put_request(req);

out:
@@ -2356,7 +2328,7 @@ static ssize_t ceph_do_objects_copy(struct ceph_inode_info *src_ci, u64 *src_off
		if (IS_ERR(req))
			ret = PTR_ERR(req);
		else {
			ceph_osdc_start_request(osdc, req, false);
			ceph_osdc_start_request(osdc, req);
			ret = ceph_osdc_wait_request(osdc, req);
			ceph_update_copyfrom_metrics(&fsc->mdsc->metric,
						     req->r_start_latency,
@@ -2549,7 +2521,8 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
		/* Let the MDS know about dst file size change */
		if (ceph_inode_set_size(dst_inode, dst_off) ||
		    ceph_quota_is_max_bytes_approaching(dst_inode, dst_off))
			ceph_check_caps(dst_ci, CHECK_CAPS_AUTHONLY, NULL);
			ceph_check_caps(dst_ci, CHECK_CAPS_AUTHONLY | CHECK_CAPS_FLUSH,
					NULL);
	}
	/* Mark Fw dirty */
	spin_lock(&dst_ci->i_ceph_lock);
Loading