Commit 2371bcc0 authored by Chuck Lever's avatar Chuck Lever
Browse files

svcrdma: Support multiple Write chunks in svc_rdma_map_reply_msg()



Refactor: svc_rdma_map_reply_msg() is restructured to DMA map only
the parts of rq_res that do not contain a result payload.

This change has been tested to confirm that it does not cause a
regression in the no Write chunk and single Write chunk cases.
Multiple Write chunks have not been tested.

Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent 9d0b09d5
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -213,7 +213,7 @@ extern int svc_rdma_send(struct svcxprt_rdma *rdma,
extern int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
				  struct svc_rdma_send_ctxt *sctxt,
				  const struct svc_rdma_recv_ctxt *rctxt,
				  struct xdr_buf *xdr);
				  const struct xdr_buf *xdr);
extern void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
				    struct svc_rdma_send_ctxt *sctxt,
				    struct svc_rdma_recv_ctxt *rctxt,
+1 −0
Original line number Diff line number Diff line
@@ -1687,6 +1687,7 @@ DECLARE_EVENT_CLASS(svcrdma_dma_map_class,
				TP_ARGS(rdma, dma_addr, length))

DEFINE_SVC_DMA_EVENT(dma_map_page);
DEFINE_SVC_DMA_EVENT(dma_map_err);
DEFINE_SVC_DMA_EVENT(dma_unmap_page);

TRACE_EVENT(svcrdma_dma_map_rw_err,
+98 −76
Original line number Diff line number Diff line
@@ -496,39 +496,111 @@ svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt,
	return svc_rdma_encode_write_chunk(sctxt, chunk);
}

static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
				 struct svc_rdma_send_ctxt *ctxt,
				 struct page *page,
				 unsigned long offset,
				 unsigned int len)
struct svc_rdma_map_data {
	struct svcxprt_rdma		*md_rdma;
	struct svc_rdma_send_ctxt	*md_ctxt;
};

/**
 * svc_rdma_page_dma_map - DMA map one page
 * @data: pointer to arguments
 * @page: struct page to DMA map
 * @offset: offset into the page
 * @len: number of bytes to map
 *
 * Returns:
 *   %0 if DMA mapping was successful
 *   %-EIO if the page cannot be DMA mapped
 */
static int svc_rdma_page_dma_map(void *data, struct page *page,
				 unsigned long offset, unsigned int len)
{
	struct svc_rdma_map_data *args = data;
	struct svcxprt_rdma *rdma = args->md_rdma;
	struct svc_rdma_send_ctxt *ctxt = args->md_ctxt;
	struct ib_device *dev = rdma->sc_cm_id->device;
	dma_addr_t dma_addr;

	++ctxt->sc_cur_sge_no;

	dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
	trace_svcrdma_dma_map_page(rdma, dma_addr, len);
	if (ib_dma_mapping_error(dev, dma_addr))
		goto out_maperr;

	trace_svcrdma_dma_map_page(rdma, dma_addr, len);
	ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr;
	ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len;
	ctxt->sc_send_wr.num_sge++;
	return 0;

out_maperr:
	trace_svcrdma_dma_map_err(rdma, dma_addr, len);
	return -EIO;
}

/* ib_dma_map_page() is used here because svc_rdma_dma_unmap()
/**
 * svc_rdma_iov_dma_map - DMA map an iovec
 * @data: pointer to arguments
 * @iov: kvec to DMA map
 *
 * ib_dma_map_page() is used here because svc_rdma_dma_unmap()
 * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
 *
 * Returns:
 *   %0 if DMA mapping was successful
 *   %-EIO if the iovec cannot be DMA mapped
 */
static int svc_rdma_iov_dma_map(void *data, const struct kvec *iov)
{
	if (!iov->iov_len)
		return 0;
	return svc_rdma_page_dma_map(data, virt_to_page(iov->iov_base),
				     offset_in_page(iov->iov_base),
				     iov->iov_len);
}

/**
 * svc_rdma_xb_dma_map - DMA map all segments of an xdr_buf
 * @xdr: xdr_buf containing portion of an RPC message to transmit
 * @data: pointer to arguments
 *
 * Returns:
 *   %0 if DMA mapping was successful
 *   %-EIO if DMA mapping failed
 *
 * On failure, any DMA mappings that have been already done must be
 * unmapped by the caller.
 */
static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
				struct svc_rdma_send_ctxt *ctxt,
				unsigned char *base,
				unsigned int len)
