Commit ed7cfefe authored by Linus Torvalds
Browse files

Merge tag 'ceph-for-5.10-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:

 - a patch that removes crush_workspace_mutex (myself). CRUSH
   computations are no longer serialized and can run in parallel.

 - a couple new filesystem client metrics for "ceph fs top" command
   (Xiubo Li)

 - a fix for a very old messenger bug that affected the filesystem,
   marked for stable (myself)

 - assorted fixups and cleanups throughout the codebase from Jeff and
   others.

* tag 'ceph-for-5.10-rc1' of git://github.com/ceph/ceph-client: (27 commits)
  libceph: clear con->out_msg on Policy::stateful_server faults
  libceph: format ceph_entity_addr nonces as unsigned
  libceph: fix ENTITY_NAME format suggestion
  libceph: move a dout in queue_con_delay()
  ceph: comment cleanups and clarifications
  ceph: break up send_cap_msg
  ceph: drop separate mdsc argument from __send_cap
  ceph: promote to unsigned long long before shifting
  ceph: don't SetPageError on readpage errors
  ceph: mark ceph_fmt_xattr() as printf-like for better type checking
  ceph: fold ceph_update_writeable_page into ceph_write_begin
  ceph: fold ceph_sync_writepages into writepage_nounlock
  ceph: fold ceph_sync_readpages into ceph_readpage
  ceph: don't call ceph_update_writeable_page from page_mkwrite
  ceph: break out writeback of incompatible snap context to separate function
  ceph: add a note explaining session reject error string
  libceph: switch to the new "osd blocklist add" command
  libceph, rbd, ceph: "blacklist" -> "blocklist"
  ceph: have ceph_writepages_start call pagevec_lookup_range_tag
  ceph: use kill_anon_super helper
  ...
parents c4d6fe73 28e1581c
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -163,14 +163,14 @@ Mount Options
        to the default VFS implementation if this option is used.

  recover_session=<no|clean>
	Set auto reconnect mode in the case where the client is blacklisted. The
	Set auto reconnect mode in the case where the client is blocklisted. The
	available modes are "no" and "clean". The default is "no".

	* no: never attempt to reconnect when client detects that it has been
	  blacklisted. Operations will generally fail after being blacklisted.
	  blocklisted. Operations will generally fail after being blocklisted.

	* clean: client reconnects to the ceph cluster automatically when it
	  detects that it has been blacklisted. During reconnect, client drops
	  detects that it has been blocklisted. During reconnect, client drops
	  dirty data/metadata, invalidates page caches and writable file handles.
	  After reconnect, file locks become stale because the MDS loses track
	  of them. If an inode contains any stale file locks, read/write on the
