Commit 4c9f0937 authored by Daniel Borkmann's avatar Daniel Borkmann
Browse files

Merge branch 'bpf-xsk-rx-batch'



Magnus Karlsson says:

====================
This patch set introduces a batched interface for Rx buffer allocation
in AF_XDP buffer pool. Instead of using xsk_buff_alloc(*pool), drivers
can now use xsk_buff_alloc_batch(*pool, **xdp_buff_array,
max). Instead of returning a pointer to an xdp_buff, it returns the
number of xdp_buffs it managed to allocate up to the maximum value of
the max parameter in the function call. Pointers to the allocated
xdp_buff:s are put in the xdp_buff_array supplied in the call. This
could be a SW ring that already exists in the driver or a new
structure that the driver has allocated.

  u32 xsk_buff_alloc_batch(struct xsk_buff_pool *pool,
                           struct xdp_buff **xdp,
                           u32 max);

When using this interface, the driver should also use the new
interface below to set the relevant fields in the struct xdp_buff. The
reason for this is that xsk_buff_alloc_batch() does not fill in the
data and data_meta fields for you as is the case with
xsk_buff_alloc(). So it is not sufficient to just set data_end
(effectively the size) anymore in the driver. The reason for this is
performance as explained in detail in the commit message.

  void xsk_buff_set_size(struct xdp_buff *xdp, u32 size);

Patch 6 also optimizes the buffer allocation in the aligned case. In
this case, we can skip the reinitialization of most fields in the
xdp_buff_xsk struct at allocation time. As the number of elements in
the heads array is equal to the number of possible buffers in the
umem, we can initialize them once and for all at bind time and then
just point to the correct one in the xdp_buff_array that is returned
to the driver. No reason to have a stack of free head entries. In the
unaligned case, the buffers can reside anywhere in the umem, so this
optimization is not possible as we still have to fill in the right
information in the xdp_buff every single time one is allocated.

I have updated i40e and ice to use this new batched interface.

These are the throughput results on my 2.1 GHz Cascade Lake system:

Aligned mode:
ice: +11% / -9 cycles/pkt
i40e: +12% / -9 cycles/pkt

Unaligned mode:
ice: +1.5% / -1 cycle/pkt
i40e: +1% / -1 cycle/pkt

For the aligned case, batching provides around 40% of the performance
improvement and the aligned optimization the rest, around 60%. Would
have expected a ~4% boost for unaligned with this data, but I only get
around 1%. Do not know why. Note that memory consumption in aligned
mode is also reduced by this patch set.
====================

