Commit a0c5393d authored by David S. Miller's avatar David S. Miller
Browse files

Merge branch 'lockless-qdisc-packet-stuck'

Yunsheng Lin says:

====================
ix packet stuck problem for lockless qdisc

This patchset fixes the packet stuck problem mentioned in [1].

Patch 1: Add STATE_MISSED flag to fix packet stuck problem.
Patch 2: Fix a tx_action rescheduling problem after STATE_MISSED
         flag is added in patch 1.
Patch 3: Fix the significantly higher CPU consumption problem when
         multiple threads are competing on a saturated outgoing
         device.

V8: Change function name as suggested by Jakub and fix some typo
    in patch 3, adjust commit log in patch 2, and add Acked-by
    from Jakub.
V7: Fix netif_tx_wake_queue() data race noted by Jakub.
V6: Some performance optimization in patch 1 suggested by Jakub
    and drop NET_XMIT_DROP checking in patch 3.
V5: add patch 3 to fix the problem reported by Michal Kubecek.
V4: Change STATE_NEED_RESCHEDULE to STATE_MISSED and add patch 2.

[1]. https://lkml.org/lkml/2019/10/9/42


====================

Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 974271e5 dcad9ee9
Loading
Loading
Loading
Loading
+1 −6
Original line number Diff line number Diff line
@@ -128,11 +128,6 @@ void __qdisc_run(struct Qdisc *q);
static inline void qdisc_run(struct Qdisc *q)
{
	if (qdisc_run_begin(q)) {
		/* NOLOCK qdisc must check 'state' under the qdisc seqlock
		 * to avoid racing with dev_qdisc_reset()
		 */
		if (!(q->flags & TCQ_F_NOLOCK) ||
		    likely(!test_bit(__QDISC_STATE_DEACTIVATED, &q->state)))
		__qdisc_run(q);
		qdisc_run_end(q);
	}
+34 −1
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ struct qdisc_rate_table {
enum qdisc_state_t {
	__QDISC_STATE_SCHED,
	__QDISC_STATE_DEACTIVATED,
	__QDISC_STATE_MISSED,
};

struct qdisc_size_table {
@@ -159,8 +160,33 @@ static inline bool qdisc_is_empty(const struct Qdisc *qdisc)
static inline bool qdisc_run_begin(struct Qdisc *qdisc)
{
	if (qdisc->flags & TCQ_F_NOLOCK) {
		if (spin_trylock(&qdisc->seqlock))
			goto nolock_empty;

		/* If the MISSED flag is set, it means other thread has
		 * set the MISSED flag before second spin_trylock(), so
		 * we can return false here to avoid multi cpus doing
		 * the set_bit() and second spin_trylock() concurrently.
		 */
		if (test_bit(__QDISC_STATE_MISSED, &qdisc->state))
			return false;

		/* Set the MISSED flag before the second spin_trylock(),
		 * if the second spin_trylock() return false, it means
		 * other cpu holding the lock will do dequeuing for us
		 * or it will see the MISSED flag set after releasing
		 * lock and reschedule the net_tx_action() to do the
		 * dequeuing.
		 */
		set_bit(__QDISC_STATE_MISSED, &qdisc->state);

		/* Retry again in case other CPU may not see the new flag
		 * after it releases the lock at the end of qdisc_run_end().
		 */
		if (!spin_trylock(&qdisc->seqlock))
			return false;

nolock_empty:
		WRITE_ONCE(qdisc->empty, false);
	} else if (qdisc_is_running(qdisc)) {
		return false;
@@ -176,8 +202,15 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
static inline void qdisc_run_end(struct Qdisc *qdisc)
{
	write_seqcount_end(&qdisc->running);
	if (qdisc->flags & TCQ_F_NOLOCK)
	if (qdisc->flags & TCQ_F_NOLOCK) {
		spin_unlock(&qdisc->seqlock);

		if (unlikely(test_bit(__QDISC_STATE_MISSED,
				      &qdisc->state))) {
			clear_bit(__QDISC_STATE_MISSED, &qdisc->state);
			__netif_schedule(qdisc);
		}
	}
}

static inline bool qdisc_may_bulk(const struct Qdisc *qdisc)
+24 −5
Original line number Diff line number Diff line
@@ -3853,6 +3853,7 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,

	if (q->flags & TCQ_F_NOLOCK) {
		rc = q->enqueue(skb, q, &to_free) & NET_XMIT_MASK;
		if (likely(!netif_xmit_frozen_or_stopped(txq)))
			qdisc_run(q);

		if (unlikely(to_free))
@@ -5025,25 +5026,43 @@ static __latent_entropy void net_tx_action(struct softirq_action *h)
		sd->output_queue_tailp = &sd->output_queue;
		local_irq_enable();

		rcu_read_lock();

		while (head) {
			struct Qdisc *q = head;
			spinlock_t *root_lock = NULL;

			head = head->next_sched;

			if (!(q->flags & TCQ_F_NOLOCK)) {
				root_lock = qdisc_lock(q);
				spin_lock(root_lock);
			}
			/* We need to make sure head->next_sched is read
			 * before clearing __QDISC_STATE_SCHED
			 */
			smp_mb__before_atomic();

			if (!(q->flags & TCQ_F_NOLOCK)) {
				root_lock = qdisc_lock(q);
				spin_lock(root_lock);
			} else if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED,
						     &q->state))) {
				/* There is a synchronize_net() between
				 * STATE_DEACTIVATED flag being set and
				 * qdisc_reset()/some_qdisc_is_busy() in
				 * dev_deactivate(), so we can safely bail out
				 * early here to avoid data race between
				 * qdisc_deactivate() and some_qdisc_is_busy()
				 * for lockless qdisc.
				 */
				clear_bit(__QDISC_STATE_SCHED, &q->state);
				continue;
			}

			clear_bit(__QDISC_STATE_SCHED, &q->state);
			qdisc_run(q);
			if (root_lock)
				spin_unlock(root_lock);
		}

		rcu_read_unlock();
	}

	xfrm_dev_backlog(sd);
+48 −2
Original line number Diff line number Diff line
@@ -35,6 +35,25 @@
const struct Qdisc_ops *default_qdisc_ops = &pfifo_fast_ops;
EXPORT_SYMBOL(default_qdisc_ops);

static void qdisc_maybe_clear_missed(struct Qdisc *q,
				     const struct netdev_queue *txq)
{
	clear_bit(__QDISC_STATE_MISSED, &q->state);

	/* Make sure the below netif_xmit_frozen_or_stopped()
	 * checking happens after clearing STATE_MISSED.
	 */
	smp_mb__after_atomic();

	/* Checking netif_xmit_frozen_or_stopped() again to
	 * make sure STATE_MISSED is set if the STATE_MISSED
	 * set by netif_tx_wake_queue()'s rescheduling of
	 * net_tx_action() is cleared by the above clear_bit().
	 */
	if (!netif_xmit_frozen_or_stopped(txq))
		set_bit(__QDISC_STATE_MISSED, &q->state);
}

/* Main transmission queue. */

/* Modifications to data participating in scheduling must be protected with
@@ -74,6 +93,7 @@ static inline struct sk_buff *__skb_dequeue_bad_txq(struct Qdisc *q)
			}
		} else {
			skb = SKB_XOFF_MAGIC;
			qdisc_maybe_clear_missed(q, txq);
		}
	}

@@ -242,6 +262,7 @@ static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
			}
		} else {
			skb = NULL;
			qdisc_maybe_clear_missed(q, txq);
		}
		if (lock)
			spin_unlock(lock);
@@ -251,8 +272,10 @@ static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
	*validate = true;

	if ((q->flags & TCQ_F_ONETXQUEUE) &&
	    netif_xmit_frozen_or_stopped(txq))
	    netif_xmit_frozen_or_stopped(txq)) {
		qdisc_maybe_clear_missed(q, txq);
		return skb;
	}

	skb = qdisc_dequeue_skb_bad_txq(q);
	if (unlikely(skb)) {
@@ -311,6 +334,8 @@ bool sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
		HARD_TX_LOCK(dev, txq, smp_processor_id());
		if (!netif_xmit_frozen_or_stopped(txq))
			skb = dev_hard_start_xmit(skb, dev, txq, &ret);
		else
			qdisc_maybe_clear_missed(q, txq);

		HARD_TX_UNLOCK(dev, txq);
	} else {
@@ -640,8 +665,10 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
{
	struct pfifo_fast_priv *priv = qdisc_priv(qdisc);
	struct sk_buff *skb = NULL;
	bool need_retry = true;
	int band;

retry:
	for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) {
		struct skb_array *q = band2list(priv, band);

@@ -652,6 +679,23 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
	}
	if (likely(skb)) {
		qdisc_update_stats_at_dequeue(qdisc, skb);
	} else if (need_retry &&
		   test_bit(__QDISC_STATE_MISSED, &qdisc->state)) {
		/* Delay clearing the STATE_MISSED here to reduce
		 * the overhead of the second spin_trylock() in
		 * qdisc_run_begin() and __netif_schedule() calling
		 * in qdisc_run_end().
		 */
		clear_bit(__QDISC_STATE_MISSED, &qdisc->state);

		/* Make sure dequeuing happens after clearing
		 * STATE_MISSED.
		 */
		smp_mb__after_atomic();

		need_retry = false;

		goto retry;
	} else {
		WRITE_ONCE(qdisc->empty, true);
	}
@@ -1158,9 +1202,11 @@ static void dev_reset_queue(struct net_device *dev,
	qdisc_reset(qdisc);

	spin_unlock_bh(qdisc_lock(qdisc));
	if (nolock)
	if (nolock) {
		clear_bit(__QDISC_STATE_MISSED, &qdisc->state);
		spin_unlock_bh(&qdisc->seqlock);
	}
}

static bool some_qdisc_is_busy(struct net_device *dev)
{