Newer
Older
struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
struct list_head *cong_links = &tsk->cong_links;
int blks = tsk_blocks(GROUP_H_SIZE + dlen);
struct tipc_member *first = NULL;
struct tipc_member *mbr = NULL;
struct net *net = sock_net(sk);
u32 node, port, exclude;
struct list_head dsts;
int lookups = 0;
int dstcnt, rc;
bool cong;
INIT_LIST_HEAD(&dsts);
ua->sa.type = msg_nametype(hdr);
ua->scope = msg_lookup_scope(hdr);
exclude = tipc_group_exclude(tsk->group);
first = NULL;
/* Look for a non-congested destination member, if any */
while (1) {
if (!tipc_nametbl_lookup_group(net, ua, &dsts, &dstcnt,
exclude, false))
return -EHOSTUNREACH;
tipc_dest_pop(&dsts, &node, &port);
cong = tipc_group_cong(tsk->group, node, port, blks,
&mbr);
if (!cong)
break;
if (mbr == first)
break;
if (!first)
first = mbr;
}
/* Start over if destination was not in member list */
if (unlikely(!mbr))
continue;
if (likely(!cong && !tipc_dest_find(cong_links, node, 0)))
break;
/* Block or return if destination link or member is congested */
rc = tipc_wait_for_cond(sock, &timeout,
!tipc_dest_find(cong_links, node, 0) &&
tsk->group &&
!tipc_group_cong(tsk->group, node, port,
blks, &mbr));
if (unlikely(rc))
return rc;
/* Send, unless destination disappeared while waiting */
if (likely(mbr))
break;
}
if (unlikely(lookups >= 4))
return -EHOSTUNREACH;
rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen);
return rc ? rc : dlen;
}
/**
* tipc_send_group_bcast - send message to all members in communication group
* @m: message to send
* @dlen: total length of message data
* @timeout: timeout to wait for wakeup
*
* Called from function tipc_sendmsg(), which has done all sanity checks
* Return: the number of bytes sent on success, or errno
*/
static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
int dlen, long timeout)
{
struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
struct sock *sk = sock->sk;
struct net *net = sock_net(sk);
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_nlist *dsts;
struct tipc_mc_method *method = &tsk->mc_method;
bool ack = method->mandatory && method->rcast;
int blks = tsk_blocks(MCAST_H_SIZE + dlen);
struct tipc_msg *hdr = &tsk->phdr;
int mtu = tipc_bcast_get_mtu(net);
struct sk_buff_head pkts;
int rc = -EHOSTUNREACH;
/* Block or return if any destination link or member is congested */
rc = tipc_wait_for_cond(sock, &timeout,
!tsk->cong_link_cnt && tsk->group &&
!tipc_group_bc_cong(tsk->group, blks));
if (unlikely(rc))
return rc;
dsts = tipc_group_dests(tsk->group);
if (!dsts->local && !dsts->remote)
return -EHOSTUNREACH;
msg_set_type(hdr, TIPC_GRP_MCAST_MSG);
msg_set_nameinst(hdr, ua->sa.instance);
} else {
msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
msg_set_nameinst(hdr, 0);
}
msg_set_hdr_sz(hdr, GROUP_H_SIZE);
msg_set_destport(hdr, 0);
msg_set_destnode(hdr, 0);
msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(tsk->group));
/* Avoid getting stuck with repeated forced replicasts */
msg_set_grp_bc_ack_req(hdr, ack);
/* Build message as chain of buffers */
__skb_queue_head_init(&pkts);
rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
if (unlikely(rc != dlen))
return rc;
/* Send message */
rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
if (unlikely(rc))
return rc;
/* Update broadcast sequence number and send windows */
tipc_group_update_bc_members(tsk->group, blks, ack);
/* Broadcast link is now free to choose method for next broadcast */
method->mandatory = false;
method->expires = jiffies;
/**
* tipc_send_group_mcast - send message to all members with given identity
* @sock: socket structure
* @m: message to send
* @dlen: total length of message data
* @timeout: timeout to wait for wakeup
*
* Called from function tipc_sendmsg(), which has done all sanity checks
* Return: the number of bytes sent on success, or errno
*/
static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
int dlen, long timeout)
{
struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_group *grp = tsk->group;
struct net *net = sock_net(sk);
struct list_head dsts;
u32 dstcnt, exclude;
ua->sa.type = msg_nametype(hdr);
ua->scope = msg_lookup_scope(hdr);
exclude = tipc_group_exclude(grp);
if (!tipc_nametbl_lookup_group(net, ua, &dsts, &dstcnt, exclude, true))
return -EHOSTUNREACH;
if (dstcnt == 1) {
tipc_dest_pop(&dsts, &ua->sk.node, &ua->sk.ref);
return tipc_send_group_unicast(sock, m, dlen, timeout);
}
tipc_dest_list_purge(&dsts);
return tipc_send_group_bcast(sock, m, dlen, timeout);
}
/**
* tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
* @net: the associated network namespace
* @arrvq: queue with arriving messages, to be cloned after destination lookup
* @inputq: queue with cloned messages, delivered to socket after dest lookup
*
* Multi-threaded: parallel calls with reference to same queues may occur
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
struct sk_buff_head *inputq)
struct sk_buff *skb, *_skb;
u32 portid, onode;
struct tipc_uaddr ua;
__skb_queue_head_init(&tmpq);
INIT_LIST_HEAD(&dports);
ua.addrtype = TIPC_SERVICE_RANGE;
/* tipc_skb_peek() increments the head skb's reference counter */
skb = tipc_skb_peek(arrvq, &inputq->lock);
for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
hdr = buf_msg(skb);
user = msg_user(hdr);
mtyp = msg_type(hdr);
hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
onode = msg_orignode(hdr);
ua.sr.type = msg_nametype(hdr);
ua.sr.lower = msg_namelower(hdr);
ua.sr.upper = msg_nameupper(hdr);
if (onode == self)
ua.scope = TIPC_ANY_SCOPE;
else
ua.scope = TIPC_CLUSTER_SCOPE;
if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
spin_lock_bh(&inputq->lock);
if (skb_peek(arrvq) == skb) {
__skb_dequeue(arrvq);
__skb_queue_tail(inputq, skb);
}
spin_unlock_bh(&inputq->lock);
continue;
}
/* Group messages require exact scope match */
if (msg_in_group(hdr)) {
ua.sr.lower = 0;
ua.sr.upper = ~0;
ua.scope = msg_lookup_scope(hdr);
/* Create destination port list: */
tipc_nametbl_lookup_mcast_sockets(net, &ua, &dports);
/* Clone message per destination */
while (tipc_dest_pop(&dports, NULL, &portid)) {
_skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
if (_skb) {
msg_set_destport(buf_msg(_skb), portid);
__skb_queue_tail(&tmpq, _skb);
continue;
}
pr_warn("Failed to clone mcast rcv buffer\n");
/* Append clones to inputq only if skb is still head of arrvq */
spin_lock_bh(&inputq->lock);
if (skb_peek(arrvq) == skb) {
skb_queue_splice_tail_init(&tmpq, inputq);
/* Decrement the skb's refcnt */
kfree_skb(__skb_dequeue(arrvq));
}
spin_unlock_bh(&inputq->lock);
__skb_queue_purge(&tmpq);
kfree_skb(skb);
tipc_sk_rcv(net, inputq);
}
/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
* when socket is in Nagle mode
*/
static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack)
{
struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
struct sk_buff *skb = skb_peek_tail(txq);
struct net *net = sock_net(&tsk->sk);
u32 dnode = tsk_peer_node(tsk);
int rc;
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
if (nagle_ack) {
tsk->pkt_cnt += skb_queue_len(txq);
if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) {
tsk->oneway = 0;
if (tsk->nagle_start < NAGLE_START_MAX)
tsk->nagle_start *= 2;
tsk->expect_ack = false;
pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n",
tsk->portid, tsk->msg_acc, tsk->pkt_cnt,
tsk->nagle_start);
} else {
tsk->nagle_start = NAGLE_START_INIT;
if (skb) {
msg_set_ack_required(buf_msg(skb));
tsk->expect_ack = true;
} else {
tsk->expect_ack = false;
}
}
tsk->msg_acc = 0;
tsk->pkt_cnt = 0;
}
if (!skb || tsk->cong_link_cnt)
return;
/* Do not send SYN again after congestion */
if (msg_is_syn(buf_msg(skb)))
if (tsk->msg_acc)
tsk->pkt_cnt += skb_queue_len(txq);
tsk->snt_unacked += tsk->snd_backlog;
tsk->snd_backlog = 0;
rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
if (rc == -ELINKCONG)
tsk->cong_link_cnt = 1;
}
* tipc_sk_conn_proto_rcv - receive a connection mng protocol message
* @tsk: receiving socket
* @skb: pointer to message buffer.
* @inputq: buffer list containing the buffers
* @xmitq: output message area
static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
Parthasarathy Bhuvaragan
committed
struct sk_buff_head *inputq,
struct tipc_msg *hdr = buf_msg(skb);
u32 onode = tsk_own_node(tsk);
struct sock *sk = &tsk->sk;
int mtyp = msg_type(hdr);
/* Ignore if connection cannot be validated: */
if (!tsk_peer_msg(tsk, hdr)) {
trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, "@proto_rcv!");
if (unlikely(msg_errcode(hdr))) {
tipc_set_sk_state(sk, TIPC_DISCONNECTING);
tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
tsk_peer_port(tsk));
sk->sk_state_change(sk);
Parthasarathy Bhuvaragan
committed
/* State change is ignored if socket already awake,
* - convert msg to abort msg and add to inqueue
*/
msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
msg_set_type(hdr, TIPC_CONN_MSG);
msg_set_size(hdr, BASIC_H_SIZE);
msg_set_hdr_sz(hdr, BASIC_H_SIZE);
__skb_queue_tail(inputq, skb);
return;
}
tsk->probe_unacked = false;
if (mtyp == CONN_PROBE) {
msg_set_type(hdr, CONN_PROBE_REPLY);
if (tipc_msg_reverse(onode, &skb, TIPC_OK))
__skb_queue_tail(xmitq, skb);
return;
} else if (mtyp == CONN_ACK) {
tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr));
tsk->snt_unacked -= msg_conn_ack(hdr);
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
tsk->snd_win = msg_adv_win(hdr);
sk->sk_write_space(sk);
} else if (mtyp != CONN_PROBE_REPLY) {
pr_warn("Received unknown CONN_PROTO msg\n");
* tipc_sendmsg - send message in connectionless manner
* @dsz: amount of user data to be sent
* Message must have an destination specified explicitly.
* Used for SOCK_RDM and SOCK_DGRAM messages,
* and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
* (Note: 'SYN+' is prohibited on SOCK_STREAM.)
* Return: the number of bytes sent on success, or errno otherwise
static int tipc_sendmsg(struct socket *sock,
struct msghdr *m, size_t dsz)
{
struct sock *sk = sock->sk;
int ret;
lock_sock(sk);
ret = __tipc_sendmsg(sock, m, dsz);
release_sock(sk);
return ret;
}
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
struct net *net = sock_net(sk);
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
struct list_head *clinks = &tsk->cong_links;
bool syn = !tipc_sk_type_connectionless(sk);
struct tipc_msg *hdr = &tsk->phdr;
struct tipc_socket_addr skaddr;
struct sk_buff_head pkts;
int atype, mtu, rc;
if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
return -EMSGSIZE;
if (ua) {
if (!tipc_uaddr_valid(ua, m->msg_namelen))
/* If socket belongs to a communication group follow other paths */
return tipc_send_group_bcast(sock, m, dlen, timeout);
if (atype == TIPC_SERVICE_ADDR)
return tipc_send_group_anycast(sock, m, dlen, timeout);
if (atype == TIPC_SOCKET_ADDR)
return tipc_send_group_unicast(sock, m, dlen, timeout);
if (atype == TIPC_SERVICE_RANGE)
return tipc_send_group_mcast(sock, m, dlen, timeout);
if (!ua) {
ua = (struct tipc_uaddr *)&tsk->peer;
if (!syn && ua->family != AF_TIPC)
return -EDESTADDRREQ;
if (unlikely(syn)) {
if (sk->sk_state == TIPC_LISTEN)
if (sk->sk_state != TIPC_OPEN)
return -EISCONN;
if (tsk->published)
return -EOPNOTSUPP;
if (atype == TIPC_SERVICE_ADDR)
tsk->conn_addrtype = atype;
memset(&skaddr, 0, sizeof(skaddr));
/* Determine destination */
if (atype == TIPC_SERVICE_RANGE) {
return tipc_sendmcast(sock, ua, m, dlen, timeout);
} else if (atype == TIPC_SERVICE_ADDR) {
skaddr.node = ua->lookup_node;
ua->scope = tipc_node2scope(skaddr.node);
if (!tipc_nametbl_lookup_anycast(net, ua, &skaddr))
} else if (atype == TIPC_SOCKET_ADDR) {
skaddr = ua->sk;
} else {
return -EINVAL;
}
/* Block or return if destination link is congested */
rc = tipc_wait_for_cond(sock, &timeout,
!tipc_dest_find(clinks, skaddr.node, 0));
if (unlikely(rc))
return rc;
/* Finally build message header */
msg_set_destnode(hdr, skaddr.node);
msg_set_destport(hdr, skaddr.ref);
if (atype == TIPC_SERVICE_ADDR) {
msg_set_type(hdr, TIPC_NAMED_MSG);
msg_set_hdr_sz(hdr, NAMED_H_SIZE);
msg_set_nametype(hdr, ua->sa.type);
msg_set_nameinst(hdr, ua->sa.instance);
msg_set_lookup_scope(hdr, ua->scope);
msg_set_type(hdr, TIPC_DIRECT_MSG);
msg_set_lookup_scope(hdr, 0);
msg_set_hdr_sz(hdr, BASIC_H_SIZE);
}
/* Add message body */
__skb_queue_head_init(&pkts);
mtu = tipc_node_get_mtu(net, skaddr.node, tsk->portid, true);
rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
if (unlikely(rc != dlen))
if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) {
__skb_queue_purge(&pkts);
trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
rc = tipc_node_xmit(net, &pkts, skaddr.node, tsk->portid);
if (unlikely(rc == -ELINKCONG)) {
tipc_dest_push(clinks, skaddr.node, 0);
tsk->cong_link_cnt++;
rc = 0;
}
tipc_set_sk_state(sk, TIPC_CONNECTING);
if (dlen && timeout) {
timeout = msecs_to_jiffies(timeout);
tipc_wait_for_connect(sock, &timeout);
}
}
return rc ? rc : dlen;
* tipc_sendstream - send stream-oriented data
* @m: data to send
* @dsz: total length of data to be transmitted
* Used for SOCK_STREAM data.
* Return: the number of bytes sent on success (or partial success),
* or errno if no data sent
static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
{
struct sock *sk = sock->sk;
int ret;
lock_sock(sk);
ret = __tipc_sendstream(sock, m, dsz);
release_sock(sk);
return ret;
}
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
struct sk_buff_head *txq = &sk->sk_write_queue;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = &tsk->phdr;
struct net *net = sock_net(sk);
u32 dnode = tsk_peer_node(tsk);
int maxnagle = tsk->maxnagle;
int maxpkt = tsk->max_pkt;
int send, sent = 0;
if (unlikely(dlen > INT_MAX))
return -EMSGSIZE;
/* Handle implicit connection setup */
if (unlikely(dest && sk->sk_state == TIPC_OPEN)) {
rc = __tipc_sendmsg(sock, m, dlen);
if (dlen && dlen == rc) {
tsk->peer_caps = tipc_node_get_capabilities(net, dnode);
tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
Parthasarathy Bhuvaragan
committed
rc = tipc_wait_for_cond(sock, &timeout,
(!tsk->cong_link_cnt &&
!tsk_conn_cong(tsk) &&
tipc_sk_connected(sk)));
if (unlikely(rc))
break;
send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
if (tsk->oneway++ >= tsk->nagle_start && maxnagle &&
send <= maxnagle) {
rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
if (unlikely(rc < 0))
break;
blocks += rc;
if (blocks <= 64 && tsk->expect_ack) {
tsk->snd_backlog = blocks;
sent += send;
break;
} else if (blocks > 64) {
tsk->pkt_cnt += skb_queue_len(txq);
} else {
skb = skb_peek_tail(txq);
if (skb) {
msg_set_ack_required(buf_msg(skb));
tsk->expect_ack = true;
} else {
tsk->expect_ack = false;
}
tsk->msg_acc = 0;
tsk->pkt_cnt = 0;
}
} else {
rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
if (unlikely(rc != send))
break;
blocks += tsk_inc(tsk, send + MIN_H_SIZE);
}
trace_tipc_sk_sendstream(sk, skb_peek(txq),
rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
if (unlikely(rc == -ELINKCONG)) {
tsk->cong_link_cnt = 1;
rc = 0;
}
if (likely(!rc)) {
tsk->snt_unacked += blocks;
tsk->snd_backlog = 0;
sent += send;
}
} while (sent < dlen && !rc);
Parthasarathy Bhuvaragan
committed
return sent ? sent : rc;
* tipc_send_packet - send a connection-oriented message
* @m: message to send
* @dsz: length of data to be transmitted
* Used for SOCK_SEQPACKET messages.
* Return: the number of bytes sent on success, or errno otherwise
static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
if (dsz > TIPC_MAX_USER_MSG_SIZE)
return -EMSGSIZE;
return tipc_sendstream(sock, m, dsz);
/* tipc_sk_finish_conn - complete the setup of a connection
static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
u32 peer_node)
struct sock *sk = &tsk->sk;
struct net *net = sock_net(sk);
struct tipc_msg *msg = &tsk->phdr;
msg_set_destnode(msg, peer_node);
msg_set_destport(msg, peer_port);
msg_set_type(msg, TIPC_CONN_MSG);
msg_set_lookup_scope(msg, 0);
msg_set_hdr_sz(msg, SHORT_H_SIZE);
sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
tipc_set_sk_state(sk, TIPC_ESTABLISHED);
tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
__skb_queue_purge(&sk->sk_write_queue);
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
return;
/* Fall back to message based flow control */
tsk->rcv_win = FLOWCTL_MSG_WIN;
tsk->snd_win = FLOWCTL_MSG_WIN;
* tipc_sk_set_orig_addr - capture sender's address for received message
* Note: Address is not captured if not requested by receiver.
*/
static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb)
DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name);
struct tipc_msg *hdr = buf_msg(skb);
if (!srcaddr)
return;
srcaddr->sock.family = AF_TIPC;
srcaddr->sock.addrtype = TIPC_SOCKET_ADDR;
srcaddr->sock.scope = 0;
srcaddr->sock.addr.id.ref = msg_origport(hdr);
srcaddr->sock.addr.id.node = msg_orignode(hdr);
srcaddr->sock.addr.name.domain = 0;
m->msg_namelen = sizeof(struct sockaddr_tipc);
if (!msg_in_group(hdr))
return;
/* Group message users may also want to know sending member's id */
srcaddr->member.family = AF_TIPC;
srcaddr->member.addrtype = TIPC_SERVICE_ADDR;
srcaddr->member.scope = 0;
srcaddr->member.addr.name.name.type = msg_nametype(hdr);
srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member;
srcaddr->member.addr.name.domain = 0;
m->msg_namelen = sizeof(*srcaddr);
* tipc_sk_anc_data_recv - optionally capture ancillary data for received message
* @skb: received message buffer
* @tsk: TIPC port associated with message
* Note: Ancillary data is not captured if not requested by receiver.
* Return: 0 if successful, otherwise errno
static int tipc_sk_anc_data_recv(struct msghdr *m, struct sk_buff *skb,
struct tipc_sock *tsk)
struct tipc_msg *hdr;
u32 data[3] = {0,};
bool has_addr;
int dlen, rc;
if (likely(m->msg_controllen == 0))
return 0;
hdr = buf_msg(skb);
dlen = msg_data_sz(hdr);
/* Capture errored message object, if any */
if (msg_errcode(hdr)) {
if (skb_linearize(skb))
return -ENOMEM;
hdr = buf_msg(skb);
data[0] = msg_errcode(hdr);
data[1] = dlen;
rc = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, data);
if (rc || !dlen)
return rc;
rc = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, dlen, msg_data(hdr));
if (rc)
return rc;
/* Capture TIPC_SERVICE_ADDR/RANGE destination address, if any */
switch (msg_type(hdr)) {
has_addr = true;
data[0] = msg_nametype(hdr);
data[1] = msg_namelower(hdr);
data[2] = data[1];
has_addr = true;
data[0] = msg_nametype(hdr);
data[1] = msg_namelower(hdr);
data[2] = msg_nameupper(hdr);
has_addr = !!tsk->conn_addrtype;
data[0] = msg_nametype(&tsk->phdr);
data[1] = msg_nameinst(&tsk->phdr);
data[2] = data[1];
if (!has_addr)
return 0;
return put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, data);
static struct sk_buff *tipc_sk_build_ack(struct tipc_sock *tsk)
struct sock *sk = &tsk->sk;
struct sk_buff *skb = NULL;
u32 peer_port = tsk_peer_port(tsk);
u32 dnode = tsk_peer_node(tsk);
if (!tipc_sk_connected(sk))
skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
dnode, tsk_own_node(tsk), peer_port,
tsk->portid, TIPC_OK);
if (!skb)
msg = buf_msg(skb);
msg_set_conn_ack(msg, tsk->rcv_unacked);
tsk->rcv_unacked = 0;
/* Adjust to and advertize the correct window limit */
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
msg_set_adv_win(msg, tsk->rcv_win);
}
return skb;
}
static void tipc_sk_send_ack(struct tipc_sock *tsk)
{
struct sk_buff *skb;
skb = tipc_sk_build_ack(tsk);
if (!skb)
return;
tipc_node_xmit_skb(sock_net(&tsk->sk), skb, tsk_peer_node(tsk),
msg_link_selector(buf_msg(skb)));
static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
DEFINE_WAIT_FUNC(wait, woken_wake_function);
long timeo = *timeop;
int err = sock_error(sk);
if (err)
return err;
if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
if (sk->sk_shutdown & RCV_SHUTDOWN) {
add_wait_queue(sk_sleep(sk), &wait);
timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
sched_annotate_sleep();
remove_wait_queue(sk_sleep(sk), &wait);
}
err = 0;
if (!skb_queue_empty(&sk->sk_receive_queue))
break;
err = -EAGAIN;
if (!timeo)
break;
err = sock_intr_errno(timeo);
if (signal_pending(current))
break;
err = sock_error(sk);
if (err)
break;
*timeop = timeo;
* tipc_recvmsg - receive packet-oriented message
* @buflen: length of user buffer area
* Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
* If the complete message doesn't fit in user area, truncate it.
*
* Return: size of returned message data, errno otherwise
static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
size_t buflen, int flags)
bool connected = !tipc_sk_type_connectionless(sk);
struct tipc_sock *tsk = tipc_sk(sk);
int rc, err, hlen, dlen, copy;
struct tipc_skb_cb *skb_cb;
struct sk_buff_head xmitq;
struct tipc_msg *hdr;
struct sk_buff *skb;
bool grp_evt;
/* Catch invalid receive requests */
if (unlikely(connected && sk->sk_state == TIPC_OPEN)) {
rc = -ENOTCONN;
timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
/* Step rcv queue to first msg with data or error; wait if necessary */
do {
rc = tipc_wait_for_rcvmsg(sock, &timeout);
if (unlikely(rc))
goto exit;
skb = skb_peek(&sk->sk_receive_queue);
skb_cb = TIPC_SKB_CB(skb);
hdr = buf_msg(skb);
dlen = msg_data_sz(hdr);
hlen = msg_hdr_sz(hdr);
err = msg_errcode(hdr);
grp_evt = msg_is_grp_evt(hdr);
if (likely(dlen || err))
break;
/* Collect msg meta data, including error code and rejected data */
tipc_sk_set_orig_addr(m, skb);
rc = tipc_sk_anc_data_recv(m, skb, tsk);
hdr = buf_msg(skb);
/* Capture data if non-error msg, otherwise just set return value */
if (likely(!err)) {
int offset = skb_cb->bytes_read;
copy = min_t(int, dlen - offset, buflen);
rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
if (unlikely(rc))
goto exit;
if (unlikely(offset + copy < dlen)) {
if (flags & MSG_EOR) {
if (!(flags & MSG_PEEK))
skb_cb->bytes_read = offset + copy;
} else {
m->msg_flags |= MSG_TRUNC;
skb_cb->bytes_read = 0;
}
} else {
if (flags & MSG_EOR)
m->msg_flags |= MSG_EOR;
skb_cb->bytes_read = 0;
}
if (err != TIPC_CONN_SHUTDOWN && connected && !m->msg_control) {
goto exit;
}
/* Mark message as group event if applicable */
if (unlikely(grp_evt)) {
if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
m->msg_flags |= MSG_EOR;
m->msg_flags |= MSG_OOB;
copy = 0;
}
/* Caption of data or error code/rejected data was successful */
if (unlikely(flags & MSG_PEEK))
goto exit;
/* Send group flow control advertisement when applicable */
if (tsk->group && msg_in_group(hdr) && !grp_evt) {
__skb_queue_head_init(&xmitq);
tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
msg_orignode(hdr), msg_origport(hdr),
&xmitq);
tipc_node_distr_xmit(sock_net(sk), &xmitq);
}
if (skb_cb->bytes_read)
goto exit;
tsk_advance_rx_queue(sk);
/* Send connection flow control advertisement when applicable */
tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
tipc_sk_send_ack(tsk);