Commit 8728a455 authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland
Browse files

fs: dlm: generic connect func



This patch adds a generic connect function for TCP and SCTP. If the
connect functionality differs from each other additional callbacks in
dlm_proto_ops were added. The sockopts callback handling will guarantee
that sockets created by connect() will use the same options as sockets
created by accept().

Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent 90d21fc0
Loading
Loading
Loading
Loading
+150 −193
Original line number Diff line number Diff line
@@ -143,15 +143,17 @@ struct dlm_node_addr {
};

struct dlm_proto_ops {
	bool try_new_addr;
	const char *name;
	int proto;

	int (*connect)(struct connection *con, struct socket *sock,
		       struct sockaddr *addr, int addr_len);
	void (*sockopts)(struct socket *sock);
	int (*bind)(struct socket *sock);
	int (*listen_validate)(void);
	void (*listen_sockopts)(struct socket *sock);
	int (*listen_bind)(struct socket *sock);

	/* What to do to connect */
	void (*connect_action)(struct connection *con);
	/* What to do to shutdown */
	void (*shutdown_action)(struct connection *con);
	/* What to do to eof check */
@@ -186,9 +188,6 @@ static const struct dlm_proto_ops *dlm_proto_ops;
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);

static void sctp_connect_to_sock(struct connection *con);
static void tcp_connect_to_sock(struct connection *con);

/* need to held writequeue_lock */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
@@ -1151,189 +1150,6 @@ static int sctp_bind_addrs(struct socket *sock, uint16_t port)
	return result;
}

/* Initiate an SCTP association.
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.
 */
static void sctp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage daddr;
	int result;
	int addr_len;
	struct socket *sock;
	unsigned int mark;

	mutex_lock(&con->sock_mutex);

	/* Some odd races can cause double-connects, ignore them */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	if (con->sock) {
		log_print("node %d already connected.", con->nodeid);
		goto out;
	}

	memset(&daddr, 0, sizeof(daddr));
	result = nodeid_to_addr(con->nodeid, &daddr, NULL, true, &mark);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0)
		goto socket_err;

	sock_set_mark(sock->sk, mark);

	add_sock(sock, con);

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con->sock, 0))
		goto bind_err;

	make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);

	log_print_ratelimited("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);

	/*
	 * Make sock->ops->connect() function return in specified time,
	 * since O_NONBLOCK argument in connect() function does not work here,
	 * then, we should restore the default value of this attribute.
	 */
	sock_set_sndtimeo(sock->sk, 5);
	result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
				   0);
	sock_set_sndtimeo(sock->sk, 0);

	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0) {
		if (!test_and_set_bit(CF_CONNECTED, &con->flags))
			log_print("successful connected to node %d", con->nodeid);
		goto out;
	}

bind_err:
	con->sock = NULL;
	sock_release(sock);

socket_err:
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}

out:
	mutex_unlock(&con->sock_mutex);
}

/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	unsigned int mark;
	int addr_len;
	struct socket *sock = NULL;
	int result;

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false, &mark);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	sock_set_mark(sock->sk, mark);

	add_sock(sock, con);

	/* Bind to our cluster-known address connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print_ratelimited("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);

	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				   O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN && 
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}

/* Get local addresses */
static void init_local(void)
{
@@ -1687,6 +1503,74 @@ static void process_listen_recv_socket(struct work_struct *work)
	accept_from_sock(&listen_con);
}

static void dlm_connect(struct connection *con)
{
	struct sockaddr_storage addr;
	int result, addr_len;
	struct socket *sock;
	unsigned int mark;

	/* Some odd races can cause double-connects, ignore them */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		return;

	if (con->sock) {
		log_print("node %d already connected.", con->nodeid);
		return;
	}

	memset(&addr, 0, sizeof(addr));
	result = nodeid_to_addr(con->nodeid, &addr, NULL,
				dlm_proto_ops->try_new_addr, &mark);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		return;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0)
		goto socket_err;

	sock_set_mark(sock->sk, mark);
	dlm_proto_ops->sockopts(sock);

	add_sock(sock, con);

	result = dlm_proto_ops->bind(sock);
	if (result < 0)
		goto add_sock_err;

	log_print_ratelimited("connecting to %d", con->nodeid);
	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
	result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
					addr_len);
	if (result < 0)
		goto add_sock_err;

	return;