+4 −4
Original line number Diff line number Diff line
@@ -4010,10 +4010,10 @@ static int rbd_try_lock(struct rbd_device *rbd_dev)
		rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
			 ENTITY_NAME(lockers[0].id.name));

		ret = ceph_monc_blacklist_add(&client->monc,
		ret = ceph_monc_blocklist_add(&client->monc,
					      &lockers[0].info.addr);
		if (ret) {
			rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
			rbd_warn(rbd_dev, "blocklist of %s%llu failed: %d",
				 ENTITY_NAME(lockers[0].id.name), ret);
			goto out;
		}
@@ -4077,7 +4077,7 @@ static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
	ret = rbd_try_lock(rbd_dev);
	if (ret < 0) {
		rbd_warn(rbd_dev, "failed to lock header: %d", ret);
		if (ret == -EBLACKLISTED)
		if (ret == -EBLOCKLISTED)
			goto out;

		ret = 1; /* request lock anyway */
@@ -4613,7 +4613,7 @@ static void rbd_reregister_watch(struct work_struct *work)
	ret = __rbd_register_watch(rbd_dev);
	if (ret) {
		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
		if (ret != -EBLACKLISTED && ret != -ENOENT) {
		if (ret != -EBLOCKLISTED && ret != -ENOENT) {
			queue_delayed_work(rbd_dev->task_wq,
					   &rbd_dev->watch_dwork,
					   RBD_RETRY_DELAY);
+190 −226
Original line number Diff line number Diff line
@@ -182,58 +182,15 @@ static int ceph_releasepage(struct page *page, gfp_t g)
	return !PagePrivate(page);
}

/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
static int ceph_sync_readpages(struct ceph_fs_client *fsc,
			       struct ceph_vino vino,
			       struct ceph_file_layout *layout,
			       u64 off, u64 *plen,
			       u32 truncate_seq, u64 truncate_size,
			       struct page **pages, int num_pages,
			       int page_align)
{
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, truncate_seq, truncate_size,
				    false);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short read due to an object boundary */
	osd_req_op_extent_osd_data_pages(req, 0,
				pages, *plen, page_align, false, false);

	dout("readpages  final extent is %llu~%llu (%llu bytes align %d)\n",
	     off, *plen, *plen, page_align);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
				 req->r_end_latency, rc);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}

/*
 * read a single page, without unlocking it.
 */
/* read a single page, without unlocking it. */
static int ceph_do_readpage(struct file *filp, struct page *page)
{
	struct inode *inode = file_inode(filp);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	struct ceph_osd_request *req;
	struct ceph_vino vino = ceph_vino(inode);
	int err = 0;
	u64 off = page_offset(page);
	u64 len = PAGE_SIZE;
@@ -260,19 +217,33 @@ static int ceph_do_readpage(struct file *filp, struct page *page)
	if (err == 0)
		return -EINPROGRESS;

	dout("readpage inode %p file %p page %p index %lu\n",
	     inode, filp, page, page->index);
	err = ceph_sync_readpages(fsc, ceph_vino(inode),
				  &ci->i_layout, off, &len,
	dout("readpage ino %llx.%llx file %p off %llu len %llu page %p index %lu\n",
	     vino.ino, vino.snap, filp, off, len, page, page->index);
	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, 0, 1,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL,
				    ci->i_truncate_seq, ci->i_truncate_size,
				  &page, 1, 0);
				    false);
	if (IS_ERR(req))
		return PTR_ERR(req);

	osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);

	err = ceph_osdc_start_request(osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(osdc, req);

	ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
				 req->r_end_latency, err);

	ceph_osdc_put_request(req);
	dout("readpage result %d\n", err);

	if (err == -ENOENT)
		err = 0;
	if (err < 0) {
		SetPageError(page);
		ceph_fscache_readpage_cancel(inode, page);
		if (err == -EBLACKLISTED)
			fsc->blacklisted = true;
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		goto out;
	}
	if (err < PAGE_SIZE)
@@ -312,8 +283,8 @@ static void finish_read(struct ceph_osd_request *req)
	int i;

	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
	if (rc == -EBLACKLISTED)
		ceph_inode_to_client(inode)->blacklisted = true;
	if (rc == -EBLOCKLISTED)
		ceph_inode_to_client(inode)->blocklisted = true;

	/* unlock all pages, zeroing any data we didn't read */
	osd_data = osd_req_op_extent_osd_data(req, 0);
@@ -619,50 +590,6 @@ static u64 get_writepages_data_length(struct inode *inode,
	return end > start ? end - start : 0;
}

/*
 * do a synchronous write on N pages
 */
static int ceph_sync_writepages(struct ceph_fs_client *fsc,
				struct ceph_vino vino,
				struct ceph_file_layout *layout,
				struct ceph_snap_context *snapc,
				u64 off, u64 len,
				u32 truncate_seq, u64 truncate_size,
				struct timespec64 *mtime,
				struct page **pages, int num_pages)
{
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	struct ceph_osd_request *req;
	int rc = 0;
	int page_align = off & ~PAGE_MASK;

	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
				    snapc, truncate_seq, truncate_size,
				    true);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short write due to an object boundary */
	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
				false, false);
	dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);

	req->r_mtime = *mtime;
	rc = ceph_osdc_start_request(osdc, req, true);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, rc);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;
	dout("writepages result %d\n", rc);
	return rc;
}

