diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index eadba62afa85..9dc239dfe192 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -35,742 +35,301 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include #include "socket.h" #include "msg.h" #include "bcast.h" #include "name_distr.h" -#include "core.h" +#include "link.h" +#include "node.h" -#define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */ #define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */ #define BCLINK_WIN_MIN 32 /* bcast minimum link window size */ const char tipc_bclink_name[] = "broadcast-link"; -static void tipc_nmap_diff(struct tipc_node_map *nm_a, - struct tipc_node_map *nm_b, - struct tipc_node_map *nm_diff); -static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node); -static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); - -static void tipc_bclink_lock(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - - spin_lock_bh(&tn->bclink->lock); -} - -static void tipc_bclink_unlock(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - - spin_unlock_bh(&tn->bclink->lock); -} - -void tipc_bclink_input(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - - tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq); -} - -uint tipc_bclink_get_mtu(void) -{ - return MAX_PKT_DEFAULT_MCAST; -} - -static u32 bcbuf_acks(struct sk_buff *buf) -{ - return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; -} - -static void bcbuf_set_acks(struct sk_buff *buf, u32 acks) -{ - TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks; -} - -static void bcbuf_decr_acks(struct sk_buff *buf) -{ - bcbuf_set_acks(buf, bcbuf_acks(buf) - 1); -} - -void tipc_bclink_add_node(struct net *net, u32 addr) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - - tipc_bclink_lock(net); - tipc_nmap_add(&tn->bclink->bcast_nodes, addr); - tipc_bclink_unlock(net); -} - -void tipc_bclink_remove_node(struct net 
*net, u32 addr) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - - tipc_bclink_lock(net); - tipc_nmap_remove(&tn->bclink->bcast_nodes, addr); - - /* Last node? => reset backlog queue */ - if (!tn->bclink->bcast_nodes.count) - tipc_link_purge_backlog(&tn->bclink->link); - - tipc_bclink_unlock(net); -} - -static void bclink_set_last_sent(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *bcl = tn->bcl; - - bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1); -} - -u32 tipc_bclink_get_last_sent(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - - return tn->bcl->silent_intv_cnt; -} - -static void bclink_update_last_sent(struct tipc_node *node, u32 seqno) -{ - node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ? - seqno : node->bclink.last_sent; -} - /** - * tipc_bclink_retransmit_to - get most recent node to request retransmission - * - * Called with bclink_lock locked + * struct tipc_bc_base - base structure for keeping broadcast send state + * @link: broadcast send link structure + * @inputq: data input queue; will only carry SOCK_WAKEUP messages + * @dest: array keeping number of reachable destinations per bearer + * @primary_bearer: a bearer having links to all broadcast destinations, if any */ -struct tipc_node *tipc_bclink_retransmit_to(struct net *net) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); +struct tipc_bc_base { + struct tipc_link *link; + struct sk_buff_head inputq; + int dests[MAX_BEARERS]; + int primary_bearer; +}; - return tn->bclink->retransmit_to; +static struct tipc_bc_base *tipc_bc_base(struct net *net) +{ + return tipc_net(net)->bcbase; } -/** - * bclink_retransmit_pkt - retransmit broadcast packets - * @after: sequence number of last packet to *not* retransmit - * @to: sequence number of last packet to retransmit - * - * Called with bclink_lock locked - */ -static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to) +int 
tipc_bcast_get_mtu(struct net *net) { - struct sk_buff *skb; - struct tipc_link *bcl = tn->bcl; - - skb_queue_walk(&bcl->transmq, skb) { - if (more(buf_seqno(skb), after)) { - tipc_link_retransmit(bcl, skb, mod(to - after)); - break; - } - } + return tipc_link_mtu(tipc_bc_sndlink(net)); } -/** - * bclink_prepare_wakeup - prepare users for wakeup after congestion - * @bcl: broadcast link - * @resultq: queue for users which can be woken up - * Move a number of waiting users, as permitted by available space in - * the send queue, from link wait queue to specified queue for wakeup +/* tipc_bcbase_select_primary(): find a bearer with links to all destinations, + * if any, and make it primary bearer */ -static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq) +static void tipc_bcbase_select_primary(struct net *net) { - int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,}; - int imp, lim; - struct sk_buff *skb, *tmp; + struct tipc_bc_base *bb = tipc_bc_base(net); + int all_dests = tipc_link_bc_peers(bb->link); + int i, mtu; - skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) { - imp = TIPC_SKB_CB(skb)->chain_imp; - lim = bcl->window + bcl->backlog[imp].limit; - pnd[imp] += TIPC_SKB_CB(skb)->chain_sz; - if ((pnd[imp] + bcl->backlog[imp].len) >= lim) + bb->primary_bearer = INVALID_BEARER_ID; + + if (!all_dests) + return; + + for (i = 0; i < MAX_BEARERS; i++) { + if (!bb->dests[i]) continue; - skb_unlink(skb, &bcl->wakeupq); - skb_queue_tail(resultq, skb); + + mtu = tipc_bearer_mtu(net, i); + if (mtu < tipc_link_mtu(bb->link)) + tipc_link_set_mtu(bb->link, mtu); + + if (bb->dests[i] < all_dests) + continue; + + bb->primary_bearer = i; + + /* Reduce risk that all nodes select same primary */ + if ((i ^ tipc_own_addr(net)) & 1) + break; } } -/** - * tipc_bclink_wakeup_users - wake up pending users - * - * Called with no locks taken - */ -void tipc_bclink_wakeup_users(struct net *net) +void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id) { - struct 
tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *bcl = tn->bcl; - struct sk_buff_head resultq; + struct tipc_bc_base *bb = tipc_bc_base(net); - skb_queue_head_init(&resultq); - bclink_prepare_wakeup(bcl, &resultq); - tipc_sk_rcv(net, &resultq); + tipc_bcast_lock(net); + bb->dests[bearer_id]++; + tipc_bcbase_select_primary(net); + tipc_bcast_unlock(net); } -/** - * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets - * @n_ptr: node that sent acknowledgement info - * @acked: broadcast sequence # that has been acknowledged - * - * Node is locked, bclink_lock unlocked. - */ -void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) +void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id) { - struct sk_buff *skb, *tmp; - unsigned int released = 0; - struct net *net = n_ptr->net; - struct tipc_net *tn = net_generic(net, tipc_net_id); + struct tipc_bc_base *bb = tipc_bc_base(net); - if (unlikely(!n_ptr->bclink.recv_permitted)) + tipc_bcast_lock(net); + bb->dests[bearer_id]--; + tipc_bcbase_select_primary(net); + tipc_bcast_unlock(net); +} + +/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers + * + * Note that number of reachable destinations, as indicated in the dests[] + * array, may transitionally differ from the number of destinations indicated + * in each sent buffer. We can sustain this. 
Excess destination nodes will + * drop and never acknowledge the unexpected packets, and missing destinations + * will either require retransmission (if they are just about to be added to + * the bearer), or be removed from the buffer's 'ackers' counter (if they + * just went down) + */ +static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq) +{ + int bearer_id; + struct tipc_bc_base *bb = tipc_bc_base(net); + struct sk_buff *skb, *_skb; + struct sk_buff_head _xmitq; + + if (skb_queue_empty(xmitq)) return; - tipc_bclink_lock(net); - - /* Bail out if tx queue is empty (no clean up is required) */ - skb = skb_peek(&tn->bcl->transmq); - if (!skb) - goto exit; - - /* Determine which messages need to be acknowledged */ - if (acked == INVALID_LINK_SEQ) { - /* - * Contact with specified node has been lost, so need to - * acknowledge sent messages only (if other nodes still exist) - * or both sent and unsent messages (otherwise) - */ - if (tn->bclink->bcast_nodes.count) - acked = tn->bcl->silent_intv_cnt; - else - acked = tn->bcl->snd_nxt; - } else { - /* - * Bail out if specified sequence number does not correspond - * to a message that has been sent and not yet acknowledged - */ - if (less(acked, buf_seqno(skb)) || - less(tn->bcl->silent_intv_cnt, acked) || - less_eq(acked, n_ptr->bclink.acked)) - goto exit; + /* The typical case: at least one bearer has links to all nodes */ + bearer_id = bb->primary_bearer; + if (bearer_id >= 0) { + tipc_bearer_bc_xmit(net, bearer_id, xmitq); + return; } - /* Skip over packets that node has previously acknowledged */ - skb_queue_walk(&tn->bcl->transmq, skb) { - if (more(buf_seqno(skb), n_ptr->bclink.acked)) - break; - } + /* We have to transmit across all bearers */ + skb_queue_head_init(&_xmitq); + for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { + if (!bb->dests[bearer_id]) + continue; - /* Update packets that node is now acknowledging */ - skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) { - if 
(more(buf_seqno(skb), acked)) - break; - bcbuf_decr_acks(skb); - bclink_set_last_sent(net); - if (bcbuf_acks(skb) == 0) { - __skb_unlink(skb, &tn->bcl->transmq); - kfree_skb(skb); - released = 1; + skb_queue_walk(xmitq, skb) { + _skb = pskb_copy_for_clone(skb, GFP_ATOMIC); + if (!_skb) + break; + __skb_queue_tail(&_xmitq, _skb); } + tipc_bearer_bc_xmit(net, bearer_id, &_xmitq); } - n_ptr->bclink.acked = acked; - - /* Try resolving broadcast link congestion, if necessary */ - if (unlikely(skb_peek(&tn->bcl->backlogq))) { - tipc_link_push_packets(tn->bcl); - bclink_set_last_sent(net); - } - if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq))) - n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS; -exit: - tipc_bclink_unlock(net); + __skb_queue_purge(xmitq); + __skb_queue_purge(&_xmitq); } -/** - * tipc_bclink_update_link_state - update broadcast link state - * - * RCU and node lock set - */ -void tipc_bclink_update_link_state(struct tipc_node *n_ptr, - u32 last_sent) -{ - struct sk_buff *buf; - struct net *net = n_ptr->net; - struct tipc_net *tn = net_generic(net, tipc_net_id); - - /* Ignore "stale" link state info */ - if (less_eq(last_sent, n_ptr->bclink.last_in)) - return; - - /* Update link synchronization state; quit if in sync */ - bclink_update_last_sent(n_ptr, last_sent); - - if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) - return; - - /* Update out-of-sync state; quit if loss is still unconfirmed */ - if ((++n_ptr->bclink.oos_state) == 1) { - if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2)) - return; - n_ptr->bclink.oos_state++; - } - - /* Don't NACK if one has been recently sent (or seen) */ - if (n_ptr->bclink.oos_state & 0x1) - return; - - /* Send NACK */ - buf = tipc_buf_acquire(INT_H_SIZE); - if (buf) { - struct tipc_msg *msg = buf_msg(buf); - struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq); - u32 to = skb ? 
buf_seqno(skb) - 1 : n_ptr->bclink.last_sent; - - tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG, - INT_H_SIZE, n_ptr->addr); - msg_set_non_seq(msg, 1); - msg_set_mc_netid(msg, tn->net_id); - msg_set_bcast_ack(msg, n_ptr->bclink.last_in); - msg_set_bcgap_after(msg, n_ptr->bclink.last_in); - msg_set_bcgap_to(msg, to); - - tipc_bclink_lock(net); - tipc_bearer_send(net, MAX_BEARERS, buf, NULL); - tn->bcl->stats.sent_nacks++; - tipc_bclink_unlock(net); - kfree_skb(buf); - - n_ptr->bclink.oos_state++; - } -} - -void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr) -{ - u16 last = msg_last_bcast(hdr); - int mtyp = msg_type(hdr); - - if (unlikely(msg_user(hdr) != LINK_PROTOCOL)) - return; - if (mtyp == STATE_MSG) { - tipc_bclink_update_link_state(n, last); - return; - } - /* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization, - * and transfer synch info in LINK_PROTOCOL messages. - */ - if (tipc_node_is_up(n)) - return; - if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG)) - return; - n->bclink.last_sent = last; - n->bclink.last_in = last; - n->bclink.oos_state = 0; -} - -/** - * bclink_peek_nack - monitor retransmission requests sent by other nodes - * - * Delay any upcoming NACK by this node if another node has already - * requested the first message this node is going to ask for. 
- */ -static void bclink_peek_nack(struct net *net, struct tipc_msg *msg) -{ - struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg)); - - if (unlikely(!n_ptr)) - return; - - tipc_node_lock(n_ptr); - if (n_ptr->bclink.recv_permitted && - (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) && - (n_ptr->bclink.last_in == msg_bcgap_after(msg))) - n_ptr->bclink.oos_state = 2; - tipc_node_unlock(n_ptr); - tipc_node_put(n_ptr); -} - -/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster +/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster * and to identified node local sockets * @net: the applicable net namespace * @list: chain of buffers containing message * Consumes the buffer chain, except when returning -ELINKCONG * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE */ -int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list) +int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *bcl = tn->bcl; - struct tipc_bclink *bclink = tn->bclink; + struct tipc_link *l = tipc_bc_sndlink(net); + struct sk_buff_head xmitq, inputq, rcvq; int rc = 0; - int bc = 0; - struct sk_buff *skb; - struct sk_buff_head arrvq; - struct sk_buff_head inputq; - /* Prepare clone of message for local node */ - skb = tipc_msg_reassemble(list); - if (unlikely(!skb)) + __skb_queue_head_init(&rcvq); + __skb_queue_head_init(&xmitq); + skb_queue_head_init(&inputq); + + /* Prepare message clone for local node */ + if (unlikely(!tipc_msg_reassemble(list, &rcvq))) return -EHOSTUNREACH; - /* Broadcast to all nodes */ - if (likely(bclink)) { - tipc_bclink_lock(net); - if (likely(bclink->bcast_nodes.count)) { - rc = __tipc_link_xmit(net, bcl, list); - if (likely(!rc)) { - u32 len = skb_queue_len(&bcl->transmq); - - bclink_set_last_sent(net); - bcl->stats.queue_sz_counts++; - bcl->stats.accu_queue_sz += len; - } - bc = 1; - } - tipc_bclink_unlock(net); - } - - 
if (unlikely(!bc)) - __skb_queue_purge(list); + tipc_bcast_lock(net); + if (tipc_link_bc_peers(l)) + rc = tipc_link_xmit(l, list, &xmitq); + tipc_bcast_unlock(net); + /* Don't send to local node if adding to link failed */ if (unlikely(rc)) { - kfree_skb(skb); + __skb_queue_purge(&rcvq); return rc; } - /* Deliver message clone */ - __skb_queue_head_init(&arrvq); - skb_queue_head_init(&inputq); - __skb_queue_tail(&arrvq, skb); - tipc_sk_mcast_rcv(net, &arrvq, &inputq); - return rc; -} - -/** - * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet - * - * Called with both sending node's lock and bclink_lock taken. - */ -static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) -{ - struct tipc_net *tn = net_generic(node->net, tipc_net_id); - - bclink_update_last_sent(node, seqno); - node->bclink.last_in = seqno; - node->bclink.oos_state = 0; - tn->bcl->stats.recv_info++; - - /* - * Unicast an ACK periodically, ensuring that - * all nodes in the cluster don't ACK at the same time - */ - if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) { - tipc_link_proto_xmit(node_active_link(node, node->addr), - STATE_MSG, 0, 0, 0, 0); - tn->bcl->stats.sent_acks++; - } -} - -/** - * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards - * - * RCU is locked, no other locks set - */ -void tipc_bclink_rcv(struct net *net, struct sk_buff *buf) -{ - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *bcl = tn->bcl; - struct tipc_msg *msg = buf_msg(buf); - struct tipc_node *node; - u32 next_in; - u32 seqno; - int deferred = 0; - int pos = 0; - struct sk_buff *iskb; - struct sk_buff_head *arrvq, *inputq; - - /* Screen out unwanted broadcast messages */ - if (msg_mc_netid(msg) != tn->net_id) - goto exit; - - node = tipc_node_find(net, msg_prevnode(msg)); - if (unlikely(!node)) - goto exit; - - tipc_node_lock(node); - if (unlikely(!node->bclink.recv_permitted)) - goto unlock; - - /* Handle broadcast protocol message */ - if 
(unlikely(msg_user(msg) == BCAST_PROTOCOL)) { - if (msg_type(msg) != STATE_MSG) - goto unlock; - if (msg_destnode(msg) == tn->own_addr) { - tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); - tipc_bclink_lock(net); - bcl->stats.recv_nacks++; - tn->bclink->retransmit_to = node; - bclink_retransmit_pkt(tn, msg_bcgap_after(msg), - msg_bcgap_to(msg)); - tipc_bclink_unlock(net); - tipc_node_unlock(node); - } else { - tipc_node_unlock(node); - bclink_peek_nack(net, msg); - } - tipc_node_put(node); - goto exit; - } - - /* Handle in-sequence broadcast message */ - seqno = msg_seqno(msg); - next_in = mod(node->bclink.last_in + 1); - arrvq = &tn->bclink->arrvq; - inputq = &tn->bclink->inputq; - - if (likely(seqno == next_in)) { -receive: - /* Deliver message to destination */ - if (likely(msg_isdata(msg))) { - tipc_bclink_lock(net); - bclink_accept_pkt(node, seqno); - spin_lock_bh(&inputq->lock); - __skb_queue_tail(arrvq, buf); - spin_unlock_bh(&inputq->lock); - node->action_flags |= TIPC_BCAST_MSG_EVT; - tipc_bclink_unlock(net); - tipc_node_unlock(node); - } else if (msg_user(msg) == MSG_BUNDLER) { - tipc_bclink_lock(net); - bclink_accept_pkt(node, seqno); - bcl->stats.recv_bundles++; - bcl->stats.recv_bundled += msg_msgcnt(msg); - pos = 0; - while (tipc_msg_extract(buf, &iskb, &pos)) { - spin_lock_bh(&inputq->lock); - __skb_queue_tail(arrvq, iskb); - spin_unlock_bh(&inputq->lock); - } - node->action_flags |= TIPC_BCAST_MSG_EVT; - tipc_bclink_unlock(net); - tipc_node_unlock(node); - } else if (msg_user(msg) == MSG_FRAGMENTER) { - tipc_bclink_lock(net); - bclink_accept_pkt(node, seqno); - tipc_buf_append(&node->bclink.reasm_buf, &buf); - if (unlikely(!buf && !node->bclink.reasm_buf)) { - tipc_bclink_unlock(net); - goto unlock; - } - bcl->stats.recv_fragments++; - if (buf) { - bcl->stats.recv_fragmented++; - msg = buf_msg(buf); - tipc_bclink_unlock(net); - goto receive; - } - tipc_bclink_unlock(net); - tipc_node_unlock(node); - } else { - tipc_bclink_lock(net); - 
bclink_accept_pkt(node, seqno); - tipc_bclink_unlock(net); - tipc_node_unlock(node); - kfree_skb(buf); - } - buf = NULL; - - /* Determine new synchronization state */ - tipc_node_lock(node); - if (unlikely(!tipc_node_is_up(node))) - goto unlock; - - if (node->bclink.last_in == node->bclink.last_sent) - goto unlock; - - if (skb_queue_empty(&node->bclink.deferdq)) { - node->bclink.oos_state = 1; - goto unlock; - } - - msg = buf_msg(skb_peek(&node->bclink.deferdq)); - seqno = msg_seqno(msg); - next_in = mod(next_in + 1); - if (seqno != next_in) - goto unlock; - - /* Take in-sequence message from deferred queue & deliver it */ - buf = __skb_dequeue(&node->bclink.deferdq); - goto receive; - } - - /* Handle out-of-sequence broadcast message */ - if (less(next_in, seqno)) { - deferred = tipc_link_defer_pkt(&node->bclink.deferdq, - buf); - bclink_update_last_sent(node, seqno); - buf = NULL; - } - - tipc_bclink_lock(net); - - if (deferred) - bcl->stats.deferred_recv++; - else - bcl->stats.duplicates++; - - tipc_bclink_unlock(net); - -unlock: - tipc_node_unlock(node); - tipc_node_put(node); -exit: - kfree_skb(buf); -} - -u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) -{ - return (n_ptr->bclink.recv_permitted && - (tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked)); -} - - -/** - * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer - * - * Send packet over as many bearers as necessary to reach all nodes - * that have joined the broadcast link. 
- * - * Returns 0 (packet sent successfully) under all circumstances, - * since the broadcast link's pseudo-bearer never blocks - */ -static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf, - struct tipc_bearer *unused1, - struct tipc_media_addr *unused2) -{ - int bp_index; - struct tipc_msg *msg = buf_msg(buf); - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_bcbearer *bcbearer = tn->bcbearer; - struct tipc_bclink *bclink = tn->bclink; - - /* Prepare broadcast link message for reliable transmission, - * if first time trying to send it; - * preparation is skipped for broadcast link protocol messages - * since they are sent in an unreliable manner and don't need it - */ - if (likely(!msg_non_seq(buf_msg(buf)))) { - bcbuf_set_acks(buf, bclink->bcast_nodes.count); - msg_set_non_seq(msg, 1); - msg_set_mc_netid(msg, tn->net_id); - tn->bcl->stats.sent_info++; - if (WARN_ON(!bclink->bcast_nodes.count)) { - dump_stack(); - return 0; - } - } - - /* Send buffer over bearers until all targets reached */ - bcbearer->remains = bclink->bcast_nodes; - - for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) { - struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary; - struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary; - struct tipc_bearer *bp[2] = {p, s}; - struct tipc_bearer *b = bp[msg_link_selector(msg)]; - struct sk_buff *tbuf; - - if (!p) - break; /* No more bearers to try */ - if (!b) - b = p; - tipc_nmap_diff(&bcbearer->remains, &b->nodes, - &bcbearer->remains_new); - if (bcbearer->remains_new.count == bcbearer->remains.count) - continue; /* Nothing added by bearer pair */ - - if (bp_index == 0) { - /* Use original buffer for first bearer */ - tipc_bearer_send(net, b->identity, buf, &b->bcast_addr); - } else { - /* Avoid concurrent buffer access */ - tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC); - if (!tbuf) - break; - tipc_bearer_send(net, b->identity, tbuf, - &b->bcast_addr); - kfree_skb(tbuf); /* Bearer keeps a clone */ - } 
- if (bcbearer->remains_new.count == 0) - break; /* All targets reached */ - - bcbearer->remains = bcbearer->remains_new; - } + /* Broadcast to all nodes, inluding local node */ + tipc_bcbase_xmit(net, &xmitq); + tipc_sk_mcast_rcv(net, &rcvq, &inputq); + __skb_queue_purge(list); return 0; } -/** - * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer +/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link + * + * RCU is locked, no other locks set */ -void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr, - u32 node, bool action) +int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_bcbearer *bcbearer = tn->bcbearer; - struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp; - struct tipc_bcbearer_pair *bp_curr; - struct tipc_bearer *b; - int b_index; - int pri; + struct tipc_msg *hdr = buf_msg(skb); + struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; + struct sk_buff_head xmitq; + int rc; - tipc_bclink_lock(net); + __skb_queue_head_init(&xmitq); - if (action) - tipc_nmap_add(nm_ptr, node); + if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) { + kfree_skb(skb); + return 0; + } + + tipc_bcast_lock(net); + if (msg_user(hdr) == BCAST_PROTOCOL) + rc = tipc_link_bc_nack_rcv(l, skb, &xmitq); else - tipc_nmap_remove(nm_ptr, node); + rc = tipc_link_rcv(l, skb, NULL); + tipc_bcast_unlock(net); - /* Group bearers by priority (can assume max of two per priority) */ - memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); + tipc_bcbase_xmit(net, &xmitq); - rcu_read_lock(); - for (b_index = 0; b_index < MAX_BEARERS; b_index++) { - b = rcu_dereference_rtnl(tn->bearer_list[b_index]); - if (!b || !b->nodes.count) - continue; + /* Any socket wakeup messages ? 
*/ + if (!skb_queue_empty(inputq)) + tipc_sk_rcv(net, inputq); - if (!bp_temp[b->priority].primary) - bp_temp[b->priority].primary = b; - else - bp_temp[b->priority].secondary = b; + return rc; +} + +/* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge + * + * RCU is locked, no other locks set + */ +void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked) +{ + struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; + struct sk_buff_head xmitq; + + __skb_queue_head_init(&xmitq); + + tipc_bcast_lock(net); + tipc_link_bc_ack_rcv(l, acked, &xmitq); + tipc_bcast_unlock(net); + + tipc_bcbase_xmit(net, &xmitq); + + /* Any socket wakeup messages ? */ + if (!skb_queue_empty(inputq)) + tipc_sk_rcv(net, inputq); +} + +/* tipc_bcast_synch_rcv - check and update rcv link with peer's send state + * + * RCU is locked, no other locks set + */ +void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, + struct tipc_msg *hdr) +{ + struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; + struct sk_buff_head xmitq; + + __skb_queue_head_init(&xmitq); + + tipc_bcast_lock(net); + if (msg_type(hdr) == STATE_MSG) { + tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq); + tipc_link_bc_sync_rcv(l, hdr, &xmitq); + } else { + tipc_link_bc_init_rcv(l, hdr); } - rcu_read_unlock(); + tipc_bcast_unlock(net); - /* Create array of bearer pairs for broadcasting */ - bp_curr = bcbearer->bpairs; - memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs)); + tipc_bcbase_xmit(net, &xmitq); - for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) { + /* Any socket wakeup messages ? 
*/ + if (!skb_queue_empty(inputq)) + tipc_sk_rcv(net, inputq); +} - if (!bp_temp[pri].primary) - continue; +/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer + * + * RCU is locked, node lock is set + */ +void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l, + struct sk_buff_head *xmitq) +{ + struct tipc_link *snd_l = tipc_bc_sndlink(net); - bp_curr->primary = bp_temp[pri].primary; + tipc_bcast_lock(net); + tipc_link_add_bc_peer(snd_l, uc_l, xmitq); + tipc_bcbase_select_primary(net); + tipc_bcast_unlock(net); +} - if (bp_temp[pri].secondary) { - if (tipc_nmap_equal(&bp_temp[pri].primary->nodes, - &bp_temp[pri].secondary->nodes)) { - bp_curr->secondary = bp_temp[pri].secondary; - } else { - bp_curr++; - bp_curr->primary = bp_temp[pri].secondary; - } - } +/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer + * + * RCU is locked, node lock is set + */ +void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l) +{ + struct tipc_link *snd_l = tipc_bc_sndlink(net); + struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; + struct sk_buff_head xmitq; - bp_curr++; - } + __skb_queue_head_init(&xmitq); - tipc_bclink_unlock(net); + tipc_bcast_lock(net); + tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); + tipc_bcbase_select_primary(net); + tipc_bcast_unlock(net); + + tipc_bcbase_xmit(net, &xmitq); + + /* Any socket wakeup messages ? 
*/ + if (!skb_queue_empty(inputq)) + tipc_sk_rcv(net, inputq); } static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb, @@ -836,7 +395,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg) if (!bcl) return 0; - tipc_bclink_lock(net); + tipc_bcast_lock(net); hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family, NLM_F_MULTI, TIPC_NL_LINK_GET); @@ -871,7 +430,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg) if (err) goto attr_msg_full; - tipc_bclink_unlock(net); + tipc_bcast_unlock(net); nla_nest_end(msg->skb, attrs); genlmsg_end(msg->skb, hdr); @@ -882,7 +441,7 @@ prop_msg_full: attr_msg_full: nla_nest_cancel(msg->skb, attrs); msg_full: - tipc_bclink_unlock(net); + tipc_bcast_unlock(net); genlmsg_cancel(msg->skb, hdr); return -EMSGSIZE; @@ -896,26 +455,25 @@ int tipc_bclink_reset_stats(struct net *net) if (!bcl) return -ENOPROTOOPT; - tipc_bclink_lock(net); + tipc_bcast_lock(net); memset(&bcl->stats, 0, sizeof(bcl->stats)); - tipc_bclink_unlock(net); + tipc_bcast_unlock(net); return 0; } -int tipc_bclink_set_queue_limits(struct net *net, u32 limit) +static int tipc_bc_link_set_queue_limits(struct net *net, u32 limit) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *bcl = tn->bcl; + struct tipc_link *l = tipc_bc_sndlink(net); - if (!bcl) + if (!l) return -ENOPROTOOPT; if (limit < BCLINK_WIN_MIN) limit = BCLINK_WIN_MIN; if (limit > TIPC_MAX_LINK_WIN) return -EINVAL; - tipc_bclink_lock(net); - tipc_link_set_queue_limits(bcl, limit); - tipc_bclink_unlock(net); + tipc_bcast_lock(net); + tipc_link_set_queue_limits(l, limit); + tipc_bcast_unlock(net); return 0; } @@ -937,123 +495,51 @@ int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]) win = nla_get_u32(props[TIPC_NLA_PROP_WIN]); - return tipc_bclink_set_queue_limits(net, win); + return tipc_bc_link_set_queue_limits(net, win); } -int tipc_bclink_init(struct net *net) +int tipc_bcast_init(struct net *net) { - struct tipc_net 
*tn = net_generic(net, tipc_net_id); - struct tipc_bcbearer *bcbearer; - struct tipc_bclink *bclink; - struct tipc_link *bcl; + struct tipc_net *tn = tipc_net(net); + struct tipc_bc_base *bb = NULL; + struct tipc_link *l = NULL; - bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC); - if (!bcbearer) - return -ENOMEM; + bb = kzalloc(sizeof(*bb), GFP_ATOMIC); + if (!bb) + goto enomem; + tn->bcbase = bb; + spin_lock_init(&tipc_net(net)->bclock); - bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC); - if (!bclink) { - kfree(bcbearer); - return -ENOMEM; - } - - bcl = &bclink->link; - bcbearer->bearer.media = &bcbearer->media; - bcbearer->media.send_msg = tipc_bcbearer_send; - sprintf(bcbearer->media.name, "tipc-broadcast"); - - spin_lock_init(&bclink->lock); - __skb_queue_head_init(&bcl->transmq); - __skb_queue_head_init(&bcl->backlogq); - __skb_queue_head_init(&bcl->deferdq); - skb_queue_head_init(&bcl->wakeupq); - bcl->snd_nxt = 1; - spin_lock_init(&bclink->node.lock); - __skb_queue_head_init(&bclink->arrvq); - skb_queue_head_init(&bclink->inputq); - bcl->owner = &bclink->node; - bcl->owner->net = net; - bcl->mtu = MAX_PKT_DEFAULT_MCAST; - tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); - bcl->bearer_id = MAX_BEARERS; - rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer); - bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg; - msg_set_prevnode(bcl->pmsg, tn->own_addr); - strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); - tn->bcbearer = bcbearer; - tn->bclink = bclink; - tn->bcl = bcl; + if (!tipc_link_bc_create(net, 0, 0, + U16_MAX, + BCLINK_WIN_DEFAULT, + 0, + &bb->inputq, + NULL, + NULL, + &l)) + goto enomem; + bb->link = l; + tn->bcl = l; return 0; +enomem: + kfree(bb); + kfree(l); + return -ENOMEM; } -void tipc_bclink_stop(struct net *net) +void tipc_bcast_reinit(struct net *net) +{ + struct tipc_bc_base *b = tipc_bc_base(net); + + msg_set_prevnode(b->link->pmsg, tipc_own_addr(net)); +} + +void tipc_bcast_stop(struct net *net) { struct tipc_net 
*tn = net_generic(net, tipc_net_id); - tipc_bclink_lock(net); - tipc_link_purge_queues(tn->bcl); - tipc_bclink_unlock(net); - - RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL); synchronize_net(); - kfree(tn->bcbearer); - kfree(tn->bclink); -} - -/** - * tipc_nmap_add - add a node to a node map - */ -static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) -{ - int n = tipc_node(node); - int w = n / WSIZE; - u32 mask = (1 << (n % WSIZE)); - - if ((nm_ptr->map[w] & mask) == 0) { - nm_ptr->count++; - nm_ptr->map[w] |= mask; - } -} - -/** - * tipc_nmap_remove - remove a node from a node map - */ -static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node) -{ - int n = tipc_node(node); - int w = n / WSIZE; - u32 mask = (1 << (n % WSIZE)); - - if ((nm_ptr->map[w] & mask) != 0) { - nm_ptr->map[w] &= ~mask; - nm_ptr->count--; - } -} - -/** - * tipc_nmap_diff - find differences between node maps - * @nm_a: input node map A - * @nm_b: input node map B - * @nm_diff: output node map A-B (i.e. 
nodes of A that are not in B) - */ -static void tipc_nmap_diff(struct tipc_node_map *nm_a, - struct tipc_node_map *nm_b, - struct tipc_node_map *nm_diff) -{ - int stop = ARRAY_SIZE(nm_a->map); - int w; - int b; - u32 map; - - memset(nm_diff, 0, sizeof(*nm_diff)); - for (w = 0; w < stop; w++) { - map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]); - nm_diff->map[w] = map; - if (map != 0) { - for (b = 0 ; b < WSIZE; b++) { - if (map & (1 << b)) - nm_diff->count++; - } - } - } + kfree(tn->bcbase); + kfree(tn->bcl); } diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index d74c69bcf60b..2855b9356a15 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -37,102 +37,44 @@ #ifndef _TIPC_BCAST_H #define _TIPC_BCAST_H -#include -#include "link.h" -#include "node.h" - -/** - * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link - * @primary: pointer to primary bearer - * @secondary: pointer to secondary bearer - * - * Bearers must have same priority and same set of reachable destinations - * to be paired. - */ - -struct tipc_bcbearer_pair { - struct tipc_bearer *primary; - struct tipc_bearer *secondary; -}; - -#define BCBEARER MAX_BEARERS - -/** - * struct tipc_bcbearer - bearer used by broadcast link - * @bearer: (non-standard) broadcast bearer structure - * @media: (non-standard) broadcast media structure - * @bpairs: array of bearer pairs - * @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort() - * @remains: temporary node map used by tipc_bcbearer_send() - * @remains_new: temporary node map used tipc_bcbearer_send() - * - * Note: The fields labelled "temporary" are incorporated into the bearer - * to avoid consuming potentially limited stack space through the use of - * large local variables within multicast routines. Concurrent access is - * prevented through use of the spinlock "bclink_lock". 
- */ -struct tipc_bcbearer { - struct tipc_bearer bearer; - struct tipc_media media; - struct tipc_bcbearer_pair bpairs[MAX_BEARERS]; - struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1]; - struct tipc_node_map remains; - struct tipc_node_map remains_new; -}; - -/** - * struct tipc_bclink - link used for broadcast messages - * @lock: spinlock governing access to structure - * @link: (non-standard) broadcast link structure - * @node: (non-standard) node structure representing b'cast link's peer node - * @bcast_nodes: map of broadcast-capable nodes - * @retransmit_to: node that most recently requested a retransmit - * - * Handles sequence numbering, fragmentation, bundling, etc. - */ -struct tipc_bclink { - spinlock_t lock; - struct tipc_link link; - struct tipc_node node; - struct sk_buff_head arrvq; - struct sk_buff_head inputq; - struct tipc_node_map bcast_nodes; - struct tipc_node *retransmit_to; -}; +#include "core.h" struct tipc_node; -extern const char tipc_bclink_name[]; +struct tipc_msg; +struct tipc_nl_msg; +struct tipc_node_map; -/** - * tipc_nmap_equal - test for equality of node maps - */ -static inline int tipc_nmap_equal(struct tipc_node_map *nm_a, - struct tipc_node_map *nm_b) -{ - return !memcmp(nm_a, nm_b, sizeof(*nm_a)); -} - -int tipc_bclink_init(struct net *net); -void tipc_bclink_stop(struct net *net); -void tipc_bclink_add_node(struct net *net, u32 addr); -void tipc_bclink_remove_node(struct net *net, u32 addr); -struct tipc_node *tipc_bclink_retransmit_to(struct net *tn); -void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked); -void tipc_bclink_rcv(struct net *net, struct sk_buff *buf); -u32 tipc_bclink_get_last_sent(struct net *net); -u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr); -void tipc_bclink_update_link_state(struct tipc_node *node, - u32 last_sent); -int tipc_bclink_reset_stats(struct net *net); -int tipc_bclink_set_queue_limits(struct net *net, u32 limit); -void tipc_bcbearer_sort(struct net *net, struct 
tipc_node_map *nm_ptr, - u32 node, bool action); -uint tipc_bclink_get_mtu(void); -int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list); -void tipc_bclink_wakeup_users(struct net *net); +int tipc_bcast_init(struct net *net); +void tipc_bcast_reinit(struct net *net); +void tipc_bcast_stop(struct net *net); +void tipc_bcast_add_peer(struct net *net, struct tipc_link *l, + struct sk_buff_head *xmitq); +void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl); +void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id); +void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id); +int tipc_bcast_get_mtu(struct net *net); +int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list); +int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb); +void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked); +void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, + struct tipc_msg *hdr); int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg); int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]); -void tipc_bclink_input(struct net *net); -void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg); +int tipc_bclink_reset_stats(struct net *net); + +static inline void tipc_bcast_lock(struct net *net) +{ + spin_lock_bh(&tipc_net(net)->bclock); +} + +static inline void tipc_bcast_unlock(struct net *net) +{ + spin_unlock_bh(&tipc_net(net)->bclock); +} + +static inline struct tipc_link *tipc_bc_sndlink(struct net *net) +{ + return tipc_net(net)->bcl; +} #endif diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 82b278668ab7..648f2a67f314 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -193,10 +193,8 @@ void tipc_bearer_add_dest(struct net *net, u32 bearer_id, u32 dest) rcu_read_lock(); b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); - if (b_ptr) { - tipc_bcbearer_sort(net, &b_ptr->nodes, dest, true); + if (b_ptr) 
tipc_disc_add_dest(b_ptr->link_req); - } rcu_read_unlock(); } @@ -207,10 +205,8 @@ void tipc_bearer_remove_dest(struct net *net, u32 bearer_id, u32 dest) rcu_read_lock(); b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); - if (b_ptr) { - tipc_bcbearer_sort(net, &b_ptr->nodes, dest, false); + if (b_ptr) tipc_disc_remove_dest(b_ptr->link_req); - } rcu_read_unlock(); } @@ -418,10 +414,9 @@ void tipc_disable_l2_media(struct tipc_bearer *b) * @b_ptr: the bearer through which the packet is to be sent * @dest: peer destination address */ -int tipc_l2_send_msg(struct net *net, struct sk_buff *buf, +int tipc_l2_send_msg(struct net *net, struct sk_buff *skb, struct tipc_bearer *b, struct tipc_media_addr *dest) { - struct sk_buff *clone; struct net_device *dev; int delta; @@ -429,42 +424,48 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *buf, if (!dev) return 0; - clone = skb_clone(buf, GFP_ATOMIC); - if (!clone) - return 0; - - delta = dev->hard_header_len - skb_headroom(buf); + delta = dev->hard_header_len - skb_headroom(skb); if ((delta > 0) && - pskb_expand_head(clone, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) { - kfree_skb(clone); + pskb_expand_head(skb, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) { + kfree_skb(skb); return 0; } - skb_reset_network_header(clone); - clone->dev = dev; - clone->protocol = htons(ETH_P_TIPC); - dev_hard_header(clone, dev, ETH_P_TIPC, dest->value, - dev->dev_addr, clone->len); - dev_queue_xmit(clone); + skb_reset_network_header(skb); + skb->dev = dev; + skb->protocol = htons(ETH_P_TIPC); + dev_hard_header(skb, dev, ETH_P_TIPC, dest->value, + dev->dev_addr, skb->len); + dev_queue_xmit(skb); return 0; } -/* tipc_bearer_send- sends buffer to destination over bearer - * - * IMPORTANT: - * The media send routine must not alter the buffer being passed in - * as it may be needed for later retransmission! 
- */ -void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf, - struct tipc_media_addr *dest) +int tipc_bearer_mtu(struct net *net, u32 bearer_id) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_bearer *b_ptr; + int mtu = 0; + struct tipc_bearer *b; rcu_read_lock(); - b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); - if (likely(b_ptr)) - b_ptr->media->send_msg(net, buf, b_ptr, dest); + b = rcu_dereference_rtnl(tipc_net(net)->bearer_list[bearer_id]); + if (b) + mtu = b->mtu; + rcu_read_unlock(); + return mtu; +} + +/* tipc_bearer_xmit_skb - sends buffer to destination over bearer + */ +void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id, + struct sk_buff *skb, + struct tipc_media_addr *dest) +{ + struct tipc_net *tn = tipc_net(net); + struct tipc_bearer *b; + + rcu_read_lock(); + b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); + if (likely(b)) + b->media->send_msg(net, skb, b, dest); rcu_read_unlock(); } @@ -487,8 +488,31 @@ void tipc_bearer_xmit(struct net *net, u32 bearer_id, skb_queue_walk_safe(xmitq, skb, tmp) { __skb_dequeue(xmitq); b->media->send_msg(net, skb, b, dst); - /* Until we remove cloning in tipc_l2_send_msg(): */ - kfree_skb(skb); + } + } + rcu_read_unlock(); +} + +/* tipc_bearer_bc_xmit() - broadcast buffers to all destinations + */ +void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id, + struct sk_buff_head *xmitq) +{ + struct tipc_net *tn = tipc_net(net); + int net_id = tn->net_id; + struct tipc_bearer *b; + struct sk_buff *skb, *tmp; + struct tipc_msg *hdr; + + rcu_read_lock(); + b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); + if (likely(b)) { + skb_queue_walk_safe(xmitq, skb, tmp) { + hdr = buf_msg(skb); + msg_set_non_seq(hdr, 1); + msg_set_mc_netid(hdr, net_id); + __skb_dequeue(xmitq); + b->media->send_msg(net, skb, b, &b->bcast_addr); } } rcu_read_unlock(); diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index 6426f242f626..552185bc4773 100644 --- a/net/tipc/bearer.h 
+++ b/net/tipc/bearer.h @@ -163,6 +163,7 @@ struct tipc_bearer { u32 identity; struct tipc_link_req *link_req; char net_plane; + int node_cnt; struct tipc_node_map nodes; }; @@ -215,10 +216,14 @@ struct tipc_media *tipc_media_find(const char *name); int tipc_bearer_setup(void); void tipc_bearer_cleanup(void); void tipc_bearer_stop(struct net *net); -void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf, - struct tipc_media_addr *dest); +int tipc_bearer_mtu(struct net *net, u32 bearer_id); +void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id, + struct sk_buff *skb, + struct tipc_media_addr *dest); void tipc_bearer_xmit(struct net *net, u32 bearer_id, struct sk_buff_head *xmitq, struct tipc_media_addr *dst); +void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id, + struct sk_buff_head *xmitq); #endif /* _TIPC_BEARER_H */ diff --git a/net/tipc/core.c b/net/tipc/core.c index 005ba5eb0ea4..03a842870c52 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -42,6 +42,7 @@ #include "bearer.h" #include "net.h" #include "socket.h" +#include "bcast.h" #include @@ -71,8 +72,15 @@ static int __net_init tipc_init_net(struct net *net) err = tipc_topsrv_start(net); if (err) goto out_subscr; + + err = tipc_bcast_init(net); + if (err) + goto out_bclink; + return 0; +out_bclink: + tipc_bcast_stop(net); out_subscr: tipc_nametbl_stop(net); out_nametbl: @@ -85,6 +93,7 @@ static void __net_exit tipc_exit_net(struct net *net) { tipc_topsrv_stop(net); tipc_net_stop(net); + tipc_bcast_stop(net); tipc_nametbl_stop(net); tipc_sk_rht_destroy(net); } diff --git a/net/tipc/core.h b/net/tipc/core.h index b96b41eabf12..18e95a8020cd 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -62,8 +62,7 @@ struct tipc_node; struct tipc_bearer; -struct tipc_bcbearer; -struct tipc_bclink; +struct tipc_bc_base; struct tipc_link; struct tipc_name_table; struct tipc_server; @@ -93,8 +92,8 @@ struct tipc_net { struct tipc_bearer __rcu *bearer_list[MAX_BEARERS + 1]; /* Broadcast link 
*/ - struct tipc_bcbearer *bcbearer; - struct tipc_bclink *bclink; + spinlock_t bclock; + struct tipc_bc_base *bcbase; struct tipc_link *bcl; /* Socket hash table */ @@ -114,6 +113,11 @@ static inline struct tipc_net *tipc_net(struct net *net) return net_generic(net, tipc_net_id); } +static inline int tipc_netid(struct net *net) +{ + return tipc_net(net)->net_id; +} + static inline u16 mod(u16 x) { return x & 0xffffu; diff --git a/net/tipc/discover.c b/net/tipc/discover.c index d14e0a4aa9af..afe8c47c4085 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -89,7 +89,7 @@ static void tipc_disc_init_msg(struct net *net, struct sk_buff *buf, u32 type, MAX_H_SIZE, dest_domain); msg_set_non_seq(msg, 1); msg_set_node_sig(msg, tn->random); - msg_set_node_capabilities(msg, 0); + msg_set_node_capabilities(msg, TIPC_NODE_CAPABILITIES); msg_set_dest_domain(msg, dest_domain); msg_set_bc_netid(msg, tn->net_id); b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr); @@ -167,11 +167,10 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *skb, /* Send response, if necessary */ if (respond && (mtyp == DSC_REQ_MSG)) { rskb = tipc_buf_acquire(MAX_H_SIZE); - if (rskb) { - tipc_disc_init_msg(net, rskb, DSC_RESP_MSG, bearer); - tipc_bearer_send(net, bearer->identity, rskb, &maddr); - kfree_skb(rskb); - } + if (!rskb) + return; + tipc_disc_init_msg(net, rskb, DSC_RESP_MSG, bearer); + tipc_bearer_xmit_skb(net, bearer->identity, rskb, &maddr); } } @@ -225,6 +224,7 @@ void tipc_disc_remove_dest(struct tipc_link_req *req) static void disc_timeout(unsigned long data) { struct tipc_link_req *req = (struct tipc_link_req *)data; + struct sk_buff *skb; int max_delay; spin_lock_bh(&req->lock); @@ -242,9 +242,9 @@ static void disc_timeout(unsigned long data) * hold at fast polling rate if don't have any associated nodes, * otherwise hold at slow polling rate */ - tipc_bearer_send(req->net, req->bearer_id, req->buf, &req->dest); - - + skb = skb_clone(req->buf, GFP_ATOMIC); + if (skb) 
+ tipc_bearer_xmit_skb(req->net, req->bearer_id, skb, &req->dest); req->timer_intv *= 2; if (req->num_nodes) max_delay = TIPC_LINK_REQ_SLOW; @@ -271,6 +271,7 @@ int tipc_disc_create(struct net *net, struct tipc_bearer *b_ptr, struct tipc_media_addr *dest) { struct tipc_link_req *req; + struct sk_buff *skb; req = kmalloc(sizeof(*req), GFP_ATOMIC); if (!req) @@ -292,7 +293,9 @@ int tipc_disc_create(struct net *net, struct tipc_bearer *b_ptr, setup_timer(&req->timer, disc_timeout, (unsigned long)req); mod_timer(&req->timer, jiffies + req->timer_intv); b_ptr->link_req = req; - tipc_bearer_send(net, req->bearer_id, req->buf, &req->dest); + skb = skb_clone(req->buf, GFP_ATOMIC); + if (skb) + tipc_bearer_xmit_skb(net, req->bearer_id, skb, &req->dest); return 0; } @@ -316,6 +319,7 @@ void tipc_disc_delete(struct tipc_link_req *req) void tipc_disc_reset(struct net *net, struct tipc_bearer *b_ptr) { struct tipc_link_req *req = b_ptr->link_req; + struct sk_buff *skb; spin_lock_bh(&req->lock); tipc_disc_init_msg(net, req->buf, DSC_REQ_MSG, b_ptr); @@ -325,6 +329,8 @@ void tipc_disc_reset(struct net *net, struct tipc_bearer *b_ptr) req->num_nodes = 0; req->timer_intv = TIPC_LINK_REQ_INIT; mod_timer(&req->timer, jiffies + req->timer_intv); - tipc_bearer_send(net, req->bearer_id, req->buf, &req->dest); + skb = skb_clone(req->buf, GFP_ATOMIC); + if (skb) + tipc_bearer_xmit_skb(net, req->bearer_id, skb, &req->dest); spin_unlock_bh(&req->lock); } diff --git a/net/tipc/link.c b/net/tipc/link.c index ff9b0b92e62e..4449fa01e232 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -50,6 +50,7 @@ */ static const char *link_co_err = "Link tunneling error, "; static const char *link_rst_msg = "Resetting link "; +static const char tipc_bclink_name[] = "broadcast-link"; static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = { [TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC }, @@ -75,6 +76,14 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = { 
[TIPC_NLA_PROP_WIN] = { .type = NLA_U32 } }; +/* Send states for broadcast NACKs + */ +enum { + BC_NACK_SND_CONDITIONAL, + BC_NACK_SND_UNCONDITIONAL, + BC_NACK_SND_SUPPRESS, +}; + /* * Interval between NACKs when packets arrive out of order */ @@ -110,7 +119,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, struct sk_buff_head *xmitq); static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); -static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); +static void tipc_link_build_nack_msg(struct tipc_link *l, + struct sk_buff_head *xmitq); +static void tipc_link_build_bc_init_msg(struct tipc_link *l, + struct sk_buff_head *xmitq); +static bool tipc_link_release_pkts(struct tipc_link *l, u16 to); /* * Simple non-static link routines (i.e. referenced outside this file) @@ -150,11 +163,66 @@ bool tipc_link_is_blocked(struct tipc_link *l) return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER); } +bool link_is_bc_sndlink(struct tipc_link *l) +{ + return !l->bc_sndlink; +} + +bool link_is_bc_rcvlink(struct tipc_link *l) +{ + return ((l->bc_rcvlink == l) && !link_is_bc_sndlink(l)); +} + int tipc_link_is_active(struct tipc_link *l) { - struct tipc_node *n = l->owner; + return l->active; +} - return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); +void tipc_link_set_active(struct tipc_link *l, bool active) +{ + l->active = active; +} + +void tipc_link_add_bc_peer(struct tipc_link *snd_l, + struct tipc_link *uc_l, + struct sk_buff_head *xmitq) +{ + struct tipc_link *rcv_l = uc_l->bc_rcvlink; + + snd_l->ackers++; + rcv_l->acked = snd_l->snd_nxt - 1; + tipc_link_build_bc_init_msg(uc_l, xmitq); +} + +void tipc_link_remove_bc_peer(struct tipc_link *snd_l, + struct tipc_link *rcv_l, + struct sk_buff_head *xmitq) +{ + u16 ack = snd_l->snd_nxt - 1; + + snd_l->ackers--; + tipc_link_bc_ack_rcv(rcv_l, ack, xmitq); + 
tipc_link_reset(rcv_l); + rcv_l->state = LINK_RESET; + if (!snd_l->ackers) { + tipc_link_reset(snd_l); + __skb_queue_purge(xmitq); + } +} + +int tipc_link_bc_peers(struct tipc_link *l) +{ + return l->ackers; +} + +void tipc_link_set_mtu(struct tipc_link *l, int mtu) +{ + l->mtu = mtu; +} + +int tipc_link_mtu(struct tipc_link *l) +{ + return l->mtu; } static u32 link_own_addr(struct tipc_link *l) @@ -165,57 +233,72 @@ static u32 link_own_addr(struct tipc_link *l) /** * tipc_link_create - create a new link * @n: pointer to associated node - * @b: pointer to associated bearer + * @if_name: associated interface name + * @bearer_id: id (index) of associated bearer + * @tolerance: link tolerance to be used by link + * @net_plane: network plane (A,B,c..) this link belongs to + * @mtu: mtu to be advertised by link + * @priority: priority to be used by link + * @window: send window to be used by link + * @session: session to be used by link * @ownnode: identity of own node - * @peer: identity of peer node - * @maddr: media address to be used + * @peer: node id of peer node + * @peer_caps: bitmap describing peer node capabilities + * @bc_sndlink: the namespace global link used for broadcast sending + * @bc_rcvlink: the peer specific link used for broadcast reception * @inputq: queue to put messages ready for delivery * @namedq: queue to put binding table update messages ready for delivery * @link: return value, pointer to put the created link * * Returns true if link was created, otherwise false */ -bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, - u32 ownnode, u32 peer, struct tipc_media_addr *maddr, - struct sk_buff_head *inputq, struct sk_buff_head *namedq, +bool tipc_link_create(struct net *net, char *if_name, int bearer_id, + int tolerance, char net_plane, u32 mtu, int priority, + int window, u32 session, u32 ownnode, u32 peer, + u16 peer_caps, + struct tipc_link *bc_sndlink, + struct tipc_link *bc_rcvlink, + struct sk_buff_head *inputq, + 
struct sk_buff_head *namedq, struct tipc_link **link) { struct tipc_link *l; struct tipc_msg *hdr; - char *if_name; l = kzalloc(sizeof(*l), GFP_ATOMIC); if (!l) return false; *link = l; - - /* Note: peer i/f name is completed by reset/activate message */ - if_name = strchr(b->name, ':') + 1; - sprintf(l->name, "%u.%u.%u:%s-%u.%u.%u:unknown", - tipc_zone(ownnode), tipc_cluster(ownnode), tipc_node(ownnode), - if_name, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); - - l->addr = peer; - l->media_addr = maddr; - l->owner = n; - l->peer_session = WILDCARD_SESSION; - l->bearer_id = b->identity; - l->tolerance = b->tolerance; - l->net_plane = b->net_plane; - l->advertised_mtu = b->mtu; - l->mtu = b->mtu; - l->priority = b->priority; - tipc_link_set_queue_limits(l, b->window); - l->inputq = inputq; - l->namedq = namedq; - l->state = LINK_RESETTING; l->pmsg = (struct tipc_msg *)&l->proto_msg; hdr = l->pmsg; tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer); msg_set_size(hdr, sizeof(l->proto_msg)); msg_set_session(hdr, session); msg_set_bearer_id(hdr, l->bearer_id); + + /* Note: peer i/f name is completed by reset/activate message */ + sprintf(l->name, "%u.%u.%u:%s-%u.%u.%u:unknown", + tipc_zone(ownnode), tipc_cluster(ownnode), tipc_node(ownnode), + if_name, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); strcpy((char *)msg_data(hdr), if_name); + + l->addr = peer; + l->peer_caps = peer_caps; + l->net = net; + l->peer_session = WILDCARD_SESSION; + l->bearer_id = bearer_id; + l->tolerance = tolerance; + l->net_plane = net_plane; + l->advertised_mtu = mtu; + l->mtu = mtu; + l->priority = priority; + tipc_link_set_queue_limits(l, window); + l->ackers = 1; + l->bc_sndlink = bc_sndlink; + l->bc_rcvlink = bc_rcvlink; + l->inputq = inputq; + l->namedq = namedq; + l->state = LINK_RESETTING; __skb_queue_head_init(&l->transmq); __skb_queue_head_init(&l->backlogq); __skb_queue_head_init(&l->deferdq); @@ -224,27 +307,43 @@ bool tipc_link_create(struct 
tipc_node *n, struct tipc_bearer *b, u32 session, return true; } -/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. +/** + * tipc_link_bc_create - create new link to be used for broadcast + * @n: pointer to associated node + * @mtu: mtu to be used + * @window: send window to be used + * @inputq: queue to put messages ready for delivery + * @namedq: queue to put binding table update messages ready for delivery + * @link: return value, pointer to put the created link * - * Give a newly added peer node the sequence number where it should - * start receiving and acking broadcast packets. + * Returns true if link was created, otherwise false */ -void tipc_link_build_bcast_sync_msg(struct tipc_link *l, - struct sk_buff_head *xmitq) +bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, + int mtu, int window, u16 peer_caps, + struct sk_buff_head *inputq, + struct sk_buff_head *namedq, + struct tipc_link *bc_sndlink, + struct tipc_link **link) { - struct sk_buff *skb; - struct sk_buff_head list; - u16 last_sent; + struct tipc_link *l; - skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, - 0, l->addr, link_own_addr(l), 0, 0, 0); - if (!skb) - return; - last_sent = tipc_bclink_get_last_sent(l->owner->net); - msg_set_last_bcast(buf_msg(skb), last_sent); - __skb_queue_head_init(&list); - __skb_queue_tail(&list, skb); - tipc_link_xmit(l, &list, xmitq); + if (!tipc_link_create(net, "", MAX_BEARERS, 0, 'Z', mtu, 0, window, + 0, ownnode, peer, peer_caps, bc_sndlink, + NULL, inputq, namedq, link)) + return false; + + l = *link; + strcpy(l->name, tipc_bclink_name); + tipc_link_reset(l); + l->state = LINK_RESET; + l->ackers = 0; + l->bc_rcvlink = l; + + /* Broadcast send link is always up */ + if (link_is_bc_sndlink(l)) + l->state = LINK_ESTABLISHED; + + return true; } /** @@ -449,6 +548,8 @@ static void link_profile_stats(struct tipc_link *l) l->stats.msg_length_profile[6]++; } +/* tipc_link_timeout - perform periodic task as instructed 
from node timeout + */ /* tipc_link_timeout - perform periodic task as instructed from node timeout */ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) @@ -457,6 +558,9 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) int mtyp = STATE_MSG; bool xmit = false; bool prb = false; + u16 bc_snt = l->bc_sndlink->snd_nxt - 1; + u16 bc_acked = l->bc_rcvlink->acked; + bool bc_up = link_is_up(l->bc_rcvlink); link_profile_stats(l); @@ -464,7 +568,7 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) case LINK_ESTABLISHED: case LINK_SYNCHING: if (!l->silent_intv_cnt) { - if (tipc_bclink_acks_missing(l->owner)) + if (bc_up && (bc_acked != bc_snt)) xmit = true; } else if (l->silent_intv_cnt <= l->abort_limit) { xmit = true; @@ -555,38 +659,6 @@ void link_prepare_wakeup(struct tipc_link *l) } } -/** - * tipc_link_reset_fragments - purge link's inbound message fragments queue - * @l_ptr: pointer to link - */ -void tipc_link_reset_fragments(struct tipc_link *l_ptr) -{ - kfree_skb(l_ptr->reasm_buf); - l_ptr->reasm_buf = NULL; -} - -void tipc_link_purge_backlog(struct tipc_link *l) -{ - __skb_queue_purge(&l->backlogq); - l->backlog[TIPC_LOW_IMPORTANCE].len = 0; - l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; - l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; - l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; - l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; -} - -/** - * tipc_link_purge_queues - purge all pkt queues associated with link - * @l_ptr: pointer to link - */ -void tipc_link_purge_queues(struct tipc_link *l_ptr) -{ - __skb_queue_purge(&l_ptr->deferdq); - __skb_queue_purge(&l_ptr->transmq); - tipc_link_purge_backlog(l_ptr); - tipc_link_reset_fragments(l_ptr); -} - void tipc_link_reset(struct tipc_link *l) { /* Link is down, accept any session */ @@ -598,12 +670,16 @@ void tipc_link_reset(struct tipc_link *l) /* Prepare for renewed mtu size negotiation */ l->mtu = l->advertised_mtu; - /* Clean up all queues: */ + /* Clean up 
all queues and counters: */ __skb_queue_purge(&l->transmq); __skb_queue_purge(&l->deferdq); skb_queue_splice_init(&l->wakeupq, l->inputq); - - tipc_link_purge_backlog(l); + __skb_queue_purge(&l->backlogq); + l->backlog[TIPC_LOW_IMPORTANCE].len = 0; + l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; + l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; + l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; + l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; kfree_skb(l->reasm_buf); kfree_skb(l->failover_reasm_skb); l->reasm_buf = NULL; @@ -611,80 +687,14 @@ void tipc_link_reset(struct tipc_link *l) l->rcv_unacked = 0; l->snd_nxt = 1; l->rcv_nxt = 1; + l->acked = 0; l->silent_intv_cnt = 0; l->stats.recv_info = 0; l->stale_count = 0; + l->bc_peer_is_up = false; link_reset_statistics(l); } -/** - * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked - * @link: link to use - * @list: chain of buffers containing message - * - * Consumes the buffer chain, except when returning an error code, - * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS - * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted - */ -int __tipc_link_xmit(struct net *net, struct tipc_link *link, - struct sk_buff_head *list) -{ - struct tipc_msg *msg = buf_msg(skb_peek(list)); - unsigned int maxwin = link->window; - unsigned int i, imp = msg_importance(msg); - uint mtu = link->mtu; - u16 ack = mod(link->rcv_nxt - 1); - u16 seqno = link->snd_nxt; - u16 bc_last_in = link->owner->bclink.last_in; - struct tipc_media_addr *addr = link->media_addr; - struct sk_buff_head *transmq = &link->transmq; - struct sk_buff_head *backlogq = &link->backlogq; - struct sk_buff *skb, *bskb; - - /* Match msg importance against this and all higher backlog limits: */ - for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) { - if (unlikely(link->backlog[i].len >= link->backlog[i].limit)) - return link_schedule_user(link, list); - } - if (unlikely(msg_size(msg) > mtu)) - return -EMSGSIZE; - - /* Prepare each packet for 
sending, and add to relevant queue: */ - while (skb_queue_len(list)) { - skb = skb_peek(list); - msg = buf_msg(skb); - msg_set_seqno(msg, seqno); - msg_set_ack(msg, ack); - msg_set_bcast_ack(msg, bc_last_in); - - if (likely(skb_queue_len(transmq) < maxwin)) { - __skb_dequeue(list); - __skb_queue_tail(transmq, skb); - tipc_bearer_send(net, link->bearer_id, skb, addr); - link->rcv_unacked = 0; - seqno++; - continue; - } - if (tipc_msg_bundle(skb_peek_tail(backlogq), msg, mtu)) { - kfree_skb(__skb_dequeue(list)); - link->stats.sent_bundled++; - continue; - } - if (tipc_msg_make_bundle(&bskb, msg, mtu, link->addr)) { - kfree_skb(__skb_dequeue(list)); - __skb_queue_tail(backlogq, bskb); - link->backlog[msg_importance(buf_msg(bskb))].len++; - link->stats.sent_bundled++; - link->stats.sent_bundles++; - continue; - } - link->backlog[imp].len += skb_queue_len(list); - skb_queue_splice_tail_init(list, backlogq); - } - link->snd_nxt = seqno; - return 0; -} - /** * tipc_link_xmit(): enqueue buffer list according to queue situation * @link: link to use @@ -705,7 +715,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, unsigned int mtu = l->mtu; u16 ack = l->rcv_nxt - 1; u16 seqno = l->snd_nxt; - u16 bc_last_in = l->owner->bclink.last_in; + u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; struct sk_buff_head *transmq = &l->transmq; struct sk_buff_head *backlogq = &l->backlogq; struct sk_buff *skb, *_skb, *bskb; @@ -724,7 +734,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, hdr = buf_msg(skb); msg_set_seqno(hdr, seqno); msg_set_ack(hdr, ack); - msg_set_bcast_ack(hdr, bc_last_in); + msg_set_bcast_ack(hdr, bc_ack); if (likely(skb_queue_len(transmq) < maxwin)) { _skb = skb_clone(skb, GFP_ATOMIC); @@ -733,6 +743,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, __skb_dequeue(list); __skb_queue_tail(transmq, skb); __skb_queue_tail(xmitq, _skb); + TIPC_SKB_CB(skb)->ackers = l->ackers; l->rcv_unacked = 0; seqno++; continue; @@ 
-757,62 +768,13 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, return 0; } -/* - * tipc_link_sync_rcv - synchronize broadcast link endpoints. - * Receive the sequence number where we should start receiving and - * acking broadcast packets from a newly added peer node, and open - * up for reception of such packets. - * - * Called with node locked - */ -static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf) -{ - struct tipc_msg *msg = buf_msg(buf); - - n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg); - n->bclink.recv_permitted = true; - kfree_skb(buf); -} - -/* - * tipc_link_push_packets - push unsent packets to bearer - * - * Push out the unsent messages of a link where congestion - * has abated. Node is locked. - * - * Called with node locked - */ -void tipc_link_push_packets(struct tipc_link *link) -{ - struct sk_buff *skb; - struct tipc_msg *msg; - u16 seqno = link->snd_nxt; - u16 ack = mod(link->rcv_nxt - 1); - - while (skb_queue_len(&link->transmq) < link->window) { - skb = __skb_dequeue(&link->backlogq); - if (!skb) - break; - msg = buf_msg(skb); - link->backlog[msg_importance(msg)].len--; - msg_set_ack(msg, ack); - msg_set_seqno(msg, seqno); - seqno = mod(seqno + 1); - msg_set_bcast_ack(msg, link->owner->bclink.last_in); - link->rcv_unacked = 0; - __skb_queue_tail(&link->transmq, skb); - tipc_bearer_send(link->owner->net, link->bearer_id, - skb, link->media_addr); - } - link->snd_nxt = seqno; -} - void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) { struct sk_buff *skb, *_skb; struct tipc_msg *hdr; u16 seqno = l->snd_nxt; u16 ack = l->rcv_nxt - 1; + u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; while (skb_queue_len(&l->transmq) < l->window) { skb = skb_peek(&l->backlogq); @@ -826,96 +788,35 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) l->backlog[msg_importance(hdr)].len--; __skb_queue_tail(&l->transmq, skb); __skb_queue_tail(xmitq, _skb); - 
msg_set_ack(hdr, ack); + TIPC_SKB_CB(skb)->ackers = l->ackers; msg_set_seqno(hdr, seqno); - msg_set_bcast_ack(hdr, l->owner->bclink.last_in); + msg_set_ack(hdr, ack); + msg_set_bcast_ack(hdr, bc_ack); l->rcv_unacked = 0; seqno++; } l->snd_nxt = seqno; } -static void link_retransmit_failure(struct tipc_link *l_ptr, - struct sk_buff *buf) +static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb) { - struct tipc_msg *msg = buf_msg(buf); - struct net *net = l_ptr->owner->net; + struct tipc_msg *hdr = buf_msg(skb); - pr_warn("Retransmission failure on link <%s>\n", l_ptr->name); - - if (l_ptr->addr) { - /* Handle failure on standard link */ - link_print(l_ptr, "Resetting link "); - pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n", - msg_user(msg), msg_type(msg), msg_size(msg), - msg_errcode(msg)); - pr_info("sqno %u, prev: %x, src: %x\n", - msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg)); - } else { - /* Handle failure on broadcast link */ - struct tipc_node *n_ptr; - char addr_string[16]; - - pr_info("Msg seq number: %u, ", msg_seqno(msg)); - pr_cont("Outstanding acks: %lu\n", - (unsigned long) TIPC_SKB_CB(buf)->handle); - - n_ptr = tipc_bclink_retransmit_to(net); - - tipc_addr_string_fill(addr_string, n_ptr->addr); - pr_info("Broadcast link info for %s\n", addr_string); - pr_info("Reception permitted: %d, Acked: %u\n", - n_ptr->bclink.recv_permitted, - n_ptr->bclink.acked); - pr_info("Last in: %u, Oos state: %u, Last sent: %u\n", - n_ptr->bclink.last_in, - n_ptr->bclink.oos_state, - n_ptr->bclink.last_sent); - - n_ptr->action_flags |= TIPC_BCAST_RESET; - l_ptr->stale_count = 0; - } + pr_warn("Retransmission failure on link <%s>\n", l->name); + link_print(l, "Resetting link "); + pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n", + msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr)); + pr_info("sqno %u, prev: %x, src: %x\n", + msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr)); } -void tipc_link_retransmit(struct 
tipc_link *l_ptr, struct sk_buff *skb, - u32 retransmits) -{ - struct tipc_msg *msg; - - if (!skb) - return; - - msg = buf_msg(skb); - - /* Detect repeated retransmit failures */ - if (l_ptr->last_retransm == msg_seqno(msg)) { - if (++l_ptr->stale_count > 100) { - link_retransmit_failure(l_ptr, skb); - return; - } - } else { - l_ptr->last_retransm = msg_seqno(msg); - l_ptr->stale_count = 1; - } - - skb_queue_walk_from(&l_ptr->transmq, skb) { - if (!retransmits) - break; - msg = buf_msg(skb); - msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1)); - msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); - tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb, - l_ptr->media_addr); - retransmits--; - l_ptr->stats.retransmitted++; - } -} - -static int tipc_link_retransm(struct tipc_link *l, int retransm, - struct sk_buff_head *xmitq) +int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to, + struct sk_buff_head *xmitq) { struct sk_buff *_skb, *skb = skb_peek(&l->transmq); struct tipc_msg *hdr; + u16 ack = l->rcv_nxt - 1; + u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; if (!skb) return 0; @@ -928,19 +829,25 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm, link_retransmit_failure(l, skb); return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); } + + /* Move forward to where retransmission should start */ skb_queue_walk(&l->transmq, skb) { - if (!retransm) - return 0; + if (!less(buf_seqno(skb), from)) + break; + } + + skb_queue_walk_from(&l->transmq, skb) { + if (more(buf_seqno(skb), to)) + break; hdr = buf_msg(skb); _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC); if (!_skb) return 0; hdr = buf_msg(_skb); - msg_set_ack(hdr, l->rcv_nxt - 1); - msg_set_bcast_ack(hdr, l->owner->bclink.last_in); + msg_set_ack(hdr, ack); + msg_set_bcast_ack(hdr, bc_ack); _skb->priority = TC_PRIO_CONTROL; __skb_queue_tail(xmitq, _skb); - retransm--; l->stats.retransmitted++; } return 0; @@ -951,11 +858,9 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm, * Consumes buffer 
if message is of right type * Node lock must be held */ -static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, +static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { - struct tipc_node *node = link->owner; - switch (msg_user(buf_msg(skb))) { case TIPC_LOW_IMPORTANCE: case TIPC_MEDIUM_IMPORTANCE: @@ -965,8 +870,8 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, skb_queue_tail(inputq, skb); return true; case NAME_DISTRIBUTOR: - node->bclink.recv_permitted = true; - skb_queue_tail(link->namedq, skb); + l->bc_rcvlink->state = LINK_ESTABLISHED; + skb_queue_tail(l->namedq, skb); return true; case MSG_BUNDLER: case TUNNEL_PROTOCOL: @@ -987,7 +892,6 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { - struct tipc_node *node = l->owner; struct tipc_msg *hdr = buf_msg(skb); struct sk_buff **reasm_skb = &l->reasm_buf; struct sk_buff *iskb; @@ -1028,13 +932,15 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, if (tipc_buf_append(reasm_skb, &skb)) { l->stats.recv_fragmented++; tipc_data_input(l, skb, inputq); - } else if (!*reasm_skb) { + } else if (!*reasm_skb && !link_is_bc_rcvlink(l)) { + pr_warn_ratelimited("Unable to build fragment list\n"); return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); } return 0; } else if (usr == BCAST_PROTOCOL) { - tipc_link_sync_rcv(node, skb); - return 0; + tipc_bcast_lock(l->net); + tipc_link_bc_init_rcv(l->bc_rcvlink, hdr); + tipc_bcast_unlock(l->net); } drop: kfree_skb(skb); @@ -1057,12 +963,28 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked) } /* tipc_link_build_ack_msg: prepare link acknowledge message for transmission + * + * Note that sending of broadcast ack is coordinated among nodes, to reduce + * risk of ack storms towards the sender */ -void tipc_link_build_ack_msg(struct tipc_link 
*l, struct sk_buff_head *xmitq) +int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) { + if (!l) + return 0; + + /* Broadcast ACK must be sent via a unicast link => defer to caller */ + if (link_is_bc_rcvlink(l)) { + if (((l->rcv_nxt ^ link_own_addr(l)) & 0xf) != 0xf) + return 0; + l->rcv_unacked = 0; + return TIPC_LINK_SND_BC_ACK; + } + + /* Unicast ACK */ l->rcv_unacked = 0; l->stats.sent_acks++; tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); + return 0; } /* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message @@ -1084,6 +1006,9 @@ static void tipc_link_build_nack_msg(struct tipc_link *l, { u32 def_cnt = ++l->stats.deferred_recv; + if (link_is_bc_rcvlink(l)) + return; + if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); } @@ -1144,12 +1069,11 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, l->rcv_nxt++; l->stats.recv_info++; if (!tipc_data_input(l, skb, l->inputq)) - rc = tipc_link_input(l, skb, l->inputq); - if (unlikely(rc)) - break; + rc |= tipc_link_input(l, skb, l->inputq); if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) - tipc_link_build_ack_msg(l, xmitq); - + rc |= tipc_link_build_ack_msg(l, xmitq); + if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK)) + break; } while ((skb = __skb_dequeue(defq))); return rc; @@ -1158,45 +1082,6 @@ drop: return rc; } -/** - * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue - * - * Returns increase in queue length (i.e. 0 or 1) - */ -u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb) -{ - struct sk_buff *skb1; - u16 seq_no = buf_seqno(skb); - - /* Empty queue ? */ - if (skb_queue_empty(list)) { - __skb_queue_tail(list, skb); - return 1; - } - - /* Last ? 
*/ - if (less(buf_seqno(skb_peek_tail(list)), seq_no)) { - __skb_queue_tail(list, skb); - return 1; - } - - /* Locate insertion point in queue, then insert; discard if duplicate */ - skb_queue_walk(list, skb1) { - u16 curr_seqno = buf_seqno(skb1); - - if (seq_no == curr_seqno) { - kfree_skb(skb); - return 0; - } - - if (less(seq_no, curr_seqno)) - break; - } - - __skb_queue_before(list, skb1, skb); - return 1; -} - /* * Send protocol message to the other endpoint. */ @@ -1212,23 +1097,17 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg, skb = __skb_dequeue(&xmitq); if (!skb) return; - tipc_bearer_send(l->owner->net, l->bearer_id, skb, l->media_addr); + tipc_bearer_xmit_skb(l->net, l->bearer_id, skb, l->media_addr); l->rcv_unacked = 0; - kfree_skb(skb); } -/* tipc_link_build_proto_msg: prepare link protocol message for transmission - */ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, u16 rcvgap, int tolerance, int priority, struct sk_buff_head *xmitq) { struct sk_buff *skb = NULL; struct tipc_msg *hdr = l->pmsg; - u16 snd_nxt = l->snd_nxt; - u16 rcv_nxt = l->rcv_nxt; - u16 rcv_last = rcv_nxt - 1; - int node_up = l->owner->bclink.recv_permitted; + bool node_up = link_is_up(l->bc_rcvlink); /* Don't send protocol message during reset or link failover */ if (tipc_link_is_blocked(l)) @@ -1236,33 +1115,34 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, msg_set_type(hdr, mtyp); msg_set_net_plane(hdr, l->net_plane); - msg_set_bcast_ack(hdr, l->owner->bclink.last_in); - msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net)); + msg_set_next_sent(hdr, l->snd_nxt); + msg_set_ack(hdr, l->rcv_nxt - 1); + msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1); + msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1); msg_set_link_tolerance(hdr, tolerance); msg_set_linkprio(hdr, priority); msg_set_redundant_link(hdr, node_up); msg_set_seq_gap(hdr, 0); /* Compatibility: created msg 
must not be in sequence with pkt flow */ - msg_set_seqno(hdr, snd_nxt + U16_MAX / 2); + msg_set_seqno(hdr, l->snd_nxt + U16_MAX / 2); if (mtyp == STATE_MSG) { if (!tipc_link_is_up(l)) return; - msg_set_next_sent(hdr, snd_nxt); /* Override rcvgap if there are packets in deferred queue */ if (!skb_queue_empty(&l->deferdq)) - rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt; + rcvgap = buf_seqno(skb_peek(&l->deferdq)) - l->rcv_nxt; if (rcvgap) { msg_set_seq_gap(hdr, rcvgap); l->stats.sent_nacks++; } - msg_set_ack(hdr, rcv_last); msg_set_probe(hdr, probe); if (probe) l->stats.sent_probes++; l->stats.sent_states++; + l->rcv_unacked = 0; } else { /* RESET_MSG or ACTIVATE_MSG */ msg_set_max_pkt(hdr, l->advertised_mtu); @@ -1354,7 +1234,8 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, { struct tipc_msg *hdr = buf_msg(skb); u16 rcvgap = 0; - u16 nacked_gap = msg_seq_gap(hdr); + u16 ack = msg_ack(hdr); + u16 gap = msg_seq_gap(hdr); u16 peers_snd_nxt = msg_next_sent(hdr); u16 peers_tol = msg_link_tolerance(hdr); u16 peers_prio = msg_linkprio(hdr); @@ -1363,7 +1244,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, char *if_name; int rc = 0; - if (tipc_link_is_blocked(l)) + if (tipc_link_is_blocked(l) || !xmitq) goto exit; if (link_own_addr(l) > msg_prevnode(hdr)) @@ -1433,11 +1314,11 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, if (rcvgap || (msg_probe(hdr))) tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap, 0, 0, xmitq); - tipc_link_release_pkts(l, msg_ack(hdr)); + tipc_link_release_pkts(l, ack); /* If NACK, retransmit will now start at right position */ - if (nacked_gap) { - rc = tipc_link_retransm(l, nacked_gap, xmitq); + if (gap) { + rc = tipc_link_retrans(l, ack + 1, ack + gap, xmitq); l->stats.recv_nacks++; } @@ -1450,6 +1331,188 @@ exit: return rc; } +/* tipc_link_build_bc_proto_msg() - create broadcast protocol message + */ +static bool tipc_link_build_bc_proto_msg(struct 
tipc_link *l, bool bcast, + u16 peers_snd_nxt, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb; + struct tipc_msg *hdr; + struct sk_buff *dfrd_skb = skb_peek(&l->deferdq); + u16 ack = l->rcv_nxt - 1; + u16 gap_to = peers_snd_nxt - 1; + + skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, + 0, l->addr, link_own_addr(l), 0, 0, 0); + if (!skb) + return false; + hdr = buf_msg(skb); + msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1); + msg_set_bcast_ack(hdr, ack); + msg_set_bcgap_after(hdr, ack); + if (dfrd_skb) + gap_to = buf_seqno(dfrd_skb) - 1; + msg_set_bcgap_to(hdr, gap_to); + msg_set_non_seq(hdr, bcast); + __skb_queue_tail(xmitq, skb); + return true; +} + +/* tipc_link_build_bc_init_msg() - synchronize broadcast link endpoints. + * + * Give a newly added peer node the sequence number where it should + * start receiving and acking broadcast packets. + */ +void tipc_link_build_bc_init_msg(struct tipc_link *l, + struct sk_buff_head *xmitq) +{ + struct sk_buff_head list; + + __skb_queue_head_init(&list); + if (!tipc_link_build_bc_proto_msg(l->bc_rcvlink, false, 0, &list)) + return; + tipc_link_xmit(l, &list, xmitq); +} + +/* tipc_link_bc_init_rcv - receive initial broadcast synch data from peer + */ +void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr) +{ + int mtyp = msg_type(hdr); + u16 peers_snd_nxt = msg_bc_snd_nxt(hdr); + + if (link_is_up(l)) + return; + + if (msg_user(hdr) == BCAST_PROTOCOL) { + l->rcv_nxt = peers_snd_nxt; + l->state = LINK_ESTABLISHED; + return; + } + + if (l->peer_caps & TIPC_BCAST_SYNCH) + return; + + if (msg_peer_node_is_up(hdr)) + return; + + /* Compatibility: accept older, less safe initial synch data */ + if ((mtyp == RESET_MSG) || (mtyp == ACTIVATE_MSG)) + l->rcv_nxt = peers_snd_nxt; +} + +/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state + */ +void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr, + struct sk_buff_head *xmitq) +{ + u16 peers_snd_nxt = 
msg_bc_snd_nxt(hdr); + + if (!link_is_up(l)) + return; + + if (!msg_peer_node_is_up(hdr)) + return; + + l->bc_peer_is_up = true; + + /* Ignore if peers_snd_nxt goes beyond receive window */ + if (more(peers_snd_nxt, l->rcv_nxt + l->window)) + return; + + if (!more(peers_snd_nxt, l->rcv_nxt)) { + l->nack_state = BC_NACK_SND_CONDITIONAL; + return; + } + + /* Don't NACK if one was recently sent or peeked */ + if (l->nack_state == BC_NACK_SND_SUPPRESS) { + l->nack_state = BC_NACK_SND_UNCONDITIONAL; + return; + } + + /* Conditionally delay NACK sending until next synch rcv */ + if (l->nack_state == BC_NACK_SND_CONDITIONAL) { + l->nack_state = BC_NACK_SND_UNCONDITIONAL; + if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN) + return; + } + + /* Send NACK now but suppress next one */ + tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq); + l->nack_state = BC_NACK_SND_SUPPRESS; +} + +void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb, *tmp; + struct tipc_link *snd_l = l->bc_sndlink; + + if (!link_is_up(l) || !l->bc_peer_is_up) + return; + + if (!more(acked, l->acked)) + return; + + /* Skip over packets peer has already acked */ + skb_queue_walk(&snd_l->transmq, skb) { + if (more(buf_seqno(skb), l->acked)) + break; + } + + /* Update/release the packets peer is acking now */ + skb_queue_walk_from_safe(&snd_l->transmq, skb, tmp) { + if (more(buf_seqno(skb), acked)) + break; + if (!--TIPC_SKB_CB(skb)->ackers) { + __skb_unlink(skb, &snd_l->transmq); + kfree_skb(skb); + } + } + l->acked = acked; + tipc_link_advance_backlog(snd_l, xmitq); + if (unlikely(!skb_queue_empty(&snd_l->wakeupq))) + link_prepare_wakeup(snd_l); +} + +/* tipc_link_bc_nack_rcv(): receive broadcast nack message + */ +int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, + struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr = buf_msg(skb); + u32 dnode = msg_destnode(hdr); + int mtyp = msg_type(hdr); + u16 acked = 
msg_bcast_ack(hdr); + u16 from = acked + 1; + u16 to = msg_bcgap_to(hdr); + u16 peers_snd_nxt = to + 1; + int rc = 0; + + kfree_skb(skb); + + if (!tipc_link_is_up(l) || !l->bc_peer_is_up) + return 0; + + if (mtyp != STATE_MSG) + return 0; + + if (dnode == link_own_addr(l)) { + tipc_link_bc_ack_rcv(l, acked, xmitq); + rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq); + l->stats.recv_nacks++; + return rc; + } + + /* Msg for other node => suppress own NACK at next sync if applicable */ + if (more(peers_snd_nxt, l->rcv_nxt) && !less(l->rcv_nxt, from)) + l->nack_state = BC_NACK_SND_SUPPRESS; + + return 0; +} + void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) { int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); @@ -1514,7 +1577,7 @@ static void link_reset_statistics(struct tipc_link *l_ptr) static void link_print(struct tipc_link *l, const char *str) { struct sk_buff *hskb = skb_peek(&l->transmq); - u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt; + u16 head = hskb ? 
msg_seqno(buf_msg(hskb)) : l->snd_nxt - 1; u16 tail = l->snd_nxt - 1; pr_info("%s Link <%s> state %x\n", str, l->name, l->state); @@ -1738,7 +1801,7 @@ static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg, if (tipc_link_is_up(link)) if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP)) goto attr_msg_full; - if (tipc_link_is_active(link)) + if (link->active) if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE)) goto attr_msg_full; diff --git a/net/tipc/link.h b/net/tipc/link.h index 0201212cb49a..66d859b66c84 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -66,7 +66,8 @@ enum { */ enum { TIPC_LINK_UP_EVT = 1, - TIPC_LINK_DOWN_EVT = (1 << 1) + TIPC_LINK_DOWN_EVT = (1 << 1), + TIPC_LINK_SND_BC_ACK = (1 << 2) }; /* Starting value for maximum packet size negotiation on unicast links @@ -110,7 +111,7 @@ struct tipc_stats { * @name: link name character string * @media_addr: media address to use when sending messages over link * @timer: link timer - * @owner: pointer to peer node + * @net: pointer to namespace struct * @refcnt: reference counter for permanent references (owner node & timer) * @peer_session: link session # being used by peer end of link * @peer_bearer_id: bearer id used by link's peer endpoint @@ -119,6 +120,7 @@ struct tipc_stats { * @keepalive_intv: link keepalive timer interval * @abort_limit: # of unacknowledged continuity probes needed to reset link * @state: current state of link FSM + * @peer_caps: bitmap describing capabilities of peer node * @silent_intv_cnt: # of timer intervals without any reception from peer * @proto_msg: template for control messages generated by link * @pmsg: convenience pointer to "proto_msg" field @@ -134,6 +136,8 @@ struct tipc_stats { * @snt_nxt: next sequence number to use for outbound messages * @last_retransmitted: sequence number of most recently retransmitted message * @stale_count: # of identical retransmit requests made by peer + * @ackers: # of peers that needs to ack each packet before it can be 
released + * @acked: # last packet acked by a certain peer. Used for broadcast. * @rcv_nxt: next sequence number to expect for inbound messages * @deferred_queue: deferred queue saved OOS b'cast message received from node * @unacked_window: # of inbound messages rx'd without ack'ing back to peer @@ -143,13 +147,14 @@ struct tipc_stats { * @wakeupq: linked list of wakeup msgs waiting for link congestion to abate * @long_msg_seq_no: next identifier to use for outbound fragmented messages * @reasm_buf: head of partially reassembled inbound message fragments + * @bc_rcvr: marks that this is a broadcast receiver link * @stats: collects statistics regarding link activity */ struct tipc_link { u32 addr; char name[TIPC_MAX_LINK_NAME]; struct tipc_media_addr *media_addr; - struct tipc_node *owner; + struct net *net; /* Management and link supervision data */ u32 peer_session; @@ -159,6 +164,8 @@ struct tipc_link { unsigned long keepalive_intv; u32 abort_limit; u32 state; + u16 peer_caps; + bool active; u32 silent_intv_cnt; struct { unchar hdr[INT_H_SIZE]; @@ -201,18 +208,35 @@ struct tipc_link { /* Fragmentation/reassembly */ struct sk_buff *reasm_buf; + /* Broadcast */ + u16 ackers; + u16 acked; + struct tipc_link *bc_rcvlink; + struct tipc_link *bc_sndlink; + int nack_state; + bool bc_peer_is_up; + /* Statistics */ struct tipc_stats stats; }; -bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, - u32 ownnode, u32 peer, struct tipc_media_addr *maddr, - struct sk_buff_head *inputq, struct sk_buff_head *namedq, +bool tipc_link_create(struct net *net, char *if_name, int bearer_id, + int tolerance, char net_plane, u32 mtu, int priority, + int window, u32 session, u32 ownnode, u32 peer, + u16 peer_caps, + struct tipc_link *bc_sndlink, + struct tipc_link *bc_rcvlink, + struct sk_buff_head *inputq, + struct sk_buff_head *namedq, struct tipc_link **link); +bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, + int mtu, int window, u16 
peer_caps, + struct sk_buff_head *inputq, + struct sk_buff_head *namedq, + struct tipc_link *bc_sndlink, + struct tipc_link **link); void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, int mtyp, struct sk_buff_head *xmitq); -void tipc_link_build_bcast_sync_msg(struct tipc_link *l, - struct sk_buff_head *xmitq); void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq); int tipc_link_fsm_evt(struct tipc_link *l, int evt); void tipc_link_reset_fragments(struct tipc_link *l_ptr); @@ -223,23 +247,11 @@ bool tipc_link_is_establishing(struct tipc_link *l); bool tipc_link_is_synching(struct tipc_link *l); bool tipc_link_is_failingover(struct tipc_link *l); bool tipc_link_is_blocked(struct tipc_link *l); -int tipc_link_is_active(struct tipc_link *l_ptr); -void tipc_link_purge_queues(struct tipc_link *l_ptr); -void tipc_link_purge_backlog(struct tipc_link *l); +void tipc_link_set_active(struct tipc_link *l, bool active); void tipc_link_reset(struct tipc_link *l_ptr); -int __tipc_link_xmit(struct net *net, struct tipc_link *link, - struct sk_buff_head *list); int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list, struct sk_buff_head *xmitq); -void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, - u32 gap, u32 tolerance, u32 priority); -void tipc_link_push_packets(struct tipc_link *l_ptr); -u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *buf); -void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window); -void tipc_link_retransmit(struct tipc_link *l_ptr, - struct sk_buff *start, u32 retransmits); -struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list, - const struct sk_buff *skb); +void tipc_link_set_queue_limits(struct tipc_link *l, u32 window); int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb); int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info); @@ -249,5 +261,23 @@ int tipc_nl_parse_link_prop(struct nlattr *prop, 
struct nlattr *props[]); int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq); int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *xmitq); - +int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq); +void tipc_link_add_bc_peer(struct tipc_link *snd_l, + struct tipc_link *uc_l, + struct sk_buff_head *xmitq); +void tipc_link_remove_bc_peer(struct tipc_link *snd_l, + struct tipc_link *rcv_l, + struct sk_buff_head *xmitq); +int tipc_link_bc_peers(struct tipc_link *l); +void tipc_link_set_mtu(struct tipc_link *l, int mtu); +int tipc_link_mtu(struct tipc_link *l); +void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, + struct sk_buff_head *xmitq); +void tipc_link_build_bc_sync_msg(struct tipc_link *l, + struct sk_buff_head *xmitq); +void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr); +void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr, + struct sk_buff_head *xmitq); +int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, + struct sk_buff_head *xmitq); #endif diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 26d38b3d8760..8740930f0787 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -182,7 +182,6 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) *buf = NULL; return 0; err: - pr_warn_ratelimited("Unable to build fragment list\n"); kfree_skb(*buf); kfree_skb(*headbuf); *buf = *headbuf = NULL; @@ -565,18 +564,22 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err) /* tipc_msg_reassemble() - clone a buffer chain of fragments and * reassemble the clones into one message */ -struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list) +bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq) { - struct sk_buff *skb; + struct sk_buff *skb, *_skb; struct sk_buff *frag = NULL; struct sk_buff *head = NULL; - int hdr_sz; + int hdr_len; /* Copy header if single buffer */ if 
(skb_queue_len(list) == 1) { skb = skb_peek(list); - hdr_sz = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb)); - return __pskb_copy(skb, hdr_sz, GFP_ATOMIC); + hdr_len = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb)); + _skb = __pskb_copy(skb, hdr_len, GFP_ATOMIC); + if (!_skb) + return false; + __skb_queue_tail(rcvq, _skb); + return true; } /* Clone all fragments and reassemble */ @@ -590,11 +593,12 @@ struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list) if (!head) goto error; } - return frag; + __skb_queue_tail(rcvq, frag); + return true; error: pr_warn("Failed do clone local mcast rcv buffer\n"); kfree_skb(head); - return NULL; + return false; } /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 9f0ef54be612..55778a0aebf3 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -112,6 +112,7 @@ struct tipc_skb_cb { bool wakeup_pending; u16 chain_sz; u16 chain_imp; + u16 ackers; }; #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) @@ -600,6 +601,11 @@ static inline u32 msg_last_bcast(struct tipc_msg *m) return msg_bits(m, 4, 16, 0xffff); } +static inline u32 msg_bc_snd_nxt(struct tipc_msg *m) +{ + return msg_last_bcast(m) + 1; +} + static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n) { msg_set_bits(m, 4, 16, 0xffff, n); @@ -789,7 +795,7 @@ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, int dsz, int mtu, struct sk_buff_head *list); bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); -struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); +bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq); void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, struct sk_buff *skb); diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index e6018b7eb197..c07612bab95c 100644 --- 
a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -102,7 +102,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb) if (!oskb) break; msg_set_destnode(buf_msg(oskb), dnode); - tipc_node_xmit_skb(net, oskb, dnode, dnode); + tipc_node_xmit_skb(net, oskb, dnode, 0); } rcu_read_unlock(); @@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode) &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]); rcu_read_unlock(); - tipc_node_xmit(net, &head, dnode, dnode); + tipc_node_xmit(net, &head, dnode, 0); } static void tipc_publ_subscribe(struct net *net, struct publication *publ, diff --git a/net/tipc/net.c b/net/tipc/net.c index d6d1399ae229..77bf9113c7a7 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -112,14 +112,11 @@ int tipc_net_start(struct net *net, u32 addr) { struct tipc_net *tn = net_generic(net, tipc_net_id); char addr_string[16]; - int res; tn->own_addr = addr; tipc_named_reinit(net); tipc_sk_reinit(net); - res = tipc_bclink_init(net); - if (res) - return res; + tipc_bcast_reinit(net); tipc_nametbl_publish(net, TIPC_CFG_SRV, tn->own_addr, tn->own_addr, TIPC_ZONE_SCOPE, 0, tn->own_addr); @@ -142,7 +139,6 @@ void tipc_net_stop(struct net *net) tn->own_addr); rtnl_lock(); tipc_bearer_stop(net); - tipc_bclink_stop(net); tipc_node_stop(net); rtnl_unlock(); diff --git a/net/tipc/node.c b/net/tipc/node.c index 2670751d0e2e..7493506b069b 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -72,7 +72,6 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete); static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq); -static void node_established_contact(struct tipc_node *n_ptr); static void tipc_node_delete(struct tipc_node *node); static void tipc_node_timeout(unsigned long data); static void tipc_node_fsm_evt(struct tipc_node *n, int evt); @@ -165,8 +164,10 @@ struct tipc_node *tipc_node_create(struct net *net, u32 
addr, u16 capabilities) INIT_LIST_HEAD(&n_ptr->list); INIT_LIST_HEAD(&n_ptr->publ_list); INIT_LIST_HEAD(&n_ptr->conn_sks); - skb_queue_head_init(&n_ptr->bclink.namedq); - __skb_queue_head_init(&n_ptr->bclink.deferdq); + skb_queue_head_init(&n_ptr->bc_entry.namedq); + skb_queue_head_init(&n_ptr->bc_entry.inputq1); + __skb_queue_head_init(&n_ptr->bc_entry.arrvq); + skb_queue_head_init(&n_ptr->bc_entry.inputq2); hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); list_for_each_entry_rcu(temp_node, &tn->node_list, list) { if (n_ptr->addr < temp_node->addr) @@ -177,6 +178,18 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities) n_ptr->signature = INVALID_NODE_SIG; n_ptr->active_links[0] = INVALID_BEARER_ID; n_ptr->active_links[1] = INVALID_BEARER_ID; + if (!tipc_link_bc_create(net, tipc_own_addr(net), n_ptr->addr, + U16_MAX, tipc_bc_sndlink(net)->window, + n_ptr->capabilities, + &n_ptr->bc_entry.inputq1, + &n_ptr->bc_entry.namedq, + tipc_bc_sndlink(net), + &n_ptr->bc_entry.link)) { + pr_warn("Broadcast rcv link creation failed, no memory\n"); + kfree(n_ptr); + n_ptr = NULL; + goto exit; + } tipc_node_get(n_ptr); setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr); n_ptr->keepalive_intv = U32_MAX; @@ -203,6 +216,7 @@ static void tipc_node_delete(struct tipc_node *node) { list_del_rcu(&node->list); hlist_del_rcu(&node->hash); + kfree(node->bc_entry.link); kfree_rcu(node, rcu); } @@ -332,6 +346,7 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id, n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE; tipc_bearer_add_dest(n->net, bearer_id, n->addr); + tipc_bcast_inc_bearer_dst_cnt(n->net, bearer_id); pr_debug("Established link <%s> on network plane %c\n", nl->name, nl->net_plane); @@ -340,8 +355,9 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id, if (!ol) { *slot0 = bearer_id; *slot1 = bearer_id; - tipc_link_build_bcast_sync_msg(nl, xmitq); - node_established_contact(n); + 
tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT); + n->action_flags |= TIPC_NOTIFY_NODE_UP; + tipc_bcast_add_peer(n->net, nl, xmitq); return; } @@ -350,8 +366,11 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id, pr_debug("Old link <%s> becomes standby\n", ol->name); *slot0 = bearer_id; *slot1 = bearer_id; + tipc_link_set_active(nl, true); + tipc_link_set_active(ol, false); } else if (nl->priority == ol->priority) { - *slot0 = bearer_id; + tipc_link_set_active(nl, true); + *slot1 = bearer_id; } else { pr_debug("New link <%s> is standby\n", nl->name); } @@ -428,8 +447,10 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, tipc_link_build_reset_msg(l, xmitq); *maddr = &n->links[*bearer_id].maddr; node_lost_contact(n, &le->inputq); + tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id); return; } + tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id); /* There is still a working link => initiate failover */ tnl = node_active_link(n, 0); @@ -493,6 +514,7 @@ void tipc_node_check_dest(struct net *net, u32 onode, bool link_up = false; bool accept_addr = false; bool reset = true; + char *if_name; *dupl_addr = false; *respond = false; @@ -579,9 +601,15 @@ void tipc_node_check_dest(struct net *net, u32 onode, pr_warn("Cannot establish 3rd link to %x\n", n->addr); goto exit; } - if (!tipc_link_create(n, b, mod(tipc_net(net)->random), - tipc_own_addr(net), onode, &le->maddr, - &le->inputq, &n->bclink.namedq, &l)) { + if_name = strchr(b->name, ':') + 1; + if (!tipc_link_create(net, if_name, b->identity, b->tolerance, + b->net_plane, b->mtu, b->priority, + b->window, mod(tipc_net(net)->random), + tipc_own_addr(net), onode, + n->capabilities, + tipc_bc_sndlink(n->net), n->bc_entry.link, + &le->inputq, + &n->bc_entry.namedq, &l)) { *respond = false; goto exit; } @@ -824,58 +852,36 @@ bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr) return true; } -static void node_established_contact(struct tipc_node *n_ptr) -{ - 
tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT); - n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP; - n_ptr->bclink.oos_state = 0; - n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net); - tipc_bclink_add_node(n_ptr->net, n_ptr->addr); -} - -static void node_lost_contact(struct tipc_node *n_ptr, +static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq) { char addr_string[16]; struct tipc_sock_conn *conn, *safe; struct tipc_link *l; - struct list_head *conns = &n_ptr->conn_sks; + struct list_head *conns = &n->conn_sks; struct sk_buff *skb; - struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); uint i; pr_debug("Lost contact with %s\n", - tipc_addr_string_fill(addr_string, n_ptr->addr)); + tipc_addr_string_fill(addr_string, n->addr)); - /* Flush broadcast link info associated with lost node */ - if (n_ptr->bclink.recv_permitted) { - __skb_queue_purge(&n_ptr->bclink.deferdq); - - if (n_ptr->bclink.reasm_buf) { - kfree_skb(n_ptr->bclink.reasm_buf); - n_ptr->bclink.reasm_buf = NULL; - } - - tipc_bclink_remove_node(n_ptr->net, n_ptr->addr); - tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ); - - n_ptr->bclink.recv_permitted = false; - } + /* Clean up broadcast state */ + tipc_bcast_remove_peer(n->net, n->bc_entry.link); /* Abort any ongoing link failover */ for (i = 0; i < MAX_BEARERS; i++) { - l = n_ptr->links[i].link; + l = n->links[i].link; if (l) tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT); } /* Notify publications from this node */ - n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN; + n->action_flags |= TIPC_NOTIFY_NODE_DOWN; /* Notify sockets connected to node */ list_for_each_entry_safe(conn, safe, conns, list) { skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, - SHORT_H_SIZE, 0, tn->own_addr, + SHORT_H_SIZE, 0, tipc_own_addr(n->net), conn->peer_node, conn->port, conn->peer_port, TIPC_ERR_NO_NODE); if (likely(skb)) @@ -937,18 +943,13 @@ void tipc_node_unlock(struct tipc_node *node) publ_list = &node->publ_list; 
node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP | - TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP | - TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT | - TIPC_BCAST_RESET); + TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP); spin_unlock_bh(&node->lock); if (flags & TIPC_NOTIFY_NODE_DOWN) tipc_publ_notify(net, publ_list, addr); - if (flags & TIPC_WAKEUP_BCAST_USERS) - tipc_bclink_wakeup_users(net); - if (flags & TIPC_NOTIFY_NODE_UP) tipc_named_node_up(net, addr); @@ -960,11 +961,6 @@ void tipc_node_unlock(struct tipc_node *node) tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, link_id, addr); - if (flags & TIPC_BCAST_MSG_EVT) - tipc_bclink_input(net); - - if (flags & TIPC_BCAST_RESET) - tipc_node_reset_links(node); } /* Caller should hold node lock for the passed node */ @@ -1079,6 +1075,67 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, return 0; } +/** + * tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node + * @net: the applicable net namespace + * @skb: TIPC packet; freed here if no matching node is found + * @bearer_id: id of bearer message arrived on + * + * Invoked with no locks held; node and queue locks are taken internally.
+ */ +void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id) +{ + int rc; /* event flags returned by tipc_bcast_rcv() */ + struct sk_buff_head xmitq; /* local queue for an ACK built below */ + struct tipc_bclink_entry *be; + struct tipc_link_entry *le; + struct tipc_msg *hdr = buf_msg(skb); + int usr = msg_user(hdr); + u32 dnode = msg_destnode(hdr); + struct tipc_node *n; + + __skb_queue_head_init(&xmitq); + + /* If NACK for other node, let rcv link for that node peek into it */ + if ((usr == BCAST_PROTOCOL) && (dnode != tipc_own_addr(net))) + n = tipc_node_find(net, dnode); + else + n = tipc_node_find(net, msg_prevnode(hdr)); + if (!n) { + kfree_skb(skb); /* no matching node: drop the packet */ + return; + } + be = &n->bc_entry; + le = &n->links[bearer_id]; /* link entry for the arrival bearer */ + + rc = tipc_bcast_rcv(net, be->link, skb); + + /* Broadcast link reset may happen at reassembly failure */ + if (rc & TIPC_LINK_DOWN_EVT) + tipc_node_reset_links(n); + + /* Broadcast ACKs are sent on a unicast link */ + if (rc & TIPC_LINK_SND_BC_ACK) { + tipc_node_lock(n); + tipc_link_build_ack_msg(le->link, &xmitq); + tipc_node_unlock(n); + } + + if (!skb_queue_empty(&xmitq)) /* only transmit if an ACK was queued */ + tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); + + /* Deliver.
'arrvq' is under inputq2's lock protection */ + if (!skb_queue_empty(&be->inputq1)) { + spin_lock_bh(&be->inputq2.lock); + spin_lock_bh(&be->inputq1.lock); + skb_queue_splice_tail_init(&be->inputq1, &be->arrvq); + spin_unlock_bh(&be->inputq1.lock); + spin_unlock_bh(&be->inputq2.lock); + tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2); /* both locks dropped */ + } + tipc_node_put(n); /* NOTE(review): presumably drops the tipc_node_find() ref */ +} + /** * tipc_node_check_state - check and if necessary update node state * @skb: TIPC packet @@ -1221,6 +1278,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) int usr = msg_user(hdr); int bearer_id = b->identity; struct tipc_link_entry *le; + u16 bc_ack = msg_bcast_ack(hdr); int rc = 0; __skb_queue_head_init(&xmitq); @@ -1229,13 +1287,12 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) if (unlikely(!tipc_msg_validate(skb))) goto discard; - /* Handle arrival of a non-unicast link packet */ + /* Handle arrival of discovery or broadcast packet */ if (unlikely(msg_non_seq(hdr))) { - if (usr == LINK_CONFIG) - tipc_disc_rcv(net, skb, b); + if (unlikely(usr == LINK_CONFIG)) + return tipc_disc_rcv(net, skb, b); else - tipc_bclink_rcv(net, skb); - return; + return tipc_node_bc_rcv(net, skb, bearer_id); } /* Locate neighboring node that sent packet */ @@ -1244,19 +1301,18 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) goto discard; le = &n->links[bearer_id]; + /* Ensure broadcast reception is in synch with peer's send state */ + if (unlikely(usr == LINK_PROTOCOL)) + tipc_bcast_sync_rcv(net, n->bc_entry.link, hdr); + else if (unlikely(n->bc_entry.link->acked != bc_ack)) + tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack); + tipc_node_lock(n); /* Is reception permitted at the moment ? 
*/ if (!tipc_node_filter_pkt(n, hdr)) goto unlock; - if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) - tipc_bclink_sync_state(n, hdr); - - /* Release acked broadcast packets */ - if (unlikely(n->bclink.acked != msg_bcast_ack(hdr))) - tipc_bclink_acknowledge(n, msg_bcast_ack(hdr)); - /* Check and if necessary update node state */ if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { rc = tipc_link_rcv(le->link, skb, &xmitq); @@ -1271,8 +1327,8 @@ unlock: if (unlikely(rc & TIPC_LINK_DOWN_EVT)) tipc_node_link_down(n, bearer_id, false); - if (unlikely(!skb_queue_empty(&n->bclink.namedq))) - tipc_named_rcv(net, &n->bclink.namedq); + if (unlikely(!skb_queue_empty(&n->bc_entry.namedq))) + tipc_named_rcv(net, &n->bc_entry.namedq); if (!skb_queue_empty(&le->inputq)) tipc_sk_rcv(net, &le->inputq); diff --git a/net/tipc/node.h b/net/tipc/node.h index 344b3e7594fd..6734562d3c6e 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -55,36 +55,18 @@ enum { TIPC_NOTIFY_NODE_DOWN = (1 << 3), TIPC_NOTIFY_NODE_UP = (1 << 4), - TIPC_WAKEUP_BCAST_USERS = (1 << 5), TIPC_NOTIFY_LINK_UP = (1 << 6), - TIPC_NOTIFY_LINK_DOWN = (1 << 7), - TIPC_BCAST_MSG_EVT = (1 << 9), - TIPC_BCAST_RESET = (1 << 10) + TIPC_NOTIFY_LINK_DOWN = (1 << 7) }; -/** - * struct tipc_node_bclink - TIPC node bclink structure - * @acked: sequence # of last outbound b'cast message acknowledged by node - * @last_in: sequence # of last in-sequence b'cast message received from node - * @last_sent: sequence # of last b'cast message sent by node - * @oos_state: state tracker for handling OOS b'cast messages - * @deferred_queue: deferred queue saved OOS b'cast message received from node - * @reasm_buf: broadcast reassembly queue head from node - * @inputq_map: bitmap indicating which inqueues should be kicked - * @recv_permitted: true if node is allowed to receive b'cast messages +/* Optional capabilities supported by this code version */ -struct tipc_node_bclink { - u32 acked; - u32 last_in; - u32 last_sent; - u32 
oos_state; - u32 deferred_size; - struct sk_buff_head deferdq; - struct sk_buff *reasm_buf; - struct sk_buff_head namedq; - bool recv_permitted; +enum { + TIPC_BCAST_SYNCH = (1 << 1) }; +#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH + struct tipc_link_entry { struct tipc_link *link; u32 mtu; @@ -92,6 +74,14 @@ struct tipc_link_entry { struct tipc_media_addr maddr; }; +struct tipc_bclink_entry { /* broadcast state embedded in each struct tipc_node */ + struct tipc_link *link; /* link handed to tipc_bcast_rcv() for this node */ + struct sk_buff_head inputq1; /* spliced onto arrvq before delivery */ + struct sk_buff_head arrvq; /* guarded by inputq2.lock, not its own lock */ + struct sk_buff_head inputq2; /* passed with arrvq to tipc_sk_mcast_rcv() */ + struct sk_buff_head namedq; /* drained by tipc_named_rcv() in tipc_rcv() */ +}; + /** * struct tipc_node - TIPC node structure * @addr: network address of node @@ -104,7 +94,6 @@ struct tipc_link_entry { * @active_links: bearer ids of active links, used as index into links[] array * @links: array containing references to all links to node * @action_flags: bit mask of different types of node actions - * @bclink: broadcast-related info * @state: connectivity state vs peer node * @sync_point: sequence number where synch/failover is finished * @list: links to adjacent nodes in sorted list of cluster's nodes @@ -124,8 +113,8 @@ struct tipc_node { struct hlist_node hash; int active_links[2]; struct tipc_link_entry links[MAX_BEARERS]; + struct tipc_bclink_entry bc_entry; int action_flags; - struct tipc_node_bclink bclink; struct list_head list; int state; u16 sync_point; diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 1060d52ff23e..552dbaba9cf3 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -689,13 +689,13 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, msg_set_hdr_sz(mhdr, MCAST_H_SIZE); new_mtu: - mtu = tipc_bclink_get_mtu(); + mtu = tipc_bcast_get_mtu(net); rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain); if (unlikely(rc < 0)) return rc; do { - rc = tipc_bclink_xmit(net, pktchain); + rc = tipc_bcast_xmit(net, pktchain); if (likely(!rc)) return dsz; diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c index 0021c01dec17..816914ef228d 100644 --- 
a/net/tipc/udp_media.c +++ b/net/tipc/udp_media.c @@ -155,14 +155,12 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb, struct udp_bearer *ub; struct udp_media_addr *dst = (struct udp_media_addr *)&dest->value; struct udp_media_addr *src = (struct udp_media_addr *)&b->addr.value; - struct sk_buff *clone; struct rtable *rt; if (skb_headroom(skb) < UDP_MIN_HEADROOM) pskb_expand_head(skb, UDP_MIN_HEADROOM, 0, GFP_ATOMIC); - clone = skb_clone(skb, GFP_ATOMIC); - skb_set_inner_protocol(clone, htons(ETH_P_TIPC)); + skb_set_inner_protocol(skb, htons(ETH_P_TIPC)); ub = rcu_dereference_rtnl(b->media_ptr); if (!ub) { err = -ENODEV; @@ -172,7 +170,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb, struct flowi4 fl = { .daddr = dst->ipv4.s_addr, .saddr = src->ipv4.s_addr, - .flowi4_mark = clone->mark, + .flowi4_mark = skb->mark, .flowi4_proto = IPPROTO_UDP }; rt = ip_route_output_key(net, &fl); @@ -181,7 +179,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb, goto tx_error; } ttl = ip4_dst_hoplimit(&rt->dst); - err = udp_tunnel_xmit_skb(rt, ub->ubsock->sk, clone, + err = udp_tunnel_xmit_skb(rt, ub->ubsock->sk, skb, src->ipv4.s_addr, dst->ipv4.s_addr, 0, ttl, 0, src->udp_port, dst->udp_port, @@ -204,7 +202,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb, if (err) goto tx_error; ttl = ip6_dst_hoplimit(ndst); - err = udp_tunnel6_xmit_skb(ndst, ub->ubsock->sk, clone, + err = udp_tunnel6_xmit_skb(ndst, ub->ubsock->sk, skb, ndst->dev, &src->ipv6, &dst->ipv6, 0, ttl, src->udp_port, dst->udp_port, false); @@ -213,7 +211,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb, return err; tx_error: - kfree_skb(clone); + kfree_skb(skb); return err; }