Newer
Older
* net/tipc/socket.c: TIPC socket API
* Copyright (c) 2001-2007, 2012-2014, Ericsson AB
* Copyright (c) 2004-2008, 2010-2013, Wind River Systems
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the names of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
* Alternatively, this software may be distributed under the terms of the
* GNU General Public License ("GPL") version 2 as published by the Free
* Software Foundation.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#include <linux/rhashtable.h>
#include <linux/jhash.h>
#include "name_table.h"
#include "link.h"
/* TIPC-specific socket states, used alongside the generic SS_* states */
#define SS_LISTENING -1 /* socket is listening */
#define SS_READY -2 /* socket is connectionless */
#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
#define CONN_PROBING_INTERVAL msecs_to_jiffies(3600000) /* [ms] => 1 h */
/* Returned by tipc_sk_proto_rcv() when the message should be forwarded */
#define TIPC_FWD_MSG 1
/* Values stored in the probing_state field of struct tipc_sock */
#define TIPC_CONN_OK 0
#define TIPC_CONN_PROBING 1
/* NOTE(review): presumably the bounds of the valid port id range - confirm */
#define TIPC_MAX_PORT 0xffffffff
#define TIPC_MIN_PORT 1
/**
* struct tipc_sock - TIPC socket structure
* @sk: socket - interacts with 'port' and with user via the socket API
* @connected: non-zero if port is currently connected to a peer port
* @conn_type: TIPC type used when connection was established
* @conn_instance: TIPC instance used when connection was established
* @published: non-zero if port has one or more associated names
* @max_pkt: maximum packet size "hint" used when building messages sent by port
* @portid: unique port identity in TIPC socket hash table
* @phdr: preformatted message header used when sending messages
* @sock_list: adjacent ports in TIPC's global list of ports
* @publications: list of publications for port
* @pub_count: total # of publications port has made during its lifetime
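* @probing_state: connection probing state (TIPC_CONN_OK or TIPC_CONN_PROBING)
* @probing_intv: connection probing interval (presumably in jiffies, cf.
* CONN_PROBING_INTERVAL)
* @timer: socket timer; its expiry is handled by tipc_sk_timeout()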
* @port: port - interacts with 'sk' and with the rest of the TIPC stack
* @peer_name: the peer of the connection, if any
* @conn_timeout: the time we can wait for an unresponded setup request
* @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
* @link_cong: non-zero if owner must sleep because of link congestion
* @sent_unacked: # messages sent by socket, and not yet acked by peer
* @rcv_unacked: # messages read by user, but not yet acked back to peer
* @node: hash table node
* @rcu: rcu struct for tipc_sock
*/
struct tipc_sock {
struct sock sk;
int connected;
u32 conn_type;
u32 conn_instance;
int published;
u32 max_pkt;
struct tipc_msg phdr;
struct list_head sock_list;
struct list_head publications;
u32 pub_count;
u32 probing_state;
unsigned long probing_intv;
struct timer_list timer;
uint conn_timeout;
atomic_t dupl_rcvcnt;
bool link_cong;
uint sent_unacked;
uint rcv_unacked;
struct rhash_head node;
struct rcu_head rcu;
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
David S. Miller
committed
static void tipc_data_ready(struct sock *sk);
static void tipc_write_space(struct sock *sk);
static int tipc_release(struct socket *sock);
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
static void tipc_sk_timeout(unsigned long data);
static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
static struct tipc_sock *tipc_sk_lookup(u32 portid);
static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;
static struct proto tipc_proto_kern;
/* Netlink attribute policy: expected type of each TIPC_NLA_SOCK_* attribute */
static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
	[TIPC_NLA_SOCK_UNSPEC]   = { .type = NLA_UNSPEC },
	[TIPC_NLA_SOCK_ADDR]     = { .type = NLA_U32 },
	[TIPC_NLA_SOCK_REF]      = { .type = NLA_U32 },
	[TIPC_NLA_SOCK_CON]      = { .type = NLA_NESTED },
	[TIPC_NLA_SOCK_HAS_PUBL] = { .type = NLA_FLAG },
};
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
* Revised TIPC socket locking policy:
*
* Most socket operations take the standard socket lock when they start
* and hold it until they finish (or until they need to sleep). Acquiring
* this lock grants the owner exclusive access to the fields of the socket
* data structures, with the exception of the backlog queue. A few socket
* operations can be done without taking the socket lock because they only
* read socket information that never changes during the life of the socket.
*
* Socket operations may acquire the lock for the associated TIPC port if they
* need to perform an operation on the port. If any routine needs to acquire
* both the socket lock and the port lock it must take the socket lock first
* to avoid the risk of deadlock.
*
* The dispatcher handling incoming messages cannot grab the socket lock in
* the standard fashion, since it runs at the BH level and cannot block.
* Instead, it checks to see if the socket lock is currently owned by someone,
* and either handles the message itself or adds it to the socket's backlog
* queue; in the latter case the queued message is processed once the process
* owning the socket lock releases it.
*
* NOTE: Releasing the socket lock while an operation is sleeping overcomes
* the problem of a blocked socket operation preventing any other operations
* from occurring. However, applications must be careful if they have
* multiple threads trying to send (or receive) on the same socket, as these
* operations might interfere with each other. For example, doing a connect
* and a receive at the same time might allow the receive to consume the
* ACK message meant for the connect. While additional work could be done
* to try and overcome this, it doesn't seem to be worthwhile at the present.
*
* NOTE: Releasing the socket lock while an operation is sleeping also ensures
* that another operation that must be performed in a non-blocking manner is
* not delayed for very long because the lock has already been taken.
*
* NOTE: This code assumes that certain fields of a port/socket pair are
* constant over its lifetime; such fields can be examined without taking
* the socket lock and/or port lock, and do not need to be re-read even
* after resuming processing after waiting. These fields include:
* - socket type
* - pointer to socket sk structure (aka tipc_sock structure)
* - pointer to port structure
* - port reference
*/
/* Protects tipc socket hash table mutations */
static struct rhashtable tipc_sk_rht;
static u32 tsk_peer_node(struct tipc_sock *tsk)
return msg_destnode(&tsk->phdr);
static u32 tsk_peer_port(struct tipc_sock *tsk)
return msg_destport(&tsk->phdr);
static bool tsk_unreliable(struct tipc_sock *tsk)
return msg_src_droppable(&tsk->phdr) != 0;
static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
static bool tsk_unreturnable(struct tipc_sock *tsk)
return msg_dest_droppable(&tsk->phdr) != 0;
static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
static int tsk_importance(struct tipc_sock *tsk)
return msg_importance(&tsk->phdr);
static int tsk_set_importance(struct tipc_sock *tsk, int imp)
{
if (imp > TIPC_CRITICAL_IMPORTANCE)
return -EINVAL;
msg_set_importance(&tsk->phdr, (u32)imp);
/* tipc_sk - map a generic socket to the TIPC socket structure embedding it */
static struct tipc_sock *tipc_sk(const struct sock *sk)
{
	struct tipc_sock *tsk = container_of(sk, struct tipc_sock, sk);

	return tsk;
}
/* tsk_conn_cong - non-zero when at least TIPC_FLOWCTRL_WIN sent messages
 * are still unacknowledged on the socket
 */
static int tsk_conn_cong(struct tipc_sock *tsk)
{
	if (tsk->sent_unacked < TIPC_FLOWCTRL_WIN)
		return 0;
	return 1;
}
* tsk_advance_rx_queue - discard first buffer in socket receive queue
*
* Caller must hold socket lock
static void tsk_advance_rx_queue(struct sock *sk)
kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
* tsk_rej_rx_queue - reject all buffers in socket receive queue
*
* Caller must hold socket lock
static void tsk_rej_rx_queue(struct sock *sk)
struct sk_buff *skb;
while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
/* tsk_peer_msg - verify if message was sent by connected port's peer
*
* Handles cases where the node's network address has changed from
* the default of <0.0.0> to its configured setting.
*/
static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
u32 peer_port = tsk_peer_port(tsk);
if (unlikely(!tsk->connected))
return false;
if (unlikely(msg_origport(msg) != peer_port))
return false;
orig_node = msg_orignode(msg);
peer_node = tsk_peer_node(tsk);
if (likely(orig_node == peer_node))
return true;
if (!orig_node && (peer_node == tipc_own_addr))
return true;
if (!peer_node && (orig_node == tipc_own_addr))
return true;
return false;
}
* tipc_sk_create - create a TIPC socket
* @net: network namespace (must be default network)
* @sock: pre-allocated socket structure
* @protocol: protocol indicator (must be 0)
* @kern: caused by kernel or by userspace?
* This routine creates additional data structures used by the TIPC socket,
* initializes them, and links them together.
static int tipc_sk_create(struct net *net, struct socket *sock,
int protocol, int kern)
const struct proto_ops *ops;
socket_state state;
struct tipc_sock *tsk;
struct tipc_msg *msg;
if (unlikely(protocol != 0))
return -EPROTONOSUPPORT;
switch (sock->type) {
case SOCK_STREAM:
ops = &stream_ops;
state = SS_UNCONNECTED;
ops = &packet_ops;
state = SS_UNCONNECTED;
ops = &msg_ops;
state = SS_READY;
default:
return -EPROTOTYPE;
/* Allocate socket's protocol area */
if (!kern)
sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
else
sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
tsk = tipc_sk(sk);
tsk->max_pkt = MAX_PKT_DEFAULT;
INIT_LIST_HEAD(&tsk->publications);
msg = &tsk->phdr;
tipc_msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
NAMED_H_SIZE, 0);
/* Finish initializing socket data structures */
sock->ops = ops;
sock->state = state;
sock_init_data(sock, sk);
if (tipc_sk_insert(tsk)) {
pr_warn("Socket create failed; port numbrer exhausted\n");
return -EINVAL;
}
msg_set_origport(msg, tsk->portid);
setup_timer(&tsk->timer, tipc_sk_timeout, (unsigned long)tsk);
sk->sk_backlog_rcv = tipc_backlog_rcv;
sk->sk_rcvbuf = sysctl_tipc_rmem[1];
sk->sk_data_ready = tipc_data_ready;
sk->sk_write_space = tipc_write_space;
tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
atomic_set(&tsk->dupl_rcvcnt, 0);
if (sock->state == SS_READY) {
tsk_set_unreturnable(tsk, true);
tsk_set_unreliable(tsk, true);
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
/**
* tipc_sock_create_local - create TIPC socket from inside TIPC module
* @type: socket type - SOCK_RDM or SOCK_SEQPACKET
*
* We cannot use sock_creat_kern here because it bumps module user count.
* Since socket owner and creator is the same module we must make sure
* that module count remains zero for module local sockets, otherwise
* we cannot do rmmod.
*
* Returns 0 on success, errno otherwise
*/
int tipc_sock_create_local(int type, struct socket **res)
{
int rc;
rc = sock_create_lite(AF_TIPC, type, 0, res);
if (rc < 0) {
pr_err("Failed to create kernel socket\n");
return rc;
}
tipc_sk_create(&init_net, *res, 0, 1);
return 0;
}
/**
* tipc_sock_release_local - release socket created by tipc_sock_create_local
* @sock: the socket to be released.
*
* Module reference count is not incremented when such sockets are created,
* so we must keep it from being decremented when they are released.
*/
void tipc_sock_release_local(struct socket *sock)
{
/* Run the TIPC release handler directly instead of via sock->ops */
tipc_release(sock);
/* Clear ops before the generic release - see the note on the module
* reference count in the header comment of this function
*/
sock->ops = NULL;
sock_release(sock);
}
/**
* tipc_sock_accept_local - accept a connection on a socket created
* with tipc_sock_create_local. Use this function to avoid that
* module reference count is inadvertently incremented.
*
* @sock: the accepting socket
* @newsock: reference to the new socket to be created
* @flags: socket flags
*/
int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
{
struct sock *sk = sock->sk;
int ret;
ret = sock_create_lite(sk->sk_family, sk->sk_type,
sk->sk_protocol, newsock);
if (ret < 0)
return ret;
ret = tipc_accept(sock, *newsock, flags);
if (ret < 0) {
sock_release(*newsock);
return ret;
}
(*newsock)->ops = sock->ops;
return ret;
}
/* tipc_sk_callback - RCU callback: drop a reference on the socket that
 * embeds the given rcu head
 */
static void tipc_sk_callback(struct rcu_head *head)
{
	sock_put(&container_of(head, struct tipc_sock, rcu)->sk);
}
* tipc_release - destroy a TIPC socket
* @sock: socket to destroy
*
* This routine cleans up any messages that are still queued on the socket.
* For DGRAM and RDM socket types, all queued messages are rejected.
* For SEQPACKET and STREAM socket types, the first message is rejected
* and any others are discarded. (If the first message on a STREAM socket
* is partially-read, it is discarded and the next one is rejected instead.)
* NOTE: Rejected messages are not necessarily returned to the sender! They
* are returned or discarded according to the "destination droppable" setting
* specified for the message by the sender.
*
* Returns 0 on success, errno otherwise
*/
static int tipc_release(struct socket *sock)
struct net *net = sock_net(sk);
struct tipc_sock *tsk;
struct sk_buff *skb;
u32 dnode, probing_state;
/*
* Exit if socket isn't fully initialized (occurs when a failed accept()
* releases a pre-allocated child socket that was never used)
*/
if (sk == NULL)
tsk = tipc_sk(sk);
lock_sock(sk);
/*
* Reject all unreceived messages, except on an active connection
* (which disconnects locally & sends a 'FIN+' to peer)
*/
dnode = tsk_peer_node(tsk);
skb = __skb_dequeue(&sk->sk_receive_queue);
if (skb == NULL)
if (TIPC_SKB_CB(skb)->handle != NULL)
kfree_skb(skb);
else {
if ((sock->state == SS_CONNECTING) ||
(sock->state == SS_CONNECTED)) {
sock->state = SS_DISCONNECTING;
tipc_node_remove_conn(net, dnode, tsk->portid);
if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
tipc_link_xmit_skb(net, skb, dnode, 0);
tipc_sk_withdraw(tsk, 0, NULL);
probing_state = tsk->probing_state;
if (del_timer_sync(&tsk->timer) && probing_state != TIPC_CONN_PROBING)
sock_put(sk);
tipc_sk_remove(tsk);
if (tsk->connected) {
skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
SHORT_H_SIZE, 0, dnode, tipc_own_addr,
tsk->portid, TIPC_ERR_NO_PORT);
if (skb)
tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
tipc_node_remove_conn(net, dnode, tsk->portid);
/* Discard any remaining (connection-based) messages in receive queue */
__skb_queue_purge(&sk->sk_receive_queue);
/* Reject any messages that accumulated in backlog queue */
sock->state = SS_DISCONNECTING;
release_sock(sk);
call_rcu(&tsk->rcu, tipc_sk_callback);
* tipc_bind - associate or disassociate TIPC name(s) with a socket
* @sock: socket structure
* @uaddr: socket address describing name(s) and desired operation
* @uaddr_len: size of socket address data structure
* Name and name sequence binding is indicated using a positive scope value;
* a negative scope value unbinds the specified name. Specifying no name
* (i.e. a socket address length of 0) unbinds all names from the socket.
*
* NOTE: This routine takes the socket lock while it publishes or withdraws
* names, and releases it before returning.
static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
int uaddr_len)
struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
struct tipc_sock *tsk = tipc_sk(sk);
lock_sock(sk);
if (unlikely(!uaddr_len)) {
res = tipc_sk_withdraw(tsk, 0, NULL);
if (uaddr_len < sizeof(struct sockaddr_tipc)) {
res = -EINVAL;
goto exit;
}
if (addr->family != AF_TIPC) {
res = -EAFNOSUPPORT;
goto exit;
}
if (addr->addrtype == TIPC_ADDR_NAME)
addr->addr.nameseq.upper = addr->addr.nameseq.lower;
else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
res = -EAFNOSUPPORT;
goto exit;
}
if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
(addr->addr.nameseq.type != TIPC_TOP_SRV) &&
(addr->addr.nameseq.type != TIPC_CFG_SRV)) {
res = -EACCES;
goto exit;
}
tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
exit:
release_sock(sk);
return res;
* tipc_getname - get port ID of socket or peer socket
* @sock: socket structure
* @uaddr: area for returned socket address
* @uaddr_len: area for returned length of socket address
* @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
* NOTE: This routine doesn't need to take the socket lock since it only
* accesses socket information that is unchanging (or which changes in
* a completely predictable manner).
static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
int *uaddr_len, int peer)
{
struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
struct tipc_sock *tsk = tipc_sk(sock->sk);
memset(addr, 0, sizeof(*addr));
if ((sock->state != SS_CONNECTED) &&
((peer != 2) || (sock->state != SS_DISCONNECTING)))
return -ENOTCONN;
addr->addr.id.ref = tsk_peer_port(tsk);
addr->addr.id.node = tsk_peer_node(tsk);
addr->addr.id.ref = tsk->portid;
*uaddr_len = sizeof(*addr);
addr->addrtype = TIPC_ADDR_ID;
addr->family = AF_TIPC;
addr->scope = 0;
addr->addr.name.domain = 0;
* tipc_poll - read and possibly block on pollmask
* @file: file structure associated with the socket
* @sock: socket for which to calculate the poll bits
* @wait: poll table, handed on to sock_poll_wait()
*
* Returns pollmask value
*
* COMMENTARY:
* It appears that the usual socket locking mechanisms are not useful here
* since the pollmask info is potentially out-of-date the moment this routine
* exits. TCP and other protocols seem to rely on higher level poll routines
* to handle any preventable race conditions, so TIPC will do the same ...
*
* TIPC sets the returned events as follows:
*
* socket state flags set
* ------------ ---------
* unconnected no read flags
* POLLOUT if port is not congested
*
* connecting POLLIN/POLLRDNORM if ACK/NACK in rx queue
* no write flags
*
* connected POLLIN/POLLRDNORM if data in rx queue
* POLLOUT if port is not congested
*
* disconnecting POLLIN/POLLRDNORM/POLLHUP
* no write flags
*
* listening POLLIN if SYN in rx queue
* no write flags
*
* ready POLLIN/POLLRDNORM if data in rx queue
* [connectionless] POLLOUT (since port cannot be congested)
*
* IMPORTANT: The fact that a read or write operation is indicated does NOT
* imply that the operation will succeed, merely that it should be performed
* and will not block.
static unsigned int tipc_poll(struct file *file, struct socket *sock,
poll_table *wait)
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
u32 mask = 0;
sock_poll_wait(file, sk_sleep(sk), wait);
switch ((int)sock->state) {
case SS_UNCONNECTED:
mask |= POLLOUT;
break;
case SS_READY:
case SS_CONNECTED:
if (!tsk->link_cong && !tsk_conn_cong(tsk))
mask |= POLLOUT;
/* fall thru' */
case SS_CONNECTING:
case SS_LISTENING:
if (!skb_queue_empty(&sk->sk_receive_queue))
mask |= (POLLIN | POLLRDNORM);
break;
case SS_DISCONNECTING:
mask = (POLLIN | POLLRDNORM | POLLHUP);
break;
}
/**
 * tipc_sendmcast - send multicast message
 * @sock: socket structure
 * @seq: destination address
 * @msg: message to send
 * @dsz: total length of message data
 * @timeo: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Returns the number of bytes sent on success, or errno
 */
static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
			  struct msghdr *msg, size_t dsz, long timeo)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_msg *hdr = &tsk->phdr;
	struct net *net = sock_net(sk);
	struct sk_buff_head pktchain;
	uint mtu;
	int rc;

	/* Turn the socket's preformatted header into a multicast header */
	msg_set_type(hdr, TIPC_MCAST_MSG);
	msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
	msg_set_destport(hdr, 0);
	msg_set_destnode(hdr, 0);
	msg_set_nametype(hdr, seq->type);
	msg_set_namelower(hdr, seq->lower);
	msg_set_nameupper(hdr, seq->upper);
	msg_set_hdr_sz(hdr, MCAST_H_SIZE);

rebuild:
	mtu = tipc_bclink_get_mtu();
	__skb_queue_head_init(&pktchain);
	rc = tipc_msg_build(hdr, msg, 0, dsz, mtu, &pktchain);
	if (unlikely(rc < 0))
		return rc;

	for (;;) {
		rc = tipc_bclink_xmit(net, &pktchain);
		if (likely(rc >= 0))
			return dsz;

		/* Build the buffer chain again with a freshly read MTU */
		if (rc == -EMSGSIZE)
			goto rebuild;
		if (rc != -ELINKCONG)
			return rc;

		/* Link congested: wait, then retry unless the wait failed */
		tsk->link_cong = 1;
		rc = tipc_wait_for_sndmsg(sock, &timeo);
		if (rc) {
			__skb_queue_purge(&pktchain);
			return rc;
		}
	}
}
/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
*/
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
{
struct tipc_msg *msg = buf_msg(buf);
struct tipc_port_list dports = {0, NULL, };
struct tipc_port_list *item;
struct sk_buff *b;
uint i, last, dst = 0;
u32 scope = TIPC_CLUSTER_SCOPE;
if (in_own_node(msg_orignode(msg)))
scope = TIPC_NODE_SCOPE;
/* Create destination port list: */
tipc_nametbl_mc_translate(msg_nametype(msg),
msg_namelower(msg),
msg_nameupper(msg),
scope,
&dports);
last = dports.count;
if (!last) {
kfree_skb(buf);
return;
}
for (item = &dports; item; item = item->next) {
for (i = 0; i < PLSIZE && ++dst <= last; i++) {
b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
if (!b) {
pr_warn("Failed do clone mcast rcv buffer\n");
continue;
}
msg_set_destport(msg, item->ports[i]);
}
}
tipc_port_list_free(&dports);
}
/**
* tipc_sk_proto_rcv - receive a connection mng protocol message
* @tsk: receiving socket
* @dnode: node to send response message to, if any
* @buf: buffer containing protocol message
* Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
* (CONN_PROBE_REPLY) message should be forwarded.
*/
static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
/* Ignore if connection cannot be validated: */
tsk->probing_state = TIPC_CONN_OK;
if (msg_type(msg) == CONN_ACK) {
conn_cong = tsk_conn_cong(tsk);
tsk->sent_unacked -= msg_msgcnt(msg);
if (conn_cong)
tsk->sk.sk_write_space(&tsk->sk);
} else if (msg_type(msg) == CONN_PROBE) {
if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
return TIPC_OK;
msg_set_type(msg, CONN_PROBE_REPLY);
return TIPC_FWD_MSG;
}
/* Do nothing if msg_type() == CONN_PROBE_REPLY */
exit:
kfree_skb(buf);
return TIPC_OK;
}
/**
 * tipc_wait_for_sndmsg - wait until the socket's link_cong flag is cleared
 * @sock: socket structure
 * @timeo_p: timeout, passed on to sk_wait_event()
 *
 * Returns 0 once sk_wait_event() reports that tsk->link_cong is cleared.
 * Otherwise returns a pending socket error, -EPIPE if the socket is
 * disconnecting, -EAGAIN if the timeout is zero, or the result of
 * sock_intr_errno() if a signal is pending.
 */
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	DEFINE_WAIT(wait);
	int woken;

	for (;;) {
		int err = sock_error(sk);

		/* Conditions that end the wait are checked before each sleep */
		if (err)
			return err;
		if (sock->state == SS_DISCONNECTING)
			return -EPIPE;
		if (!*timeo_p)
			return -EAGAIN;
		if (signal_pending(current))
			return sock_intr_errno(*timeo_p);

		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
		woken = sk_wait_event(sk, timeo_p, !tsk->link_cong);
		finish_wait(sk_sleep(sk), &wait);
		if (woken)
			return 0;
	}
}
* tipc_sendmsg - send message in connectionless manner
* @iocb: if NULL, indicates that socket lock is already held
* @dsz: amount of user data to be sent
* Message must have a destination specified explicitly.
* Used for SOCK_RDM and SOCK_DGRAM messages,
* and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
* (Note: 'SYN+' is prohibited on SOCK_STREAM.)
* Returns the number of bytes sent on success, or errno otherwise
*/
static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
struct msghdr *m, size_t dsz)
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
struct tipc_sock *tsk = tipc_sk(sk);
struct net *net = sock_net(sk);
struct tipc_msg *mhdr = &tsk->phdr;
u32 dnode, dport;
struct sk_buff_head head;
struct sk_buff *skb;
struct tipc_name_seq *seq = &dest->addr.nameseq;
u32 mtu;
if (unlikely((m->msg_namelen < sizeof(*dest)) ||
(dest->family != AF_TIPC)))
if (dsz > TIPC_MAX_USER_MSG_SIZE)
return -EMSGSIZE;
if (iocb)
lock_sock(sk);
if (unlikely(sock->state != SS_READY)) {
if (sock->state == SS_LISTENING) {
rc = -EPIPE;
goto exit;
}
if (sock->state != SS_UNCONNECTED) {
rc = -EISCONN;
if (tsk->published) {
rc = -EOPNOTSUPP;
Allan Stephens
committed
if (dest->addrtype == TIPC_ADDR_NAME) {
tsk->conn_type = dest->addr.name.name.type;
tsk->conn_instance = dest->addr.name.name.instance;
Allan Stephens
committed
}
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
if (dest->addrtype == TIPC_ADDR_MCAST) {
rc = tipc_sendmcast(sock, seq, m, dsz, timeo);
goto exit;
} else if (dest->addrtype == TIPC_ADDR_NAME) {
u32 type = dest->addr.name.name.type;
u32 inst = dest->addr.name.name.instance;
u32 domain = dest->addr.name.domain;
dnode = domain;
msg_set_type(mhdr, TIPC_NAMED_MSG);
msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
msg_set_nametype(mhdr, type);
msg_set_nameinst(mhdr, inst);
msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
dport = tipc_nametbl_translate(type, inst, &dnode);
msg_set_destnode(mhdr, dnode);
msg_set_destport(mhdr, dport);
if (unlikely(!dport && !dnode)) {
rc = -EHOSTUNREACH;
goto exit;
} else if (dest->addrtype == TIPC_ADDR_ID) {
dnode = dest->addr.id.node;
msg_set_type(mhdr, TIPC_DIRECT_MSG);
msg_set_lookup_scope(mhdr, 0);
msg_set_destnode(mhdr, dnode);
msg_set_destport(mhdr, dest->addr.id.ref);
msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
}
new_mtu:
mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
__skb_queue_head_init(&head);
rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head);
if (rc < 0)
goto exit;
do {
skb = skb_peek(&head);
TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
rc = tipc_link_xmit(net, &head, dnode, tsk->portid);
if (likely(rc >= 0)) {
if (sock->state != SS_READY)
rc = dsz;
if (rc == -EMSGSIZE)
goto new_mtu;
if (rc != -ELINKCONG)
tsk->link_cong = 1;
rc = tipc_wait_for_sndmsg(sock, &timeo);
__skb_queue_purge(&head);
} while (!rc);
exit:
if (iocb)
release_sock(sk);
return rc;
static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
{
struct sock *sk = sock->sk;