add_sock_err:
	dlm_close_sock(&con->sock);

socket_err:
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		msleep(1000);
		lowcomms_connect_sock(con);
	}
}

/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
@@ -1701,11 +1585,15 @@ static void process_send_sockets(struct work_struct *work)
		dlm_midcomms_unack_msg_resend(con->nodeid);
	}

	if (con->sock == NULL) { /* not mutex protected so check it inside too */
	if (con->sock == NULL) {
		if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
			msleep(1000);
		dlm_proto_ops->connect_action(con);

		mutex_lock(&con->sock_mutex);
		dlm_connect(con);
		mutex_unlock(&con->sock_mutex);
	}

	if (!list_empty(&con->writequeue))
		send_to_sock(con);
}
@@ -1899,6 +1787,43 @@ static int dlm_listen_for_all(void)
	return result;
}

static int dlm_tcp_bind(struct socket *sock)
{
	struct sockaddr_storage src_addr;
	int result, addr_len;

	/* Bind to our cluster-known address connecting to avoid
	 * routing problems.
	 */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);

	result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
				 addr_len);
	if (result < 0) {
		/* This *may* not indicate a critical error */
		log_print("could not bind for connect: %d", result);
	}

	return 0;
}

static int dlm_tcp_connect(struct connection *con, struct socket *sock,
			   struct sockaddr *addr, int addr_len)
{
	int ret;

	ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
	switch (ret) {
	case -EINPROGRESS:
		fallthrough;
	case 0:
		return 0;
	}

	return ret;
}

static int dlm_tcp_listen_validate(void)
{
	/* We don't support multi-homed hosts */
@@ -1935,14 +1860,43 @@ static int dlm_tcp_listen_bind(struct socket *sock)
static const struct dlm_proto_ops dlm_tcp_ops = {
	.name = "TCP",
	.proto = IPPROTO_TCP,
	.connect = dlm_tcp_connect,
	.sockopts = dlm_tcp_sockopts,
	.bind = dlm_tcp_bind,
	.listen_validate = dlm_tcp_listen_validate,
	.listen_sockopts = dlm_tcp_listen_sockopts,
	.listen_bind = dlm_tcp_listen_bind,
	.connect_action = tcp_connect_to_sock,
	.shutdown_action = dlm_tcp_shutdown,
	.eof_condition = tcp_eof_condition,
};

static int dlm_sctp_bind(struct socket *sock)
{
	return sctp_bind_addrs(sock, 0);
}

static int dlm_sctp_connect(struct connection *con, struct socket *sock,
			    struct sockaddr *addr, int addr_len)
{
	int ret;

	/*
	 * Make sock->ops->connect() function return in specified time,
	 * since O_NONBLOCK argument in connect() function does not work here,
	 * then, we should restore the default value of this attribute.
	 */
	sock_set_sndtimeo(sock->sk, 5);
	ret = sock->ops->connect(sock, addr, addr_len, 0);
	sock_set_sndtimeo(sock->sk, 0);
	if (ret < 0)
		return ret;

	if (!test_and_set_bit(CF_CONNECTED, &con->flags))
		log_print("successful connected to node %d", con->nodeid);

	return 0;
}

static int dlm_sctp_listen_validate(void)
{
	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
@@ -1969,10 +1923,13 @@ static void dlm_sctp_sockopts(struct socket *sock)
static const struct dlm_proto_ops dlm_sctp_ops = {
	.name = "SCTP",
	.proto = IPPROTO_SCTP,
	.try_new_addr = true,
	.connect = dlm_sctp_connect,
	.sockopts = dlm_sctp_sockopts,
	.bind = dlm_sctp_bind,
	.listen_validate = dlm_sctp_listen_validate,
	.listen_sockopts = dlm_sctp_sockopts,
	.listen_bind = dlm_sctp_bind_listen,
	.connect_action = sctp_connect_to_sock,
};

int dlm_lowcomms_start(void)