Skip to content
Snippets Groups Projects
socket.c 105 KiB
Newer Older
				   int dlen, long timeout)
{
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct list_head *cong_links = &tsk->cong_links;
	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
	struct tipc_msg *hdr = &tsk->phdr;
	struct tipc_member *first = NULL;
	struct tipc_member *mbr = NULL;
	struct net *net = sock_net(sk);
	u32 node, port, exclude;
	struct list_head dsts;
	int lookups = 0;
	int dstcnt, rc;
	bool cong;

	INIT_LIST_HEAD(&dsts);
	ua->sa.type = msg_nametype(hdr);
	ua->scope = msg_lookup_scope(hdr);

	while (++lookups < 4) {
		exclude = tipc_group_exclude(tsk->group);

		first = NULL;

		/* Look for a non-congested destination member, if any */
		while (1) {
			if (!tipc_nametbl_lookup_group(net, ua, &dsts, &dstcnt,
						       exclude, false))
				return -EHOSTUNREACH;
			tipc_dest_pop(&dsts, &node, &port);
			cong = tipc_group_cong(tsk->group, node, port, blks,
					       &mbr);
			if (!cong)
				break;
			if (mbr == first)
				break;
			if (!first)
				first = mbr;
		}

		/* Start over if destination was not in member list */
		if (unlikely(!mbr))
			continue;

		if (likely(!cong && !tipc_dest_find(cong_links, node, 0)))
			break;

		/* Block or return if destination link or member is congested */
		rc = tipc_wait_for_cond(sock, &timeout,
					!tipc_dest_find(cong_links, node, 0) &&
					tsk->group &&
					!tipc_group_cong(tsk->group, node, port,
							 blks, &mbr));
		if (unlikely(rc))
			return rc;

		/* Send, unless destination disappeared while waiting */
		if (likely(mbr))
			break;
	}

	if (unlikely(lookups >= 4))
		return -EHOSTUNREACH;

	rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen);

	return rc ? rc : dlen;
}

/**
 * tipc_send_group_bcast - send message to all members in communication group
Andrew Lunn's avatar
Andrew Lunn committed
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
				 int dlen, long timeout)
{
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	struct sock *sk = sock->sk;
	struct net *net = sock_net(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_nlist *dsts;
	struct tipc_mc_method *method = &tsk->mc_method;
	bool ack = method->mandatory && method->rcast;
	int blks = tsk_blocks(MCAST_H_SIZE + dlen);
	struct tipc_msg *hdr = &tsk->phdr;
	int mtu = tipc_bcast_get_mtu(net);
	struct sk_buff_head pkts;
	int rc = -EHOSTUNREACH;

	/* Block or return if any destination link or member is congested */
	rc = tipc_wait_for_cond(sock, &timeout,
				!tsk->cong_link_cnt && tsk->group &&
				!tipc_group_bc_cong(tsk->group, blks));
	if (unlikely(rc))
		return rc;

	dsts = tipc_group_dests(tsk->group);
	if (!dsts->local && !dsts->remote)
		return -EHOSTUNREACH;

	/* Complete message header */
		msg_set_type(hdr, TIPC_GRP_MCAST_MSG);
		msg_set_nameinst(hdr, ua->sa.instance);
	} else {
		msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
		msg_set_nameinst(hdr, 0);
	}
	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
	msg_set_destport(hdr, 0);
	msg_set_destnode(hdr, 0);
	msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(tsk->group));
	/* Avoid getting stuck with repeated forced replicasts */
	msg_set_grp_bc_ack_req(hdr, ack);

	/* Build message as chain of buffers */
	__skb_queue_head_init(&pkts);
	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
	if (unlikely(rc != dlen))
		return rc;

	/* Send message */
	rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
	if (unlikely(rc))
		return rc;

	/* Update broadcast sequence number and send windows */
	tipc_group_update_bc_members(tsk->group, blks, ack);

	/* Broadcast link is now free to choose method for next broadcast */
	method->mandatory = false;
	method->expires = jiffies;

