Unverified Commit 077fe6dc authored by openeuler-ci-bot, committed by Gitee

!14663 xsk: Add generic xdp multi-buffer send support

parents eed56ce0 8a0b38e7
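
For reviewer context: in copy mode a multi-buffer packet spans several consecutive TX descriptors, and xsk_build_skb() below stitches them into a single skb. A minimal user-space sketch of filling such a packet follows, assuming the upstream multi-buffer UAPI names (XDP_USE_SG bind flag, XDP_PKT_CONTD descriptor option), which this diff does not itself define:

/*
 * Sketch only: one packet split across two TX ring descriptors.
 * XDP_PKT_CONTD marks "more fragments follow"; it is the upstream
 * UAPI name and assumed here, not defined by this patch.
 */
#include <linux/if_xdp.h>

static void tx_fill_two_frag_packet(struct xdp_desc *ring, __u32 idx,
				    __u64 addr0, __u32 len0,
				    __u64 addr1, __u32 len1)
{
	ring[idx].addr = addr0;			/* first fragment */
	ring[idx].len = len0;
	ring[idx].options = XDP_PKT_CONTD;	/* continuation follows */

	ring[idx + 1].addr = addr1;		/* last fragment */
	ring[idx + 1].len = len1;
	ring[idx + 1].options = 0;		/* end of packet */
}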
net/xdp/xsk.c +237 −0
@@ -528,6 +528,231 @@ static void xsk_destruct_skb(struct sk_buff *skb)
	sock_wfree(skb);
}

#ifdef CONFIG_XSK_MULTI_BUF
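/* Completion-queue helpers. The cq producer side is shared, so each
 * reserve/submit/cancel is serialized with pool->cq_lock (irqsave,
 * since the ring is also updated from driver/NAPI context).
 */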
static int xsk_cq_reserve_addr_locked(struct xdp_sock *xs, u64 addr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&xs->pool->cq_lock, flags);
	ret = xskq_prod_reserve_addr(xs->pool->cq, addr);
	spin_unlock_irqrestore(&xs->pool->cq_lock, flags);

	return ret;
}

static void xsk_cq_submit_locked(struct xdp_sock *xs, u32 n)
{
	unsigned long flags;

	spin_lock_irqsave(&xs->pool->cq_lock, flags);
	xskq_prod_submit_n(xs->pool->cq, n);
	spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
}

static void xsk_cq_cancel_locked(struct xdp_sock *xs, u32 n)
{
	unsigned long flags;

	spin_lock_irqsave(&xs->pool->cq_lock, flags);
	xskq_prod_cancel_n(xs->pool->cq, n);
	spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
}

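/* The number of descriptors backing an in-flight skb is stored as a
 * plain integer in skb_shinfo(skb)->destructor_arg; the destructor
 * reads it back to know how many cq entries to submit.
 */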
static u32 xsk_get_num_desc(struct sk_buff *skb)
{
	return skb ? (long)skb_shinfo(skb)->destructor_arg : 0;
}

static void xsk_destruct_skb_multi(struct sk_buff *skb)
{
	xsk_cq_submit_locked(xdp_sk(skb->sk), xsk_get_num_desc(skb));
	sock_wfree(skb);
}

static void xsk_set_destructor_arg(struct sk_buff *skb)
{
	long num = xsk_get_num_desc(xdp_sk(skb->sk)->skb) + 1;

	skb_shinfo(skb)->destructor_arg = (void *)num;
}

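/* Back an skb out of the TX path: restore the plain sock_wfree
 * destructor, return its reserved cq entries, and free it.
 */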
static void xsk_consume_skb(struct sk_buff *skb)
{
	struct xdp_sock *xs = xdp_sk(skb->sk);

	skb->destructor = sock_wfree;
	xsk_cq_cancel_locked(xs, xsk_get_num_desc(skb));
	/* Free skb without triggering the perf drop trace */
	consume_skb(skb);
	xs->skb = NULL;
}

static void xsk_drop_skb(struct sk_buff *skb)
{
	xdp_sk(skb->sk)->tx->invalid_descs += xsk_get_num_desc(skb);
	xsk_consume_skb(skb);
}

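/* Build or extend the skb for @desc: the first descriptor of a
 * packet is copied into a freshly allocated linear skb, and every
 * further descriptor is copied into its own page frag. Devices
 * with IFF_TX_SKB_NO_LINEAR are not supported on this path.
 */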
static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
				     struct xdp_desc *desc)
{
	struct net_device *dev = xs->dev;
	struct sk_buff *skb = xs->skb;
	int err;

	if (dev->priv_flags & IFF_TX_SKB_NO_LINEAR) {
		err = -EOPNOTSUPP;
		goto free_err;
	} else {
		u32 hr, tr, len;
		void *buffer;

		buffer = xsk_buff_raw_get_data(xs->pool, desc->addr);
		len = desc->len;

		if (!skb) {
			hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom));
			tr = dev->needed_tailroom;
			skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err);
			if (unlikely(!skb))
				goto free_err;

			skb_reserve(skb, hr);
			skb_put(skb, len);

			err = skb_store_bits(skb, 0, buffer, len);
			if (unlikely(err)) {
				kfree_skb(skb);
				goto free_err;
			}
		} else {
			int nr_frags = skb_shinfo(skb)->nr_frags;
			struct page *page;
			u8 *vaddr;

			if (unlikely(nr_frags == (MAX_SKB_FRAGS - 1) && xp_mb_desc(desc))) {
				err = -EOVERFLOW;
				goto free_err;
			}

			page = alloc_page(xs->sk.sk_allocation);
			if (unlikely(!page)) {
				err = -EAGAIN;
				goto free_err;
			}

			vaddr = kmap(page);
			memcpy(vaddr, buffer, len);
			kunmap(page);

			skb_add_rx_frag(skb, nr_frags, page, 0, len, PAGE_SIZE);
			refcount_add(PAGE_SIZE, &xs->sk.sk_wmem_alloc);
		}
	}

	skb->dev = dev;
	skb->priority = xs->sk.sk_priority;
	skb->mark = xs->sk.sk_mark;
	skb->destructor = xsk_destruct_skb_multi;
	xsk_set_destructor_arg(skb);

	return skb;
free_err:
	if (err == -EOVERFLOW) {
		/* Drop the packet */
		xsk_set_destructor_arg(xs->skb);
		xsk_drop_skb(xs->skb);
		xskq_cons_release(xs->tx);
	} else {
		/* Let application retry */
		xsk_cq_cancel_locked(xs, 1);
	}

	return ERR_PTR(err);
}

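/* Copy-mode transmit for multi-buffer sockets. Descriptors of one
 * packet are accumulated into xs->skb; only when the end-of-packet
 * descriptor arrives (!xp_mb_desc()) is the skb handed to
 * __dev_direct_xmit().
 */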
static int xsk_generic_xmit_multi(struct sock *sk)
{
	struct xdp_sock *xs = xdp_sk(sk);
	u32 max_batch = TX_BATCH_SIZE;
	bool sent_frame = false;
	struct xdp_desc desc;
	struct sk_buff *skb;
	int err = 0;

	mutex_lock(&xs->mutex);

	if (xs->queue_id >= xs->dev->real_num_tx_queues)
		goto out;

	while (xskq_cons_peek_desc(xs->tx, &desc, xs->pool)) {
		if (max_batch-- == 0) {
			err = -EAGAIN;
			goto out;
		}

		/* This is the backpressure mechanism for the Tx path.
		 * Reserve space in the completion queue and only proceed
		 * if there is space in it. This avoids having to implement
		 * any buffering in the Tx path.
		 */
		if (xsk_cq_reserve_addr_locked(xs, desc.addr))
			goto out;

		skb = xsk_build_skb(xs, &desc);
		if (IS_ERR(skb)) {
			err = PTR_ERR(skb);
			if (err != -EOVERFLOW)
				goto out;
			err = 0;
			continue;
		}

		xskq_cons_release(xs->tx);

		if (xp_mb_desc(&desc)) {
			xs->skb = skb;
			continue;
		}

		err = __dev_direct_xmit(skb, xs->queue_id);
		if (err == NETDEV_TX_BUSY) {
			/* Tell user-space to retry the send */
			xskq_cons_cancel_n(xs->tx, xsk_get_num_desc(skb));
			xsk_consume_skb(skb);
			err = -EAGAIN;
			goto out;
		}

		/* Ignore NET_XMIT_CN as packet might have been sent */
		if (err == NET_XMIT_DROP) {
			/* SKB completed but not sent */
			err = -EBUSY;
			xs->skb = NULL;
			goto out;
		}

		sent_frame = true;
		xs->skb = NULL;
	}

	if (xskq_has_descs(xs->tx)) {
		if (xs->skb)
			xsk_drop_skb(xs->skb);
		xskq_cons_release(xs->tx);
	}

out:
	if (sent_frame)
		if (xsk_tx_writeable(xs))
			sk->sk_write_space(sk);

	mutex_unlock(&xs->mutex);
	return err;
}
#endif

static int xsk_generic_xmit(struct sock *sk)
{
	struct xdp_sock *xs = xdp_sk(sk);
@@ -631,7 +856,14 @@ static int __xsk_sendmsg(struct sock *sk)
	if (unlikely(!xs->tx))
		return -ENOBUFS;

#ifdef CONFIG_XSK_MULTI_BUF
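	/* Copy-mode sockets bound for scatter-gather (xs->sg) take the
	 * multi-buffer path; zero-copy is unchanged.
	 */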
	if (xs->zc)
		return xsk_zc_xmit(xs);

	return xs->sg ? xsk_generic_xmit_multi(sk) : xsk_generic_xmit(sk);
#else
	return xs->zc ? xsk_zc_xmit(xs) : xsk_generic_xmit(sk);
#endif
}

static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
@@ -769,6 +1001,11 @@ static int xsk_release(struct socket *sock)

	net = sock_net(sk);

#ifdef CONFIG_XSK_MULTI_BUF
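	/* An interrupted send may have left a half-built multi-buffer
	 * skb on the socket; drop it so its descriptors are accounted
	 * as invalid and its cq reservations are returned.
	 */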
	if (xs->skb)
		xsk_drop_skb(xs->skb);
#endif

	mutex_lock(&net->xdp.lock);
	sk_del_node_init_rcu(sk);
	mutex_unlock(&net->xdp.lock);
net/xdp/xsk_queue.h +14 −0
@@ -186,6 +186,13 @@ static inline bool xp_validate_desc(struct xsk_buff_pool *pool,
		xp_aligned_validate_desc(pool, desc);
}

#ifdef CONFIG_XSK_MULTI_BUF
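/* True while the cached consumer still trails the cached producer,
 * i.e. locally known descriptors remain unconsumed.
 */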
static inline bool xskq_has_descs(struct xsk_queue *q)
{
	return q->cached_cons != q->cached_prod;
}
#endif

static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
					   struct xdp_desc *d,
					   struct xsk_buff_pool *pool)
@@ -282,6 +289,13 @@ static inline bool xskq_cons_is_full(struct xsk_queue *q)
		q->nentries;
}

#ifdef CONFIG_XSK_MULTI_BUF
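/* Rewind the cached consumer by @cnt so the last reserved
 * descriptors can be peeked again (used on the TX retry path).
 */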
static inline void xskq_cons_cancel_n(struct xsk_queue *q, u32 cnt)
{
	q->cached_cons -= cnt;
}
#endif

static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
{
	/* No barriers needed since data is not accessed */