Commit 9866bcd6 authored by Maciej Fijalkowski's avatar Maciej Fijalkowski Committed by Alexei Starovoitov
Browse files

selftests: xsk: Split worker thread



Let's a have a separate Tx/Rx worker threads instead of a one common
thread packed with Tx/Rx specific checks.

Move mmap for umem buffer space and a switch_namespace() call to
thread_common_ops.

This also allows for a bunch of simplifactions that are the subject of
the next commits. The final result will be a code base that is much
easier to follow.

Signed-off-by: default avatarMaciej Fijalkowski <maciej.fijalkowski@intel.com>
Signed-off-by: default avatarAlexei Starovoitov <ast@kernel.org>
Link: https://lore.kernel.org/bpf/20210329224316.17793-10-maciej.fijalkowski@intel.com
parent ef928078
Loading
Loading
Loading
Loading
+77 −79
Original line number Diff line number Diff line
@@ -760,6 +760,15 @@ static void thread_common_ops(struct ifobject *ifobject, void *bufs, pthread_mut
	int ctr = 0;
	int ret;

	pthread_attr_setstacksize(&attr, THREAD_STACK);

	bufs = mmap(NULL, num_frames * XSK_UMEM__DEFAULT_FRAME_SIZE,
		    PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
	if (bufs == MAP_FAILED)
		exit_with_error(errno);

	ifobject->ns_fd = switch_namespace(ifobject->nsname);

	xsk_configure_umem(ifobject, bufs, num_frames * XSK_UMEM__DEFAULT_FRAME_SIZE);
	ret = xsk_configure_socket(ifobject);

@@ -782,9 +791,12 @@ static void thread_common_ops(struct ifobject *ifobject, void *bufs, pthread_mut

	if (ctr >= SOCK_RECONF_CTR)
		exit_with_error(ret);

	print_verbose("Interface [%s] vector [%s]\n",
		      ifobject->ifname, ifobject->fv.vector == tx ? "Tx" : "Rx");
}

static void *worker_testapp_validate(void *arg)
static void *worker_testapp_validate_tx(void *arg)
{
	struct udphdr *udp_hdr =
	    (struct udphdr *)(pkt_data + sizeof(struct ethhdr) + sizeof(struct iphdr));
@@ -792,21 +804,8 @@ static void *worker_testapp_validate(void *arg)
	struct ethhdr *eth_hdr = (struct ethhdr *)pkt_data;
	struct ifobject *ifobject = (struct ifobject *)arg;
	struct generic_data data;
	void *bufs = NULL;

	pthread_attr_setstacksize(&attr, THREAD_STACK);

	if (!bidi_pass) {
		bufs = mmap(NULL, num_frames * XSK_UMEM__DEFAULT_FRAME_SIZE,
			    PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
		if (bufs == MAP_FAILED)
			exit_with_error(errno);

		ifobject->ns_fd = switch_namespace(ifobject->nsname);
	}

	if (ifobject->fv.vector == tx) {
	int spinningrxctr = 0;
	void *bufs = NULL;

	if (!bidi_pass)
		thread_common_ops(ifobject, bufs, &sync_mutex_tx, &spinning_tx);
@@ -816,7 +815,6 @@ static void *worker_testapp_validate(void *arg)
		usleep(USLEEP_MAX);
	}

		print_verbose("Interface [%s] vector [Tx]\n", ifobject->ifname);
	for (int i = 0; i < num_frames; i++) {
		/*send EOT frame */
		if (i == (num_frames - 1))
@@ -833,14 +831,23 @@ static void *worker_testapp_validate(void *arg)
	print_verbose("Sending %d packets on interface %s\n",
		      (opt_pkt_count - 1), ifobject->ifname);
	tx_only_all(ifobject);
	} else if (ifobject->fv.vector == rx) {

	if (test_type != TEST_TYPE_BIDI || bidi_pass) {
		xsk_socket__delete(ifobject->xsk->xsk);
		(void)xsk_umem__delete(ifobject->umem->umem);
	}
	pthread_exit(NULL);
}

static void *worker_testapp_validate_rx(void *arg)
{
	struct ifobject *ifobject = (struct ifobject *)arg;
	struct pollfd fds[MAX_SOCKS] = { };
		int ret;
	void *bufs = NULL;

	if (!bidi_pass)
		thread_common_ops(ifobject, bufs, &sync_mutex_tx, &spinning_rx);

		print_verbose("Interface [%s] vector [Rx]\n", ifobject->ifname);
	if (stat_test_type != STAT_TEST_RX_FILL_EMPTY)
		xsk_populate_fill_ring(ifobject->umem);

@@ -859,30 +866,21 @@ static void *worker_testapp_validate(void *arg)
	pthread_mutex_unlock(&sync_mutex);

	while (1) {
			if (test_type == TEST_TYPE_POLL) {
				ret = poll(fds, 1, POLL_TMOUT);
				if (ret <= 0)
					continue;
			}

		if (test_type != TEST_TYPE_STATS) {
			rx_pkt(ifobject->xsk, fds);
			worker_pkt_validate();
		} else {
			worker_stats_validate(ifobject);
		}

		if (sigvar)
			break;
	}

		if (test_type != TEST_TYPE_STATS)
	print_verbose("Received %d packets on interface %s\n",
		      pkt_counter, ifobject->ifname);

	if (test_type == TEST_TYPE_TEARDOWN)
		print_verbose("Destroying socket\n");
	}

	if ((test_type != TEST_TYPE_BIDI) || bidi_pass) {
		xsk_socket__delete(ifobject->xsk->xsk);
@@ -911,12 +909,12 @@ static void testapp_validate(void)

	/*Spawn RX thread */
	if (!bidi || !bidi_pass) {
		if (pthread_create(&t0, &attr, worker_testapp_validate, ifdict[1]))
		if (pthread_create(&t0, &attr, worker_testapp_validate_rx, ifdict[1]))
			exit_with_error(errno);
	} else if (bidi && bidi_pass) {
		/*switch Tx/Rx vectors */
		ifdict[0]->fv.vector = rx;
		if (pthread_create(&t0, &attr, worker_testapp_validate, ifdict[0]))
		if (pthread_create(&t0, &attr, worker_testapp_validate_rx, ifdict[0]))
			exit_with_error(errno);
	}

@@ -931,12 +929,12 @@ static void testapp_validate(void)

	/*Spawn TX thread */
	if (!bidi || !bidi_pass) {
		if (pthread_create(&t1, &attr, worker_testapp_validate, ifdict[0]))
		if (pthread_create(&t1, &attr, worker_testapp_validate_tx, ifdict[0]))
			exit_with_error(errno);
	} else if (bidi && bidi_pass) {
		/*switch Tx/Rx vectors */
		ifdict[1]->fv.vector = tx;
		if (pthread_create(&t1, &attr, worker_testapp_validate, ifdict[1]))
		if (pthread_create(&t1, &attr, worker_testapp_validate_tx, ifdict[1]))
			exit_with_error(errno);
	}