/**
 * tipc_send_group_mcast - send message to all members with given identity
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
				 int dlen, long timeout)
{
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_group *grp = tsk->group;
	struct tipc_msg *hdr = &tsk->phdr;
	struct net *net = sock_net(sk);
	struct list_head dsts;

	INIT_LIST_HEAD(&dsts);
	ua->sa.type = msg_nametype(hdr);
	ua->scope = msg_lookup_scope(hdr);
	exclude = tipc_group_exclude(grp);
	if (!tipc_nametbl_lookup_group(net, ua, &dsts, &dstcnt, exclude, true))
		return -EHOSTUNREACH;

	if (dstcnt == 1) {
		tipc_dest_pop(&dsts, &ua->sk.node, &ua->sk.ref);
		return tipc_send_group_unicast(sock, m, dlen, timeout);
	}

	tipc_dest_list_purge(&dsts);
	return tipc_send_group_bcast(sock, m, dlen, timeout);
}

/**
 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
 * @net: the associated network namespace
 * @arrvq: queue with arriving messages, to be cloned after destination lookup
 * @inputq: queue with cloned messages, delivered to socket after dest lookup
 *
 * Multi-threaded: parallel calls with reference to same queues may occur
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
		       struct sk_buff_head *inputq)
	u32 self = tipc_own_addr(net);
	struct sk_buff *skb, *_skb;
	struct sk_buff_head tmpq;
	struct list_head dports;
	struct tipc_msg *hdr;
	int user, mtyp, hlen;
	__skb_queue_head_init(&tmpq);
	INIT_LIST_HEAD(&dports);
	ua.addrtype = TIPC_SERVICE_RANGE;
	/* tipc_skb_peek() increments the head skb's reference counter */
	skb = tipc_skb_peek(arrvq, &inputq->lock);
	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
		hdr = buf_msg(skb);
		user = msg_user(hdr);
		mtyp = msg_type(hdr);
		hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
		onode = msg_orignode(hdr);
		ua.sr.type = msg_nametype(hdr);
		ua.sr.lower = msg_namelower(hdr);
		ua.sr.upper = msg_nameupper(hdr);
		if (onode == self)
			ua.scope = TIPC_ANY_SCOPE;
		else
			ua.scope = TIPC_CLUSTER_SCOPE;
		if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
			spin_lock_bh(&inputq->lock);
			if (skb_peek(arrvq) == skb) {
				__skb_dequeue(arrvq);
				__skb_queue_tail(inputq, skb);
			}
			kfree_skb(skb);
			spin_unlock_bh(&inputq->lock);
			continue;
		}

		/* Group messages require exact scope match */
		if (msg_in_group(hdr)) {
			ua.sr.lower = 0;
			ua.sr.upper = ~0;
			ua.scope = msg_lookup_scope(hdr);

		/* Create destination port list: */
		tipc_nametbl_lookup_mcast_sockets(net, &ua, &dports);

		/* Clone message per destination */
		while (tipc_dest_pop(&dports, NULL, &portid)) {
			_skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
			if (_skb) {
				msg_set_destport(buf_msg(_skb), portid);
				__skb_queue_tail(&tmpq, _skb);
				continue;
			}
			pr_warn("Failed to clone mcast rcv buffer\n");
		/* Append clones to inputq only if skb is still head of arrvq */
		spin_lock_bh(&inputq->lock);
		if (skb_peek(arrvq) == skb) {
			skb_queue_splice_tail_init(&tmpq, inputq);
			kfree_skb(__skb_dequeue(arrvq));
		}
		spin_unlock_bh(&inputq->lock);
		__skb_queue_purge(&tmpq);
		kfree_skb(skb);
	tipc_sk_rcv(net, inputq);
/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
 *                         when socket is in Nagle mode
 * @tsk: sending socket
 * @nagle_ack: true when called because the peer acknowledged a Nagle
 *             message; the Nagle statistics are then evaluated and reset
 */
static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack)
{
	struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
	struct sk_buff *skb = skb_peek_tail(txq);
	struct net *net = sock_net(&tsk->sk);
	u32 dnode = tsk_peer_node(tsk);
	int rc;

	if (nagle_ack) {
		tsk->pkt_cnt += skb_queue_len(txq);
		if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) {
			/* Fewer than two messages per packet on average:
			 * restart the oneway count and double the threshold
			 * (while below NAGLE_START_MAX) before Nagle resumes
			 */
			tsk->oneway = 0;
			if (tsk->nagle_start < NAGLE_START_MAX)
				tsk->nagle_start *= 2;
			tsk->expect_ack = false;
			pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n",
				 tsk->portid, tsk->msg_acc, tsk->pkt_cnt,
				 tsk->nagle_start);
		} else {
			tsk->nagle_start = NAGLE_START_INIT;
			/* Request an ack on the last queued buffer, if any */
			if (skb) {
				msg_set_ack_required(buf_msg(skb));
				tsk->expect_ack = true;
			} else {
				tsk->expect_ack = false;
			}
		}
		tsk->msg_acc = 0;
		tsk->pkt_cnt = 0;
	}

	/* Nothing queued, or the link is still congested */
	if (!skb || tsk->cong_link_cnt)
		return;

	/* Do not send SYN again after congestion */
	if (msg_is_syn(buf_msg(skb)))
		return;

	if (tsk->msg_acc)
		tsk->pkt_cnt += skb_queue_len(txq);
	tsk->snt_unacked += tsk->snd_backlog;
	tsk->snd_backlog = 0;
	rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
	if (rc == -ELINKCONG)
		tsk->cong_link_cnt = 1;
}