/*
 * Write a single page, but leave the page locked.
 *
@@ -671,20 +598,19 @@ static int ceph_sync_writepages(struct ceph_fs_client *fsc,
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_fs_client *fsc;
	struct inode *inode = page->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_snap_context *snapc, *oldest;
	loff_t page_off = page_offset(page);
	int err, len = PAGE_SIZE;
	int err;
	loff_t len = PAGE_SIZE;
	struct ceph_writeback_ctl ceph_wbc;
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	struct ceph_osd_request *req;

	dout("writepage %p idx %lu\n", page, page->index);

	inode = page->mapping->host;
	ci = ceph_inode(inode);
	fsc = ceph_inode_to_client(inode);

	/* verify this is a writeable snap context */
	snapc = page_snap_context(page);
	if (!snapc) {
@@ -713,7 +639,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
	if (ceph_wbc.i_size < page_off + len)
		len = ceph_wbc.i_size - page_off;

	dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n",
	dout("writepage %p page %p index %lu on %llu~%llu snapc %p seq %lld\n",
	     inode, page, page->index, page_off, len, snapc, snapc->seq);

	if (atomic_long_inc_return(&fsc->writeback_count) >
@@ -721,11 +647,33 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
		set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);

	set_page_writeback(page);
	err = ceph_sync_writepages(fsc, ceph_vino(inode),
				   &ci->i_layout, snapc, page_off, len,
				   ceph_wbc.truncate_seq,
				   ceph_wbc.truncate_size,
				   &inode->i_mtime, &page, 1);
	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), page_off, &len, 0, 1,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, snapc,
				    ceph_wbc.truncate_seq, ceph_wbc.truncate_size,
				    true);
	if (IS_ERR(req)) {
		redirty_page_for_writepage(wbc, page);
		end_page_writeback(page);
		return PTR_ERR(req);
	}

	/* it may be a short write due to an object boundary */
	WARN_ON_ONCE(len > PAGE_SIZE);
	osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
	dout("writepage %llu~%llu (%llu bytes)\n", page_off, len, len);

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(osdc, req, true);
	if (!err)
		err = ceph_osdc_wait_request(osdc, req);

	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, err);

	ceph_osdc_put_request(req);
	if (err == 0)
		err = len;

	if (err < 0) {
		struct writeback_control tmp_wbc;
		if (!wbc)
@@ -737,8 +685,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
			end_page_writeback(page);
			return err;
		}
		if (err == -EBLACKLISTED)
			fsc->blacklisted = true;
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		dout("writepage setting page/mapping error %d %p\n",
		     err, page);
		mapping_set_error(&inode->i_data, err);
@@ -801,8 +749,8 @@ static void writepages_finish(struct ceph_osd_request *req)
	if (rc < 0) {
		mapping_set_error(mapping, rc);
		ceph_set_error_write(ci);
		if (rc == -EBLACKLISTED)
			fsc->blacklisted = true;
		if (rc == -EBLOCKLISTED)
			fsc->blocklisted = true;
	} else {
		ceph_clear_error_write(ci);
	}