Signed-off-by: default avatarDaniel Borkmann <daniel@iogearbox.net>
parents e7d5184b e34087fc
Loading
Loading
Loading
Loading
+25 −27
Original line number Diff line number Diff line
@@ -193,42 +193,40 @@ bool i40e_alloc_rx_buffers_zc(struct i40e_ring *rx_ring, u16 count)
{
	u16 ntu = rx_ring->next_to_use;
	union i40e_rx_desc *rx_desc;
	struct xdp_buff **bi, *xdp;
	struct xdp_buff **xdp;
	u32 nb_buffs, i;
	dma_addr_t dma;
	bool ok = true;

	rx_desc = I40E_RX_DESC(rx_ring, ntu);
	bi = i40e_rx_bi(rx_ring, ntu);
	do {
		xdp = xsk_buff_alloc(rx_ring->xsk_pool);
		if (!xdp) {
			ok = false;
			goto no_buffers;
		}
		*bi = xdp;
		dma = xsk_buff_xdp_get_dma(xdp);
	xdp = i40e_rx_bi(rx_ring, ntu);

	nb_buffs = min_t(u16, count, rx_ring->count - ntu);
	nb_buffs = xsk_buff_alloc_batch(rx_ring->xsk_pool, xdp, nb_buffs);
	if (!nb_buffs)
		return false;

	i = nb_buffs;
	while (i--) {
		dma = xsk_buff_xdp_get_dma(*xdp);
		rx_desc->read.pkt_addr = cpu_to_le64(dma);
		rx_desc->read.hdr_addr = 0;

		rx_desc++;
		bi++;
		ntu++;
		xdp++;
	}

		if (unlikely(ntu == rx_ring->count)) {
	ntu += nb_buffs;
	if (ntu == rx_ring->count) {
		rx_desc = I40E_RX_DESC(rx_ring, 0);
			bi = i40e_rx_bi(rx_ring, 0);
		xdp = i40e_rx_bi(rx_ring, 0);
		ntu = 0;
	}
	} while (--count);

no_buffers:
	if (rx_ring->next_to_use != ntu) {
	/* clear the status bits for the next_to_use descriptor */
	rx_desc->wb.qword1.status_error_len = 0;
	i40e_release_rx_desc(rx_ring, ntu);
	}

	return ok;
	return count == nb_buffs ? true : false;
}

/**
@@ -365,7 +363,7 @@ int i40e_clean_rx_irq_zc(struct i40e_ring *rx_ring, int budget)
			break;

		bi = *i40e_rx_bi(rx_ring, next_to_clean);
		bi->data_end = bi->data + size;
		xsk_buff_set_size(bi, size);
		xsk_buff_dma_sync_for_cpu(bi, rx_ring->xsk_pool);

		xdp_res = i40e_run_xdp_zc(rx_ring, bi);
+5 −11
Original line number Diff line number Diff line
@@ -164,18 +164,11 @@ struct ice_tx_offload_params {
};

struct ice_rx_buf {
	union {
		struct {
	dma_addr_t dma;
	struct page *page;
	unsigned int page_offset;
	u16 pagecnt_bias;
};
		struct {
			struct xdp_buff *xdp;
		};
	};
};

struct ice_q_stats {
	u64 pkts;
@@ -270,6 +263,7 @@ struct ice_ring {
	union {
		struct ice_tx_buf *tx_buf;
		struct ice_rx_buf *rx_buf;
		struct xdp_buff **xdp_buf;
	};
	/* CL2 - 2nd cacheline starts here */
	u16 q_index;			/* Queue number of ring */
+43 −49
Original line number Diff line number Diff line
@@ -364,45 +364,39 @@ bool ice_alloc_rx_bufs_zc(struct ice_ring *rx_ring, u16 count)
{
	union ice_32b_rx_flex_desc *rx_desc;
	u16 ntu = rx_ring->next_to_use;
	struct ice_rx_buf *rx_buf;
	bool ok = true;
	struct xdp_buff **xdp;
	u32 nb_buffs, i;
	dma_addr_t dma;

	if (!count)
		return true;

	rx_desc = ICE_RX_DESC(rx_ring, ntu);
	rx_buf = &rx_ring->rx_buf[ntu];
	xdp = &rx_ring->xdp_buf[ntu];

	do {
		rx_buf->xdp = xsk_buff_alloc(rx_ring->xsk_pool);
		if (!rx_buf->xdp) {
			ok = false;
			break;
		}
	nb_buffs = min_t(u16, count, rx_ring->count - ntu);
	nb_buffs = xsk_buff_alloc_batch(rx_ring->xsk_pool, xdp, nb_buffs);
	if (!nb_buffs)
		return false;

		dma = xsk_buff_xdp_get_dma(rx_buf->xdp);
	i = nb_buffs;
	while (i--) {
		dma = xsk_buff_xdp_get_dma(*xdp);
		rx_desc->read.pkt_addr = cpu_to_le64(dma);
		rx_desc->wb.status_error0 = 0;

		rx_desc++;
		rx_buf++;
		ntu++;
		xdp++;
	}

		if (unlikely(ntu == rx_ring->count)) {
	ntu += nb_buffs;
	if (ntu == rx_ring->count) {
		rx_desc = ICE_RX_DESC(rx_ring, 0);
			rx_buf = rx_ring->rx_buf;
		xdp = rx_ring->xdp_buf;
		ntu = 0;
	}
	} while (--count);

	if (rx_ring->next_to_use != ntu) {
	/* clear the status bits for the next_to_use descriptor */
	rx_desc->wb.status_error0 = 0;
	ice_release_rx_desc(rx_ring, ntu);
	}

	return ok;
	return count == nb_buffs ? true : false;
}

/**
@@ -421,19 +415,19 @@ static void ice_bump_ntc(struct ice_ring *rx_ring)
/**
 * ice_construct_skb_zc - Create an sk_buff from zero-copy buffer
 * @rx_ring: Rx ring
 * @rx_buf: zero-copy Rx buffer
 * @xdp_arr: Pointer to the SW ring of xdp_buff pointers
 *
 * This function allocates a new skb from a zero-copy Rx buffer.
 *
 * Returns the skb on success, NULL on failure.
 */
static struct sk_buff *
ice_construct_skb_zc(struct ice_ring *rx_ring, struct ice_rx_buf *rx_buf)
ice_construct_skb_zc(struct ice_ring *rx_ring, struct xdp_buff **xdp_arr)
{
	unsigned int metasize = rx_buf->xdp->data - rx_buf->xdp->data_meta;
	unsigned int datasize = rx_buf->xdp->data_end - rx_buf->xdp->data;
	unsigned int datasize_hard = rx_buf->xdp->data_end -
				     rx_buf->xdp->data_hard_start;
	struct xdp_buff *xdp = *xdp_arr;
	unsigned int metasize = xdp->data - xdp->data_meta;
	unsigned int datasize = xdp->data_end - xdp->data;
	unsigned int datasize_hard = xdp->data_end - xdp->data_hard_start;
	struct sk_buff *skb;

	skb = __napi_alloc_skb(&rx_ring->q_vector->napi, datasize_hard,
@@ -441,13 +435,13 @@ ice_construct_skb_zc(struct ice_ring *rx_ring, struct ice_rx_buf *rx_buf)
	if (unlikely(!skb))
		return NULL;

	skb_reserve(skb, rx_buf->xdp->data - rx_buf->xdp->data_hard_start);
	memcpy(__skb_put(skb, datasize), rx_buf->xdp->data, datasize);
	skb_reserve(skb, xdp->data - xdp->data_hard_start);
	memcpy(__skb_put(skb, datasize), xdp->data, datasize);
	if (metasize)
		skb_metadata_set(skb, metasize);

	xsk_buff_free(rx_buf->xdp);
	rx_buf->xdp = NULL;
	xsk_buff_free(xdp);
	*xdp_arr = NULL;
	return skb;
}

@@ -521,7 +515,7 @@ int ice_clean_rx_irq_zc(struct ice_ring *rx_ring, int budget)
	while (likely(total_rx_packets < (unsigned int)budget)) {
		union ice_32b_rx_flex_desc *rx_desc;
		unsigned int size, xdp_res = 0;
		struct ice_rx_buf *rx_buf;
		struct xdp_buff **xdp;
		struct sk_buff *skb;
		u16 stat_err_bits;
		u16 vlan_tag = 0;
@@ -544,18 +538,18 @@ int ice_clean_rx_irq_zc(struct ice_ring *rx_ring, int budget)
		if (!size)
			break;

		rx_buf = &rx_ring->rx_buf[rx_ring->next_to_clean];
		rx_buf->xdp->data_end = rx_buf->xdp->data + size;
		xsk_buff_dma_sync_for_cpu(rx_buf->xdp, rx_ring->xsk_pool);
		xdp = &rx_ring->xdp_buf[rx_ring->next_to_clean];
		xsk_buff_set_size(*xdp, size);
		xsk_buff_dma_sync_for_cpu(*xdp, rx_ring->xsk_pool);

		xdp_res = ice_run_xdp_zc(rx_ring, rx_buf->xdp);
		xdp_res = ice_run_xdp_zc(rx_ring, *xdp);
		if (xdp_res) {
			if (xdp_res & (ICE_XDP_TX | ICE_XDP_REDIR))
				xdp_xmit |= xdp_res;
			else
				xsk_buff_free(rx_buf->xdp);
				xsk_buff_free(*xdp);

			rx_buf->xdp = NULL;
			*xdp = NULL;
			total_rx_bytes += size;
			total_rx_packets++;
			cleaned_count++;
@@ -565,7 +559,7 @@ int ice_clean_rx_irq_zc(struct ice_ring *rx_ring, int budget)
		}

		/* XDP_PASS path */
		skb = ice_construct_skb_zc(rx_ring, rx_buf);
		skb = ice_construct_skb_zc(rx_ring, xdp);
		if (!skb) {
			rx_ring->rx_stats.alloc_buf_failed++;
			break;
@@ -813,12 +807,12 @@ void ice_xsk_clean_rx_ring(struct ice_ring *rx_ring)
	u16 i;

	for (i = 0; i < rx_ring->count; i++) {
		struct ice_rx_buf *rx_buf = &rx_ring->rx_buf[i];
		struct xdp_buff **xdp = &rx_ring->xdp_buf[i];

		if (!rx_buf->xdp)
		if (!xdp)
			continue;

		rx_buf->xdp = NULL;
		*xdp = NULL;
	}
}

+22 −0
Original line number Diff line number Diff line
@@ -77,6 +77,12 @@ static inline struct xdp_buff *xsk_buff_alloc(struct xsk_buff_pool *pool)
	return xp_alloc(pool);
}

/* Returns as many entries as possible up to max. 0 <= N <= max. */
static inline u32 xsk_buff_alloc_batch(struct xsk_buff_pool *pool, struct xdp_buff **xdp, u32 max)
{
	return xp_alloc_batch(pool, xdp, max);
}

static inline bool xsk_buff_can_alloc(struct xsk_buff_pool *pool, u32 count)
{
	return xp_can_alloc(pool, count);
@@ -89,6 +95,13 @@ static inline void xsk_buff_free(struct xdp_buff *xdp)
	xp_free(xskb);
}

static inline void xsk_buff_set_size(struct xdp_buff *xdp, u32 size)
{
	xdp->data = xdp->data_hard_start + XDP_PACKET_HEADROOM;
	xdp->data_meta = xdp->data;
	xdp->data_end = xdp->data + size;
}

static inline dma_addr_t xsk_buff_raw_get_dma(struct xsk_buff_pool *pool,
					      u64 addr)
{
@@ -212,6 +225,11 @@ static inline struct xdp_buff *xsk_buff_alloc(struct xsk_buff_pool *pool)
	return NULL;
}

static inline u32 xsk_buff_alloc_batch(struct xsk_buff_pool *pool, struct xdp_buff **xdp, u32 max)
{
	return 0;
}

static inline bool xsk_buff_can_alloc(struct xsk_buff_pool *pool, u32 count)
{
	return false;
@@ -221,6 +239,10 @@ static inline void xsk_buff_free(struct xdp_buff *xdp)
{
}

static inline void xsk_buff_set_size(struct xdp_buff *xdp, u32 size)
{
}

static inline dma_addr_t xsk_buff_raw_get_dma(struct xsk_buff_pool *pool,
					      u64 addr)
{
+46 −2
Original line number Diff line number Diff line
@@ -7,6 +7,7 @@
#include <linux/if_xdp.h>
#include <linux/types.h>
#include <linux/dma-mapping.h>
#include <linux/bpf.h>
#include <net/xdp.h>

struct xsk_buff_pool;
@@ -23,7 +24,6 @@ struct xdp_buff_xsk {
	dma_addr_t dma;
	dma_addr_t frame_dma;
	struct xsk_buff_pool *pool;
	bool unaligned;
	u64 orig_addr;
	struct list_head free_list_node;
};
@@ -67,6 +67,7 @@ struct xsk_buff_pool {
	u32 free_heads_cnt;
	u32 headroom;
	u32 chunk_size;
	u32 chunk_shift;
	u32 frame_len;
	u8 cached_need_wakeup;
	bool uses_need_wakeup;
@@ -81,6 +82,13 @@ struct xsk_buff_pool {
	struct xdp_buff_xsk *free_heads[];
};

/* Masks for xdp_umem_page flags.
 * The low 12-bits of the addr will be 0 since this is the page address, so we
 * can use them for flags.
 */
#define XSK_NEXT_PG_CONTIG_SHIFT 0
#define XSK_NEXT_PG_CONTIG_MASK BIT_ULL(XSK_NEXT_PG_CONTIG_SHIFT)

/* AF_XDP core. */
struct xsk_buff_pool *xp_create_and_assign_umem(struct xdp_sock *xs,
						struct xdp_umem *umem);
@@ -89,7 +97,6 @@ int xp_assign_dev(struct xsk_buff_pool *pool, struct net_device *dev,
int xp_assign_dev_shared(struct xsk_buff_pool *pool, struct xdp_umem *umem,
			 struct net_device *dev, u16 queue_id);
void xp_destroy(struct xsk_buff_pool *pool);
void xp_release(struct xdp_buff_xsk *xskb);
void xp_get_pool(struct xsk_buff_pool *pool);
bool xp_put_pool(struct xsk_buff_pool *pool);
void xp_clear_dev(struct xsk_buff_pool *pool);
@@ -99,12 +106,28 @@ void xp_del_xsk(struct xsk_buff_pool *pool, struct xdp_sock *xs);
/* AF_XDP, and XDP core. */
void xp_free(struct xdp_buff_xsk *xskb);

static inline void xp_init_xskb_addr(struct xdp_buff_xsk *xskb, struct xsk_buff_pool *pool,
				     u64 addr)
{
	xskb->orig_addr = addr;
	xskb->xdp.data_hard_start = pool->addrs + addr + pool->headroom;
}

static inline void xp_init_xskb_dma(struct xdp_buff_xsk *xskb, struct xsk_buff_pool *pool,
				    dma_addr_t *dma_pages, u64 addr)
{
	xskb->frame_dma = (dma_pages[addr >> PAGE_SHIFT] & ~XSK_NEXT_PG_CONTIG_MASK) +
		(addr & ~PAGE_MASK);
	xskb->dma = xskb->frame_dma + pool->headroom + XDP_PACKET_HEADROOM;
}

/* AF_XDP ZC drivers, via xdp_sock_buff.h */
void xp_set_rxq_info(struct xsk_buff_pool *pool, struct xdp_rxq_info *rxq);
int xp_dma_map(struct xsk_buff_pool *pool, struct device *dev,
	       unsigned long attrs, struct page **pages, u32 nr_pages);
void xp_dma_unmap(struct xsk_buff_pool *pool, unsigned long attrs);
struct xdp_buff *xp_alloc(struct xsk_buff_pool *pool);
u32 xp_alloc_batch(struct xsk_buff_pool *pool, struct xdp_buff **xdp, u32 max);
bool xp_can_alloc(struct xsk_buff_pool *pool, u32 count);
void *xp_raw_get_data(struct xsk_buff_pool *pool, u64 addr);
dma_addr_t xp_raw_get_dma(struct xsk_buff_pool *pool, u64 addr);
@@ -180,4 +203,25 @@ static inline u64 xp_unaligned_add_offset_to_addr(u64 addr)
		xp_unaligned_extract_offset(addr);
}

static inline u32 xp_aligned_extract_idx(struct xsk_buff_pool *pool, u64 addr)
{
	return xp_aligned_extract_addr(pool, addr) >> pool->chunk_shift;
}

static inline void xp_release(struct xdp_buff_xsk *xskb)
{
	if (xskb->pool->unaligned)
		xskb->pool->free_heads[xskb->pool->free_heads_cnt++] = xskb;
}

static inline u64 xp_get_handle(struct xdp_buff_xsk *xskb)
{
	u64 offset = xskb->xdp.data - xskb->xdp.data_hard_start;

	offset += xskb->pool->headroom;
	if (!xskb->pool->unaligned)
		return xskb->orig_addr + offset;
	return xskb->orig_addr + (offset << XSK_UNALIGNED_BUF_OFFSET_SHIFT);
}

#endif /* XSK_BUFF_POOL_H_ */
Loading