/**
 * tipc_sk_conn_proto_rcv - receive a connection management protocol message
 * @tsk: receiving socket
 * @skb: pointer to message buffer.
 * @inputq: buffer list containing the buffers
 * @xmitq: output message area
 */
/* Handle a connection management message received on a connected socket:
 * an error code tears the connection down, CONN_PROBE is answered with
 * CONN_PROBE_REPLY, and CONN_ACK releases send window.
 *
 * NOTE(review): this copy of the function is incomplete and does not build.
 * Visible gaps:
 * - no opening brace after the parameter list and no closing brace,
 * - inputq is used below but is not among the visible parameters,
 * - the branch for an unvalidated peer and the error-code branch are
 *   never closed, and the final else-if branch is left open.
 * The code is left untouched; restore the missing lines from the
 * authoritative source.
 */
static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
				   struct sk_buff_head *xmitq)
	struct tipc_msg *hdr = buf_msg(skb);
	u32 onode = tsk_own_node(tsk);
	struct sock *sk = &tsk->sk;
	int mtyp = msg_type(hdr);
	bool was_cong;
	/* Ignore if connection cannot be validated: */
	if (!tsk_peer_msg(tsk, hdr)) {
		trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, "@proto_rcv!");
	/* NOTE(review): the end of the branch above is missing here */
	if (unlikely(msg_errcode(hdr))) {
		/* Peer reported an error: mark the socket as disconnecting,
		 * remove the connection from the node and notify the owner
		 */
		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
		tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
				      tsk_peer_port(tsk));
		sk->sk_state_change(sk);

		/* State change is ignored if socket already awake,
		 * - convert msg to abort msg and add to inqueue
		 */
		msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
		msg_set_type(hdr, TIPC_CONN_MSG);
		msg_set_size(hdr, BASIC_H_SIZE);
		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
		__skb_queue_tail(inputq, skb);
		return;
	/* NOTE(review): the closing brace of the error branch is missing here */
	tsk->probe_unacked = false;
	if (mtyp == CONN_PROBE) {
		/* Answer the probe: retype the message as a reply, reverse
		 * it and queue it on xmitq if the reversal succeeded
		 */
		msg_set_type(hdr, CONN_PROBE_REPLY);
		if (tipc_msg_reverse(onode, &skb, TIPC_OK))
			__skb_queue_tail(xmitq, skb);
		return;
	} else if (mtyp == CONN_ACK) {
		/* Push any backlog, credit back the acknowledged amount,
		 * adopt the advertised window when the peer supports block
		 * flow control, and wake writers once congestion has cleared
		 */
		was_cong = tsk_conn_cong(tsk);
		tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr));
		tsk->snt_unacked -= msg_conn_ack(hdr);
		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
			tsk->snd_win = msg_adv_win(hdr);
		if (was_cong && !tsk_conn_cong(tsk))
			sk->sk_write_space(sk);
	} else if (mtyp != CONN_PROBE_REPLY) {
		pr_warn("Received unknown CONN_PROTO msg\n");
/**
 * tipc_sendmsg - send message in connectionless manner
 * @sock: socket structure
 * @m: message to send
 * @dsz: amount of user data to be sent
 *
 * Message must have a destination specified explicitly.
 * Used for SOCK_RDM and SOCK_DGRAM messages,
 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
 *
 * Return: the number of bytes sent on success, or errno otherwise
 */
static int tipc_sendmsg(struct socket *sock,
{
	struct sock *sk = sock->sk;
	int ret;

	lock_sock(sk);
	ret = __tipc_sendmsg(sock, m, dsz);
	release_sock(sk);

	return ret;
}

/* __tipc_sendmsg - body of tipc_sendmsg(), which calls it between
 * lock_sock() and release_sock(); __tipc_sendstream() also calls it for
 * implicit connection setup
 * @sock: socket structure
 * @m: message to send
 * @dlen: amount of user data to be sent
 *
 * The visible code validates the destination address, hands group sockets
 * to the tipc_send_group_*() helpers, resolves the destination node and
 * port, waits while the destination link is congested, completes the
 * preformatted header tsk->phdr, builds the message and transmits it with
 * tipc_node_xmit(). For connection oriented sockets the message is a SYN:
 * a clone is kept in the write queue and the socket moves to
 * TIPC_CONNECTING once the SYN has been sent.
 *
 * NOTE(review): this copy of the function is incomplete and does not build.
 * Visible gaps:
 * - web page residue lines (avatar / committed) sit between code lines,
 * - atype, rc, mtu and pkts are used but not declared in the visible lines,
 * - the size check at the top has no statement of its own, so it guards
 *   the address validation that follows,
 * - the group dispatch has lost its opening condition lines,
 * - the SYN state checks have lost their enclosing condition,
 * - the destination selection, the error paths after tipc_msg_build() and
 *   tipc_msg_skb_clone(), and the -ELINKCONG branch are not closed,
 * - the result of the congestion wait is not checked,
 * - there is no final return statement and no closing brace.
 * The code is left untouched; restore the missing lines from the
 * authoritative source.
 */
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
Per Liden's avatar
Per Liden committed
{
	struct sock *sk = sock->sk;
	struct net *net = sock_net(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
	struct list_head *clinks = &tsk->cong_links;
	bool syn = !tipc_sk_type_connectionless(sk);
	struct tipc_group *grp = tsk->group;
	struct tipc_msg *hdr = &tsk->phdr;
	struct tipc_socket_addr skaddr;
Per Liden's avatar
Per Liden committed

	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
	if (ua) {
		if (!tipc_uaddr_valid(ua, m->msg_namelen))
			return -EINVAL;
		atype = ua->addrtype;
	/* If socket belongs to a communication group follow other paths */
			return tipc_send_group_bcast(sock, m, dlen, timeout);
		if (atype == TIPC_SERVICE_ADDR)
			return tipc_send_group_anycast(sock, m, dlen, timeout);
		if (atype == TIPC_SOCKET_ADDR)
			return tipc_send_group_unicast(sock, m, dlen, timeout);
		if (atype == TIPC_SERVICE_RANGE)
			return tipc_send_group_mcast(sock, m, dlen, timeout);
		return -EINVAL;
	}
	if (!ua) {
		ua = (struct tipc_uaddr *)&tsk->peer;
		if (!syn && ua->family != AF_TIPC)
			return -EDESTADDRREQ;
		atype = ua->addrtype;
		if (sk->sk_state == TIPC_LISTEN)
			return -EPIPE;
		if (sk->sk_state != TIPC_OPEN)
			return -EISCONN;
		if (tsk->published)
			return -EOPNOTSUPP;
		if (atype == TIPC_SERVICE_ADDR)
			tsk->conn_addrtype = atype;
		msg_set_syn(hdr, 1);
Per Liden's avatar
Per Liden committed
	}
	memset(&skaddr, 0, sizeof(skaddr));

	/* Determine destination */
	if (atype == TIPC_SERVICE_RANGE) {
		return tipc_sendmcast(sock, ua, m, dlen, timeout);
	} else if (atype == TIPC_SERVICE_ADDR) {
		skaddr.node = ua->lookup_node;
		ua->scope = tipc_node2scope(skaddr.node);
		if (!tipc_nametbl_lookup_anycast(net, ua, &skaddr))
			return -EHOSTUNREACH;
	} else if (atype == TIPC_SOCKET_ADDR) {
		skaddr = ua->sk;
	} else {
		return -EINVAL;
	/* Block or return if destination link is congested */
	rc = tipc_wait_for_cond(sock, &timeout,
				!tipc_dest_find(clinks, skaddr.node, 0));
	/* Finally build message header */
	msg_set_destnode(hdr, skaddr.node);
	msg_set_destport(hdr, skaddr.ref);
	if (atype == TIPC_SERVICE_ADDR) {
		msg_set_type(hdr, TIPC_NAMED_MSG);
		msg_set_hdr_sz(hdr, NAMED_H_SIZE);
		msg_set_nametype(hdr, ua->sa.type);
		msg_set_nameinst(hdr, ua->sa.instance);
		msg_set_lookup_scope(hdr, ua->scope);
	} else { /* TIPC_SOCKET_ADDR */
		msg_set_type(hdr, TIPC_DIRECT_MSG);
		msg_set_lookup_scope(hdr, 0);
		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
	}

	/* Add message body */
	__skb_queue_head_init(&pkts);
	mtu = tipc_node_get_mtu(net, skaddr.node, tsk->portid, true);
	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
	if (unlikely(rc != dlen))
	if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) {
		__skb_queue_purge(&pkts);
	trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
	rc = tipc_node_xmit(net, &pkts, skaddr.node, tsk->portid);
	if (unlikely(rc == -ELINKCONG)) {
		tipc_dest_push(clinks, skaddr.node, 0);
	if (unlikely(syn && !rc)) {
		tipc_set_sk_state(sk, TIPC_CONNECTING);
		if (dlen && timeout) {
			timeout = msecs_to_jiffies(timeout);
			tipc_wait_for_connect(sock, &timeout);
		}
	}
/**
 * tipc_sendstream - send stream-oriented data
 * @sock: socket structure
 * @m: data to send
 * @dsz: total length of data to be transmitted
 *
 * Used for SOCK_STREAM data.
 *
 * Return: the number of bytes sent on success (or partial success),
 * or errno otherwise
 */
/* Locked wrapper: holds the socket lock around __tipc_sendstream() and
 * passes its result through unchanged, in the same way tipc_sendmsg()
 * wraps __tipc_sendmsg().
 */
static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
{
	struct sock *sk = sock->sk;
	int ret;

	lock_sock(sk);
	ret = __tipc_sendstream(sock, m, dsz);
	release_sock(sk);

	return ret;
}

/* __tipc_sendstream - body of tipc_sendstream(), which calls it with the
 * socket lock held
 * @sock: socket structure
 * @m: data to send
 * @dlen: total length of data to be transmitted
 *
 * The visible code rejects lengths above INT_MAX, sets up a connection
 * implicitly through __tipc_sendmsg() when a destination is given on an
 * open socket, and otherwise sends the data in chunks of at most
 * TIPC_MAX_USER_MSG_SIZE: each round waits until the link and connection
 * are uncongested and the socket is connected, then either appends the
 * chunk to the write queue (Nagle path, taken once tsk->oneway has reached
 * tsk->nagle_start and the chunk fits in maxnagle) or builds new buffers,
 * and transmits the queue with tipc_node_xmit(). -ELINKCONG from the
 * transmit is recorded in tsk->cong_link_cnt and treated as success.
 *
 * NOTE(review): this copy of the function is incomplete and does not build.
 * Visible gaps:
 * - web page residue lines (avatar / committed) follow the signature,
 * - sent and send are used but not declared in the visible lines,
 * - the implicit connection setup branch is not closed and does not return,
 * - the loop ends with a while condition but its opening do is missing,
 * - there is no final return statement and no closing brace.
 * The code is left untouched; restore the missing lines from the
 * authoritative source.
 */
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
Per Liden's avatar
Per Liden committed
{
	struct sock *sk = sock->sk;
	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
	struct sk_buff_head *txq = &sk->sk_write_queue;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_msg *hdr = &tsk->phdr;
	struct net *net = sock_net(sk);
	struct sk_buff *skb;
	u32 dnode = tsk_peer_node(tsk);
	int maxnagle = tsk->maxnagle;
	int maxpkt = tsk->max_pkt;
	int blocks, rc = 0;
	if (unlikely(dlen > INT_MAX))
		return -EMSGSIZE;
	/* Handle implicit connection setup */
	if (unlikely(dest && sk->sk_state == TIPC_OPEN)) {
		rc = __tipc_sendmsg(sock, m, dlen);
		if (dlen && dlen == rc) {
			tsk->peer_caps = tipc_node_get_capabilities(net, dnode);
			tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
		rc = tipc_wait_for_cond(sock, &timeout,
					(!tsk->cong_link_cnt &&
					 !tsk_conn_cong(tsk) &&
					 tipc_sk_connected(sk)));
		if (unlikely(rc))
			break;
		send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
		blocks = tsk->snd_backlog;
		if (tsk->oneway++ >= tsk->nagle_start && maxnagle &&
		    send <= maxnagle) {
			rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
			if (unlikely(rc < 0))
				break;
			blocks += rc;
			if (blocks <= 64 && tsk->expect_ack) {
				tsk->snd_backlog = blocks;
				sent += send;
				break;
			} else if (blocks > 64) {
				tsk->pkt_cnt += skb_queue_len(txq);
			} else {
				skb = skb_peek_tail(txq);
				if (skb) {
					msg_set_ack_required(buf_msg(skb));
					tsk->expect_ack = true;
				} else {
					tsk->expect_ack = false;
				}
				tsk->msg_acc = 0;
				tsk->pkt_cnt = 0;
			}
		} else {
			rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
			if (unlikely(rc != send))
				break;
			blocks += tsk_inc(tsk, send + MIN_H_SIZE);
		}
		trace_tipc_sk_sendstream(sk, skb_peek(txq),
					 TIPC_DUMP_SK_SNDQ, " ");
		rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
		if (unlikely(rc == -ELINKCONG)) {
			tsk->cong_link_cnt = 1;
			rc = 0;
		}
		if (likely(!rc)) {
			tsk->snt_unacked += blocks;
			tsk->snd_backlog = 0;
			sent += send;
		}
	} while (sent < dlen && !rc);
/**
 * tipc_send_packet - send a connection-oriented message
 * @sock: socket structure
 * @m: message to send
 * @dsz: length of data to be transmitted
 *
 * Used for SOCK_SEQPACKET messages.
 *
 * Return: the number of bytes sent on success, or errno otherwise
 */
/* A SOCK_SEQPACKET message is sent as a single unit, so it is rejected with
 * -EMSGSIZE when it exceeds TIPC_MAX_USER_MSG_SIZE; otherwise it takes the
 * stream send path.
 */
static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
{
	if (dsz > TIPC_MAX_USER_MSG_SIZE)
		return -EMSGSIZE;

	return tipc_sendstream(sock, m, dsz);
}

/* tipc_sk_finish_conn - complete the setup of a connection
 * @tsk: socket being connected
 * @peer_port: port of the peer socket
 * @peer_node: node of the peer socket
 *
 * Turns the preformatted header into a connection header addressed to the
 * peer, starts the probe timer, marks the socket established, registers the
 * connection with the node and caches the peer's MTU and capabilities.
 */
static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
				u32 peer_node)
{
	struct sock *sk = &tsk->sk;
	struct net *net = sock_net(sk);
	struct tipc_msg *msg = &tsk->phdr;

	msg_set_syn(msg, 0);
	msg_set_destnode(msg, peer_node);
	msg_set_destport(msg, peer_port);
	msg_set_type(msg, TIPC_CONN_MSG);
	msg_set_lookup_scope(msg, 0);
	msg_set_hdr_sz(msg, SHORT_H_SIZE);

	sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
	tipc_set_sk_state(sk, TIPC_ESTABLISHED);
	tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
	tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
	tsk_set_nagle(tsk);

	/* Drop whatever is still held in the write queue */
	__skb_queue_purge(&sk->sk_write_queue);
	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
		return;

	/* Fall back to message based flow control */
	tsk->rcv_win = FLOWCTL_MSG_WIN;
	tsk->snd_win = FLOWCTL_MSG_WIN;
}

 * tipc_sk_set_orig_addr - capture sender's address for received message
Per Liden's avatar
Per Liden committed
 * @m: descriptor for message info
Andrew Lunn's avatar
Andrew Lunn committed
 * @skb: received message
Per Liden's avatar
Per Liden committed
 * Note: Address is not captured if not requested by receiver.
 */
static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb)
Per Liden's avatar
Per Liden committed
{
	DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name);
	struct tipc_msg *hdr = buf_msg(skb);

	if (!srcaddr)
		return;

	srcaddr->sock.family = AF_TIPC;
	srcaddr->sock.addrtype = TIPC_SOCKET_ADDR;
	srcaddr->sock.scope = 0;
	srcaddr->sock.addr.id.ref = msg_origport(hdr);
	srcaddr->sock.addr.id.node = msg_orignode(hdr);
	srcaddr->sock.addr.name.domain = 0;
	m->msg_namelen = sizeof(struct sockaddr_tipc);

	if (!msg_in_group(hdr))
		return;

	/* Group message users may also want to know sending member's id */
	srcaddr->member.family = AF_TIPC;
	srcaddr->member.addrtype = TIPC_SERVICE_ADDR;
	srcaddr->member.scope = 0;
	srcaddr->member.addr.name.name.type = msg_nametype(hdr);
	srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member;
	srcaddr->member.addr.name.domain = 0;
	m->msg_namelen = sizeof(*srcaddr);
 * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
Per Liden's avatar
Per Liden committed
 * @m: descriptor for message info
 * @skb: received message buffer
 * @tsk: TIPC port associated with message
Per Liden's avatar
Per Liden committed
 * Note: Ancillary data is not captured if not requested by receiver.
 * Return: 0 if successful, otherwise errno
Per Liden's avatar
Per Liden committed
 */
static int tipc_sk_anc_data_recv(struct msghdr *m, struct sk_buff *skb,
				 struct tipc_sock *tsk)
Per Liden's avatar
Per Liden committed
{
	struct tipc_msg *hdr;
	u32 data[3] = {0,};
	bool has_addr;
	int dlen, rc;
Per Liden's avatar
Per Liden committed

	if (likely(m->msg_controllen == 0))
		return 0;

	hdr = buf_msg(skb);
	dlen = msg_data_sz(hdr);

	/* Capture errored message object, if any */
	if (msg_errcode(hdr)) {
		if (skb_linearize(skb))
			return -ENOMEM;
		hdr = buf_msg(skb);
		data[0] = msg_errcode(hdr);
		data[1] = dlen;
		rc = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, data);
		if (rc || !dlen)
			return rc;
		rc = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, dlen, msg_data(hdr));
		if (rc)
			return rc;
	/* Capture TIPC_SERVICE_ADDR/RANGE destination address, if any */
	switch (msg_type(hdr)) {
Per Liden's avatar
Per Liden committed
	case TIPC_NAMED_MSG:
		has_addr = true;
		data[0] = msg_nametype(hdr);
		data[1] = msg_namelower(hdr);
		data[2] = data[1];
Per Liden's avatar
Per Liden committed
		break;
	case TIPC_MCAST_MSG:
		has_addr = true;
		data[0] = msg_nametype(hdr);
		data[1] = msg_namelower(hdr);
		data[2] = msg_nameupper(hdr);
Per Liden's avatar
Per Liden committed
		break;
	case TIPC_CONN_MSG:
		has_addr = !!tsk->conn_addrtype;
		data[0] = msg_nametype(&tsk->phdr);
		data[1] = msg_nameinst(&tsk->phdr);
		data[2] = data[1];
Per Liden's avatar
Per Liden committed
		break;
	default:
		has_addr = false;
	if (!has_addr)
		return 0;
	return put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, data);
static struct sk_buff *tipc_sk_build_ack(struct tipc_sock *tsk)
	struct sock *sk = &tsk->sk;
	struct tipc_msg *msg;
	u32 peer_port = tsk_peer_port(tsk);
	u32 dnode = tsk_peer_node(tsk);
	if (!tipc_sk_connected(sk))
	skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
			      dnode, tsk_own_node(tsk), peer_port,
			      tsk->portid, TIPC_OK);
	msg_set_conn_ack(msg, tsk->rcv_unacked);
	tsk->rcv_unacked = 0;

	/* Adjust to and advertize the correct window limit */
	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
		tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
		msg_set_adv_win(msg, tsk->rcv_win);
	}
	return skb;
}

/* tipc_sk_send_ack - build a connection acknowledge and send it to the peer
 * @tsk: acknowledging socket
 *
 * Does nothing when tipc_sk_build_ack() returns no buffer.
 */
static void tipc_sk_send_ack(struct tipc_sock *tsk)
{
	struct sk_buff *skb;

	skb = tipc_sk_build_ack(tsk);
	if (!skb)
		return;

	tipc_node_xmit_skb(sock_net(&tsk->sk), skb, tsk_peer_node(tsk),
			   msg_link_selector(buf_msg(skb)));
}

/* tipc_wait_for_rcvmsg - wait until the socket's receive queue is non-empty
 * @sock: network socket
 * @timeop: pointer to the wait timeout
 *
 * The visible code sleeps on the socket's wait queue, with the socket lock
 * released, while the receive queue is empty and time remains. The result
 * is 0 once a message is queued, -ENOTCONN if the receive side is shut
 * down, -EAGAIN when the timeout has run out, or the value from
 * sock_intr_errno() when a signal is pending.
 *
 * NOTE(review): this copy of the function is incomplete and does not build.
 * Visible gaps:
 * - timeo and err are used but not declared, and timeop is never read or
 *   written in the visible lines,
 * - the break statements imply an enclosing loop whose header is missing,
 * - there is no final return statement and no closing brace.
 * The code is left untouched; restore the missing lines from the
 * authoritative source.
 */
static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
{
	struct sock *sk = sock->sk;
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
			if (sk->sk_shutdown & RCV_SHUTDOWN) {
				err = -ENOTCONN;
				break;
			}
			/* Sleep without holding the socket lock */
			add_wait_queue(sk_sleep(sk), &wait);
			release_sock(sk);
			timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
			sched_annotate_sleep();
			lock_sock(sk);
			remove_wait_queue(sk_sleep(sk), &wait);
		}
		/* Each exit below leaves its own result code in err */
		err = 0;
		if (!skb_queue_empty(&sk->sk_receive_queue))
			break;
		err = -EAGAIN;
		if (!timeo)
			break;
		err = sock_intr_errno(timeo);
		if (signal_pending(current))
			break;
/**
 * tipc_recvmsg - receive packet-oriented message
 * @sock: network socket
 * @m: descriptor for message info
 * @buflen: length of user buffer area
 * @flags: receive flags
 *
 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
 * If the complete message doesn't fit in user area, truncate it.
 *
 * Return: size of returned message data, errno otherwise
 */
static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
			size_t buflen,	int flags)
Per Liden's avatar
Per Liden committed
{
	struct sock *sk = sock->sk;
	bool connected = !tipc_sk_type_connectionless(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	int rc, err, hlen, dlen, copy;
	struct tipc_skb_cb *skb_cb;
	struct sk_buff_head xmitq;
	struct tipc_msg *hdr;
	struct sk_buff *skb;
	bool grp_evt;
Per Liden's avatar
Per Liden committed

	/* Catch invalid receive requests */
	if (unlikely(!buflen))
Per Liden's avatar
Per Liden committed
		return -EINVAL;

	if (unlikely(connected && sk->sk_state == TIPC_OPEN)) {
		rc = -ENOTCONN;
Per Liden's avatar
Per Liden committed
		goto exit;
	}
	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
Per Liden's avatar
Per Liden committed

	/* Step rcv queue to first msg with data or error; wait if necessary */
	do {
		rc = tipc_wait_for_rcvmsg(sock, &timeout);
		if (unlikely(rc))
			goto exit;
		skb = skb_peek(&sk->sk_receive_queue);
		skb_cb = TIPC_SKB_CB(skb);
		hdr = buf_msg(skb);
		dlen = msg_data_sz(hdr);
		hlen = msg_hdr_sz(hdr);
		err = msg_errcode(hdr);
		grp_evt = msg_is_grp_evt(hdr);
		if (likely(dlen || err))
			break;
		tsk_advance_rx_queue(sk);
Per Liden's avatar
Per Liden committed

	/* Collect msg meta data, including error code and rejected data */
	tipc_sk_set_orig_addr(m, skb);
	rc = tipc_sk_anc_data_recv(m, skb, tsk);
	if (unlikely(rc))
Per Liden's avatar
Per Liden committed
		goto exit;
Per Liden's avatar
Per Liden committed

	/* Capture data if non-error msg, otherwise just set return value */
	if (likely(!err)) {
		int offset = skb_cb->bytes_read;

		copy = min_t(int, dlen - offset, buflen);
		rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
		if (unlikely(rc))
			goto exit;
		if (unlikely(offset + copy < dlen)) {
			if (flags & MSG_EOR) {
				if (!(flags & MSG_PEEK))
					skb_cb->bytes_read = offset + copy;
			} else {
				m->msg_flags |= MSG_TRUNC;
				skb_cb->bytes_read = 0;
			}
		} else {
			if (flags & MSG_EOR)
				m->msg_flags |= MSG_EOR;
			skb_cb->bytes_read = 0;
		}
Per Liden's avatar
Per Liden committed
	} else {
		if (err != TIPC_CONN_SHUTDOWN && connected && !m->msg_control) {
			rc = -ECONNRESET;
	/* Mark message as group event if applicable */
	if (unlikely(grp_evt)) {
		if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
			m->msg_flags |= MSG_EOR;
		m->msg_flags |= MSG_OOB;
		copy = 0;
	}

	/* Caption of data or error code/rejected data was successful */
	if (unlikely(flags & MSG_PEEK))
		goto exit;

	/* Send group flow control advertisement when applicable */
	if (tsk->group && msg_in_group(hdr) && !grp_evt) {
		__skb_queue_head_init(&xmitq);
		tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
					  msg_orignode(hdr), msg_origport(hdr),
					  &xmitq);
		tipc_node_distr_xmit(sock_net(sk), &xmitq);
	}

	if (skb_cb->bytes_read)
		goto exit;

	tsk_advance_rx_queue(sk);
	if (likely(!connected))
	/* Send connection flow control advertisement when applicable */
	tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
	if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
		tipc_sk_send_ack(tsk);
Per Liden's avatar
Per Liden committed
exit:
	release_sock(sk);
	return rc ? rc : copy;