@@ -962,9 +910,8 @@ static int ceph_writepages_start(struct address_space *mapping,
		max_pages = wsize >> PAGE_SHIFT;

get_more_pages:
		pvec_pages = pagevec_lookup_range_nr_tag(&pvec, mapping, &index,
						end, PAGECACHE_TAG_DIRTY,
						max_pages - locked_pages);
		pvec_pages = pagevec_lookup_range_tag(&pvec, mapping, &index,
						end, PAGECACHE_TAG_DIRTY);
		dout("pagevec_lookup_range_tag got %d\n", pvec_pages);
		if (!pvec_pages && !locked_pages)
			break;
@@ -1299,110 +1246,60 @@ static int context_is_writeable_or_written(struct inode *inode,
	return ret;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
/**
 * ceph_find_incompatible - find an incompatible context and return it
 * @page: page being dirtied
 *
 * called with page locked.
 * return success with page locked,
 * or any failure (incl -EAGAIN) with page unlocked.
 * We are only allowed to write into/dirty a page if the page is
 * clean, or already dirty within the same snap context. Returns a
 * conflicting context if there is one, NULL if there isn't, or a
 * negative error code on other errors.
 *
 * Must be called with page lock held.
 */
static int ceph_update_writeable_page(struct file *file,
			    loff_t pos, unsigned len,
			    struct page *page)
static struct ceph_snap_context *
ceph_find_incompatible(struct page *page)
{
	struct inode *inode = file_inode(file);
	struct inode *inode = page->mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	loff_t page_off = pos & PAGE_MASK;
	int pos_in_page = pos & ~PAGE_MASK;
	int end_in_page = pos_in_page + len;
	loff_t i_size;
	int r;
	struct ceph_snap_context *snapc, *oldest;

	if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		dout(" page %p forced umount\n", page);
		unlock_page(page);
		return -EIO;
		return ERR_PTR(-EIO);
	}

retry_locked:
	/* writepages currently holds page lock, but if we change that later, */
	for (;;) {
		struct ceph_snap_context *snapc, *oldest;

		wait_on_page_writeback(page);

		snapc = page_snap_context(page);
	if (snapc && snapc != ci->i_head_snapc) {
		if (!snapc || snapc == ci->i_head_snapc)
			break;

		/*
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
		 */
		oldest = get_oldest_context(inode, NULL, NULL);
		if (snapc->seq > oldest->seq) {
			/* not writeable -- return it for the caller to deal with */
			ceph_put_snap_context(oldest);
			dout(" page %p snapc %p not current or oldest\n",
			     page, snapc);
			/*
			 * queue for writeback, and wait for snapc to
			 * be writeable or written
			 */
			snapc = ceph_get_snap_context(snapc);
			unlock_page(page);
			ceph_queue_writeback(inode);
			r = wait_event_killable(ci->i_cap_wq,
			       context_is_writeable_or_written(inode, snapc));
			ceph_put_snap_context(snapc);
			if (r == -ERESTARTSYS)
				return r;
			return -EAGAIN;
			dout(" page %p snapc %p not current or oldest\n", page, snapc);
			return ceph_get_snap_context(snapc);
		}
		ceph_put_snap_context(oldest);

		/* yay, writeable, do it now (without dropping page lock) */
		dout(" page %p snapc %p not current, but oldest\n",
		     page, snapc);
		if (!clear_page_dirty_for_io(page))
			goto retry_locked;
		r = writepage_nounlock(page, NULL);
		dout(" page %p snapc %p not current, but oldest\n", page, snapc);
		if (clear_page_dirty_for_io(page)) {
			int r = writepage_nounlock(page, NULL);
			if (r < 0)
			goto fail_unlock;
		goto retry_locked;
	}

	if (PageUptodate(page)) {
		dout(" page %p already uptodate\n", page);
		return 0;
	}

	/* full page? */
	if (pos_in_page == 0 && len == PAGE_SIZE)
		return 0;

	/* past end of file? */
	i_size = i_size_read(inode);

	if (page_off >= i_size ||
	    (pos_in_page == 0 && (pos+len) >= i_size &&
	     end_in_page - pos_in_page != PAGE_SIZE)) {
		dout(" zeroing %p 0 - %d and %d - %d\n",
		     page, pos_in_page, end_in_page, (int)PAGE_SIZE);
		zero_user_segments(page,
				   0, pos_in_page,
				   end_in_page, PAGE_SIZE);
		return 0;
				return ERR_PTR(r);
		}

	/* we need to read it. */
	r = ceph_do_readpage(file, page);
	if (r < 0) {
		if (r == -EINPROGRESS)
			return -EAGAIN;
		goto fail_unlock;
	}
	goto retry_locked;
fail_unlock:
	unlock_page(page);
	return r;
	return NULL;
}

/*
@@ -1414,26 +1311,78 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
			    struct page **pagep, void **fsdata)
{
	struct inode *inode = file_inode(file);
	struct page *page;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;
	struct page *page = NULL;
	pgoff_t index = pos >> PAGE_SHIFT;
	int r;
	int pos_in_page = pos & ~PAGE_MASK;
	int r = 0;

	do {
		/* get a page */
	dout("write_begin file %p inode %p page %p %d~%d\n", file, inode, page, (int)pos, (int)len);

	for (;;) {
		page = grab_cache_page_write_begin(mapping, index, 0);
		if (!page)
			return -ENOMEM;
		if (!page) {
			r = -ENOMEM;
			break;
		}

		snapc = ceph_find_incompatible(page);
		if (snapc) {
			if (IS_ERR(snapc)) {
				r = PTR_ERR(snapc);
				break;
			}
			unlock_page(page);
			put_page(page);
			page = NULL;
			ceph_queue_writeback(inode);
			r = wait_event_killable(ci->i_cap_wq,
						context_is_writeable_or_written(inode, snapc));
			ceph_put_snap_context(snapc);
			if (r != 0)
				break;
			continue;
		}

		if (PageUptodate(page)) {
			dout(" page %p already uptodate\n", page);
			break;
		}

		dout("write_begin file %p inode %p page %p %d~%d\n", file,
		     inode, page, (int)pos, (int)len);
		/*
		 * In some cases we don't need to read at all:
		 * - full page write
		 * - write that lies completely beyond EOF
		 * - write that covers the page from start to EOF or beyond it
		 */
		if ((pos_in_page == 0 && len == PAGE_SIZE) ||
		    (pos >= i_size_read(inode)) ||
		    (pos_in_page == 0 && (pos + len) >= i_size_read(inode))) {
			zero_user_segments(page, 0, pos_in_page,
					   pos_in_page + len, PAGE_SIZE);
			break;
		}

		r = ceph_update_writeable_page(file, pos, len, page);
		if (r < 0)
		/*
		 * We need to read it. If we get back -EINPROGRESS, then the page was
		 * handed off to fscache and it will be unlocked when the read completes.
		 * Refind the page in that case so we can reacquire the page lock. Otherwise
		 * we got a hard error or the read was completed synchronously.
		 */
		r = ceph_do_readpage(file, page);
		if (r != -EINPROGRESS)
			break;
	}

	if (r < 0) {
		if (page) {
			unlock_page(page);
			put_page(page);
		else
		}
	} else {
		*pagep = page;
	} while (r == -EAGAIN);

	}
	return r;
}

@@ -1522,7 +1471,7 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	struct page *pinned_page = NULL;
	loff_t off = vmf->pgoff << PAGE_SHIFT;
	loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;
@@ -1668,6 +1617,8 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
	inode_inc_iversion_raw(inode);

	do {
		struct ceph_snap_context *snapc;

		lock_page(page);

		if (page_mkwrite_check_truncate(page, inode) < 0) {
@@ -1676,13 +1627,26 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
			break;
		}

		err = ceph_update_writeable_page(vma->vm_file, off, len, page);
		if (err >= 0) {
		snapc = ceph_find_incompatible(page);
		if (!snapc) {
			/* success.  we'll keep the page locked. */
			set_page_dirty(page);
			ret = VM_FAULT_LOCKED;
			break;
		}

		unlock_page(page);

		if (IS_ERR(snapc)) {
			ret = VM_FAULT_SIGBUS;
			break;
		}
	} while (err == -EAGAIN);

		ceph_queue_writeback(inode);
		err = wait_event_killable(ci->i_cap_wq,
				context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
	} while (err == 0);

	if (ret == VM_FAULT_LOCKED ||
	    ci->i_inline_version != CEPH_INLINE_NONE) {
@@ -2039,16 +2003,16 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM) {
		if (err == -EBLACKLISTED)
			fsc->blacklisted = true;
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		goto out_unlock;
	}

	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		if (err2 == -EBLACKLISTED)
			fsc->blacklisted = true;
		if (err2 == -EBLOCKLISTED)
			fsc->blocklisted = true;
		err = err2;
		goto out_unlock;
	}
+86 −42
Original line number Diff line number Diff line
@@ -1222,36 +1222,27 @@ struct cap_msg_args {
};

/*
 * Build and send a cap message to the given MDS.
 *
 * Caller should be holding s_mutex.
 * cap struct size + flock buffer size + inline version + inline data size +
 * osd_epoch_barrier + oldest_flush_tid
 */
static int send_cap_msg(struct cap_msg_args *arg)
#define CAP_MSG_SIZE (sizeof(struct ceph_mds_caps) + \
		      4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4)