static int svc_rdma_xb_dma_map(const struct xdr_buf *xdr, void *data)
{
	return svc_rdma_dma_map_page(rdma, ctxt, virt_to_page(base),
				     offset_in_page(base), len);
	unsigned int len, remaining;
	unsigned long pageoff;
	struct page **ppages;
	int ret;

	ret = svc_rdma_iov_dma_map(data, &xdr->head[0]);
	if (ret < 0)
		return ret;

	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	pageoff = offset_in_page(xdr->page_base);
	remaining = xdr->page_len;
	while (remaining) {
		len = min_t(u32, PAGE_SIZE - pageoff, remaining);

		ret = svc_rdma_page_dma_map(data, *ppages++, pageoff, len);
		if (ret < 0)
			return ret;

		remaining -= len;
		pageoff = 0;
	}

	ret = svc_rdma_iov_dma_map(data, &xdr->tail[0]);
	if (ret < 0)
		return ret;

	return xdr->len;
}

struct svc_rdma_pullup_data {
@@ -688,22 +760,22 @@ static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
 * @rctxt: Write and Reply chunks provided by client
 * @xdr: prepared xdr_buf containing RPC message
 *
 * Load the xdr_buf into the ctxt's sge array, and DMA map each
 * element as it is added. The Send WR's num_sge field is set.
 * Returns:
 *   %0 if DMA mapping was successful.
 *   %-EMSGSIZE if a buffer manipulation problem occurred
 *   %-EIO if DMA mapping failed
 *
 * Returns zero on success, or a negative errno on failure.
 * The Send WR's num_sge field is set in all cases.
 */
int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
			   struct svc_rdma_send_ctxt *sctxt,
			   const struct svc_rdma_recv_ctxt *rctxt,
			   struct xdr_buf *xdr)
			   const struct xdr_buf *xdr)
{
	unsigned int len, remaining;
	unsigned long page_off;
	struct page **ppages;
	unsigned char *base;
	u32 xdr_pad;
	int ret;
	struct svc_rdma_map_data args = {
		.md_rdma	= rdma,
		.md_ctxt	= sctxt,
	};

	/* Set up the (persistently-mapped) transport header SGE. */
	sctxt->sc_send_wr.num_sge = 1;
@@ -712,7 +784,7 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
	/* If there is a Reply chunk, nothing follows the transport
	 * header, and we're done here.
	 */
	if (rctxt && rctxt->rc_reply_chunk)
	if (!pcl_is_empty(&rctxt->rc_reply_pcl))
		return 0;

	/* For pull-up, svc_rdma_send() will sync the transport header.
@@ -721,58 +793,8 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
	if (svc_rdma_pull_up_needed(rdma, sctxt, rctxt, xdr))
		return svc_rdma_pull_up_reply_msg(rdma, sctxt, rctxt, xdr);

	++sctxt->sc_cur_sge_no;
	ret = svc_rdma_dma_map_buf(rdma, sctxt,
				   xdr->head[0].iov_base,
				   xdr->head[0].iov_len);
	if (ret < 0)
		return ret;

	/* If a Write chunk is present, the xdr_buf's page list
	 * is not included inline. However the Upper Layer may
	 * have added XDR padding in the tail buffer, and that
	 * should not be included inline.
	 */
	if (rctxt && rctxt->rc_write_list) {
		base = xdr->tail[0].iov_base;
		len = xdr->tail[0].iov_len;
		xdr_pad = xdr_pad_size(xdr->page_len);

		if (len && xdr_pad) {
			base += xdr_pad;
			len -= xdr_pad;
		}

		goto tail;
	}

	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	page_off = xdr->page_base & ~PAGE_MASK;
	remaining = xdr->page_len;
	while (remaining) {
		len = min_t(u32, PAGE_SIZE - page_off, remaining);

		++sctxt->sc_cur_sge_no;
		ret = svc_rdma_dma_map_page(rdma, sctxt, *ppages++,
					    page_off, len);
		if (ret < 0)
			return ret;

		remaining -= len;
		page_off = 0;
	}

	base = xdr->tail[0].iov_base;
	len = xdr->tail[0].iov_len;
tail:
	if (len) {
		++sctxt->sc_cur_sge_no;
		ret = svc_rdma_dma_map_buf(rdma, sctxt, base, len);
		if (ret < 0)
			return ret;
	}

	return 0;
	return pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
				       svc_rdma_xb_dma_map, &args);
}

/* The svc_rqst and all resources it owns are released as soon as