/* Marshal up the cap msg to the MDS */
static void encode_cap_msg(struct ceph_msg *msg, struct cap_msg_args *arg)
{
	struct ceph_mds_caps *fc;
	struct ceph_msg *msg;
	void *p;
	size_t extra_len;
	struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc;

	dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
	     " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
	     " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op),
	     arg->cid, arg->ino, ceph_cap_string(arg->caps),
	     ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty),
	     arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid,
	     arg->mseq, arg->follows, arg->size, arg->max_size,
	     arg->xattr_version,
	dout("%s %s %llx %llx caps %s wanted %s dirty %s seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu xattr_ver %llu xattr_len %d\n",
	     __func__, ceph_cap_op_name(arg->op), arg->cid, arg->ino,
	     ceph_cap_string(arg->caps), ceph_cap_string(arg->wanted),
	     ceph_cap_string(arg->dirty), arg->seq, arg->issue_seq,
	     arg->flush_tid, arg->oldest_flush_tid, arg->mseq, arg->follows,
	     arg->size, arg->max_size, arg->xattr_version,
	     arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0);

	/* flock buffer size + inline version + inline data size +
	 * osd_epoch_barrier + oldest_flush_tid */
	extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4;
	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
			   GFP_NOFS, false);
	if (!msg)
		return -ENOMEM;

	msg->hdr.version = cpu_to_le16(10);
	msg->hdr.tid = cpu_to_le64(arg->flush_tid);

@@ -1323,9 +1314,6 @@ static int send_cap_msg(struct cap_msg_args *arg)

	/* Advisory flags (version 10) */
	ceph_encode_32(&p, arg->flags);

	ceph_con_send(&arg->session->s_con, msg);
	return 0;
}

/*
@@ -1454,25 +1442,25 @@ static void __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap,
 *
 * Caller should hold snap_rwsem (read), s_mutex.
 */
static void __send_cap(struct ceph_mds_client *mdsc, struct cap_msg_args *arg,
		       struct ceph_inode_info *ci)
static void __send_cap(struct cap_msg_args *arg, struct ceph_inode_info *ci)
{
	struct ceph_msg *msg;
	struct inode *inode = &ci->vfs_inode;
	int ret;

	ret = send_cap_msg(arg);
	if (ret < 0) {
		pr_err("error sending cap msg, ino (%llx.%llx) "
		       "flushing %s tid %llu, requeue\n",
	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, CAP_MSG_SIZE, GFP_NOFS, false);
	if (!msg) {
		pr_err("error allocating cap msg: ino (%llx.%llx) flushing %s tid %llu, requeuing cap.\n",
		       ceph_vinop(inode), ceph_cap_string(arg->dirty),
		       arg->flush_tid);
		spin_lock(&ci->i_ceph_lock);
		__cap_delay_requeue(mdsc, ci);
		__cap_delay_requeue(arg->session->s_mdsc, ci);
		spin_unlock(&ci->i_ceph_lock);
		return;
	}

	encode_cap_msg(msg, arg);
	ceph_con_send(&arg->session->s_con, msg);
	ceph_buffer_put(arg->old_xattr_buf);

	if (arg->wake)
		wake_up_all(&ci->i_cap_wq);
}
@@ -1483,6 +1471,11 @@ static inline int __send_flush_snap(struct inode *inode,
				    u32 mseq, u64 oldest_flush_tid)
{
	struct cap_msg_args	arg;
	struct ceph_msg		*msg;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, CAP_MSG_SIZE, GFP_NOFS, false);
	if (!msg)
		return -ENOMEM;

	arg.session = session;
	arg.ino = ceph_vino(inode).ino;
@@ -1521,7 +1514,9 @@ static inline int __send_flush_snap(struct inode *inode,
	arg.flags = 0;
	arg.wake = false;

	return send_cap_msg(&arg);
	encode_cap_msg(msg, &arg);
	ceph_con_send(&arg.session->s_con, msg);
	return 0;
}

/*
@@ -1906,9 +1901,8 @@ bool __ceph_should_report_size(struct ceph_inode_info *ci)
void ceph_check_caps(struct ceph_inode_info *ci, int flags,
		     struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct inode *inode = &ci->vfs_inode;
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_cap *cap;
	u64 flush_tid, oldest_flush_tid;
	int file_wanted, used, cap_used;
@@ -1928,12 +1922,24 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
retry:
	spin_lock(&ci->i_ceph_lock);
retry_locked:
	/* Caps wanted by virtue of active open files. */
	file_wanted = __ceph_caps_file_wanted(ci);

	/* Caps which have active references against them */
	used = __ceph_caps_used(ci);

	/*
	 * "issued" represents the current caps that the MDS wants us to have.
	 * "implemented" is the set that we have been granted, and includes the
	 * ones that have not yet been returned to the MDS (the "revoking" set,
	 * usually because they have outstanding references).
	 */
	issued = __ceph_caps_issued(ci, &implemented);
	revoking = implemented & ~issued;

	want = file_wanted;

	/* The ones we currently want to retain (may be adjusted below) */
	retain = file_wanted | used | CEPH_CAP_PIN;
	if (!mdsc->stopping && inode->i_nlink > 0) {
		if (file_wanted) {
@@ -2011,6 +2017,10 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,

		/* NOTE: no side-effects allowed, until we take s_mutex */

		/*
		 * If we have an auth cap, we don't need to consider any
		 * overlapping caps as used.
		 */
		cap_used = used;
		if (ci->i_auth_cap && cap != ci->i_auth_cap)
			cap_used &= ~ci->i_auth_cap->issued;
@@ -2148,7 +2158,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
			   want, retain, flushing, flush_tid, oldest_flush_tid);
		spin_unlock(&ci->i_ceph_lock);

		__send_cap(mdsc, &arg, ci);
		__send_cap(&arg, ci);

		goto retry; /* retake i_ceph_lock and restart our cap scan. */
	}
@@ -2222,7 +2232,7 @@ static int try_flush_caps(struct inode *inode, u64 *ptid)
			   flushing, flush_tid, oldest_flush_tid);
		spin_unlock(&ci->i_ceph_lock);

		__send_cap(mdsc, &arg, ci);
		__send_cap(&arg, ci);
	} else {
		if (!list_empty(&ci->i_cap_flush_list)) {
			struct ceph_cap_flush *cf =
@@ -2436,7 +2446,7 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
					  (cap->issued | cap->implemented),
					  cf->caps, cf->tid, oldest_flush_tid);
			spin_unlock(&ci->i_ceph_lock);
			__send_cap(mdsc, &arg, ci);
			__send_cap(&arg, ci);
		} else {
			struct ceph_cap_snap *capsnap =
					container_of(cf, struct ceph_cap_snap,
@@ -4284,13 +4294,30 @@ void __ceph_touch_fmode(struct ceph_inode_info *ci,

void ceph_get_fmode(struct ceph_inode_info *ci, int fmode, int count)
{
	int i;
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->vfs_inode.i_sb);
	int bits = (fmode << 1) | 1;
	bool is_opened = false;
	int i;

	if (count == 1)
		atomic64_inc(&mdsc->metric.opened_files);

	spin_lock(&ci->i_ceph_lock);
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		if (bits & (1 << i))
			ci->i_nr_by_mode[i] += count;

		/*
		 * If any of the mode refs is larger than 1,
		 * the inode has already been opened by
		 * others. Just skip checking the PIN ref.
		 */
		if (i && ci->i_nr_by_mode[i] > 1)
			is_opened = true;
	}

	if (!is_opened)
		percpu_counter_inc(&mdsc->metric.opened_inodes);
	spin_unlock(&ci->i_ceph_lock);
}

@@ -4301,15 +4328,32 @@ void ceph_get_fmode(struct ceph_inode_info *ci, int fmode, int count)
 */
void ceph_put_fmode(struct ceph_inode_info *ci, int fmode, int count)
{
	int i;
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->vfs_inode.i_sb);
	int bits = (fmode << 1) | 1;
	bool is_closed = true;
	int i;

	if (count == 1)
		atomic64_dec(&mdsc->metric.opened_files);

	spin_lock(&ci->i_ceph_lock);
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		if (bits & (1 << i)) {
			BUG_ON(ci->i_nr_by_mode[i] < count);
			ci->i_nr_by_mode[i] -= count;
		}

		/*
		 * If any of the mode refs is still nonzero after
		 * the decrement, the inode is still opened
		 * by others. Just skip checking the PIN ref.
		 */
		if (i && ci->i_nr_by_mode[i])
			is_closed = false;
	}

	if (is_closed)
		percpu_counter_dec(&mdsc->metric.opened_inodes);
	spin_unlock(&ci->i_ceph_lock);
}

+15 −3

File changed.

Preview size limit exceeded, changes collapsed.

Loading