From 3067bf8c596d59164f48569a2d362de5b4c42f59 Mon Sep 17 00:00:00 2001 From: David Howells Date: Wed, 3 Jun 2020 22:21:16 +0100 Subject: [PATCH 1/2] rxrpc: Move the call completion handling out of line Move the handling of call completion out of line so that the next patch can add more code in that area. Signed-off-by: David Howells Reviewed-by: Marc Dionne --- net/rxrpc/ar-internal.h | 119 +++++++++------------------------------- net/rxrpc/recvmsg.c | 74 +++++++++++++++++++++++++ net/rxrpc/sendmsg.c | 8 +-- 3 files changed, 103 insertions(+), 98 deletions(-) diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h index 9fe264bec70c..9a2139ebd67d 100644 --- a/net/rxrpc/ar-internal.h +++ b/net/rxrpc/ar-internal.h @@ -809,100 +809,6 @@ static inline bool rxrpc_is_client_call(const struct rxrpc_call *call) return !rxrpc_is_service_call(call); } -/* - * Transition a call to the complete state. - */ -static inline bool __rxrpc_set_call_completion(struct rxrpc_call *call, - enum rxrpc_call_completion compl, - u32 abort_code, - int error) -{ - if (call->state < RXRPC_CALL_COMPLETE) { - call->abort_code = abort_code; - call->error = error; - call->completion = compl, - call->state = RXRPC_CALL_COMPLETE; - trace_rxrpc_call_complete(call); - wake_up(&call->waitq); - return true; - } - return false; -} - -static inline bool rxrpc_set_call_completion(struct rxrpc_call *call, - enum rxrpc_call_completion compl, - u32 abort_code, - int error) -{ - bool ret; - - write_lock_bh(&call->state_lock); - ret = __rxrpc_set_call_completion(call, compl, abort_code, error); - write_unlock_bh(&call->state_lock); - return ret; -} - -/* - * Record that a call successfully completed. - */ -static inline bool __rxrpc_call_completed(struct rxrpc_call *call) -{ - return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0); -} - -static inline bool rxrpc_call_completed(struct rxrpc_call *call) -{ - bool ret; - - write_lock_bh(&call->state_lock); - ret = __rxrpc_call_completed(call); - write_unlock_bh(&call->state_lock); - return ret; -} - -/* - * Record that a call is locally aborted. - */ -static inline bool __rxrpc_abort_call(const char *why, struct rxrpc_call *call, - rxrpc_seq_t seq, - u32 abort_code, int error) -{ - trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq, - abort_code, error); - return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED, - abort_code, error); -} - -static inline bool rxrpc_abort_call(const char *why, struct rxrpc_call *call, - rxrpc_seq_t seq, u32 abort_code, int error) -{ - bool ret; - - write_lock_bh(&call->state_lock); - ret = __rxrpc_abort_call(why, call, seq, abort_code, error); - write_unlock_bh(&call->state_lock); - return ret; -} - -/* - * Abort a call due to a protocol error. - */ -static inline bool __rxrpc_abort_eproto(struct rxrpc_call *call, - struct sk_buff *skb, - const char *eproto_why, - const char *why, - u32 abort_code) -{ - struct rxrpc_skb_priv *sp = rxrpc_skb(skb); - - trace_rxrpc_rx_eproto(call, sp->hdr.serial, eproto_why); - return rxrpc_abort_call(why, call, sp->hdr.seq, abort_code, -EPROTO); -} - -#define rxrpc_abort_eproto(call, skb, eproto_why, abort_why, abort_code) \ - __rxrpc_abort_eproto((call), (skb), tracepoint_string(eproto_why), \ - (abort_why), (abort_code)) - /* * conn_client.c */ @@ -1101,8 +1007,33 @@ extern const struct seq_operations rxrpc_peer_seq_ops; * recvmsg.c */ void rxrpc_notify_socket(struct rxrpc_call *); +bool __rxrpc_set_call_completion(struct rxrpc_call *, enum rxrpc_call_completion, u32, int); +bool rxrpc_set_call_completion(struct rxrpc_call *, enum rxrpc_call_completion, u32, int); +bool __rxrpc_call_completed(struct rxrpc_call *); +bool rxrpc_call_completed(struct rxrpc_call *); +bool __rxrpc_abort_call(const char *, struct rxrpc_call *, rxrpc_seq_t, u32, int); +bool rxrpc_abort_call(const char *, struct rxrpc_call *, rxrpc_seq_t, u32, int); int rxrpc_recvmsg(struct socket *, struct msghdr *, size_t, int); +/* + * Abort a call due to a protocol error. + */ +static inline bool __rxrpc_abort_eproto(struct rxrpc_call *call, + struct sk_buff *skb, + const char *eproto_why, + const char *why, + u32 abort_code) +{ + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); + + trace_rxrpc_rx_eproto(call, sp->hdr.serial, eproto_why); + return rxrpc_abort_call(why, call, sp->hdr.seq, abort_code, -EPROTO); +} + +#define rxrpc_abort_eproto(call, skb, eproto_why, abort_why, abort_code) \ + __rxrpc_abort_eproto((call), (skb), tracepoint_string(eproto_why), \ + (abort_why), (abort_code)) + /* * rtt.c */ diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c index 8578c39ec839..6c4ba4224ddc 100644 --- a/net/rxrpc/recvmsg.c +++ b/net/rxrpc/recvmsg.c @@ -58,6 +58,80 @@ void rxrpc_notify_socket(struct rxrpc_call *call) _leave(""); } +/* + * Transition a call to the complete state. + */ +bool __rxrpc_set_call_completion(struct rxrpc_call *call, + enum rxrpc_call_completion compl, + u32 abort_code, + int error) +{ + if (call->state < RXRPC_CALL_COMPLETE) { + call->abort_code = abort_code; + call->error = error; + call->completion = compl, + call->state = RXRPC_CALL_COMPLETE; + trace_rxrpc_call_complete(call); + wake_up(&call->waitq); + return true; + } + return false; +} + +bool rxrpc_set_call_completion(struct rxrpc_call *call, + enum rxrpc_call_completion compl, + u32 abort_code, + int error) +{ + bool ret; + + write_lock_bh(&call->state_lock); + ret = __rxrpc_set_call_completion(call, compl, abort_code, error); + write_unlock_bh(&call->state_lock); + return ret; +} + +/* + * Record that a call successfully completed. + */ +bool __rxrpc_call_completed(struct rxrpc_call *call) +{ + return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0); +} + +bool rxrpc_call_completed(struct rxrpc_call *call) +{ + bool ret; + + write_lock_bh(&call->state_lock); + ret = __rxrpc_call_completed(call); + write_unlock_bh(&call->state_lock); + return ret; +} + +/* + * Record that a call is locally aborted. + */ +bool __rxrpc_abort_call(const char *why, struct rxrpc_call *call, + rxrpc_seq_t seq, u32 abort_code, int error) +{ + trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq, + abort_code, error); + return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED, + abort_code, error); +} + +bool rxrpc_abort_call(const char *why, struct rxrpc_call *call, + rxrpc_seq_t seq, u32 abort_code, int error) +{ + bool ret; + + write_lock_bh(&call->state_lock); + ret = __rxrpc_abort_call(why, call, seq, abort_code, error); + write_unlock_bh(&call->state_lock); + return ret; +} + /* * Pass a call terminating message to userspace. */ diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c index 5e9c43d4a314..5dd9ba000c00 100644 --- a/net/rxrpc/sendmsg.c +++ b/net/rxrpc/sendmsg.c @@ -261,10 +261,10 @@ static int rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call, case -ENETUNREACH: case -EHOSTUNREACH: case -ECONNREFUSED: - rxrpc_set_call_completion(call, - RXRPC_CALL_LOCAL_ERROR, - 0, ret); - rxrpc_notify_socket(call); + if (rxrpc_set_call_completion(call, + RXRPC_CALL_LOCAL_ERROR, + 0, ret)) + rxrpc_notify_socket(call); goto out; } _debug("need instant resend %d", ret); From 5ac0d62226a07849b1a5233af8c800a19cecab83 Mon Sep 17 00:00:00 2001 From: David Howells Date: Wed, 3 Jun 2020 22:21:16 +0100 Subject: [PATCH 2/2] rxrpc: Fix missing notification Under some circumstances, rxrpc will fail a transmit a packet through the underlying UDP socket (ie. UDP sendmsg returns an error). This may result in a call getting stuck. In the instance being seen, where AFS tries to send a probe to the Volume Location server, tracepoints show the UDP Tx failure (in this case returing error 99 EADDRNOTAVAIL) and then nothing more: afs_make_vl_call: c=0000015d VL.GetCapabilities rxrpc_call: c=0000015d NWc u=1 sp=rxrpc_kernel_begin_call+0x106/0x170 [rxrpc] a=00000000dd89ee8a rxrpc_call: c=0000015d Gus u=2 sp=rxrpc_new_client_call+0x14f/0x580 [rxrpc] a=00000000e20e4b08 rxrpc_call: c=0000015d SEE u=2 sp=rxrpc_activate_one_channel+0x7b/0x1c0 [rxrpc] a=00000000e20e4b08 rxrpc_call: c=0000015d CON u=2 sp=rxrpc_kernel_begin_call+0x106/0x170 [rxrpc] a=00000000e20e4b08 rxrpc_tx_fail: c=0000015d r=1 ret=-99 CallDataNofrag The problem is that if the initial packet fails and the retransmission timer hasn't been started, the call is set to completed and an error is returned from rxrpc_send_data_packet() to rxrpc_queue_packet(). Though rxrpc_instant_resend() is called, this does nothing because the call is marked completed. So rxrpc_notify_socket() isn't called and the error is passed back up to rxrpc_send_data(), rxrpc_kernel_send_data() and thence to afs_make_call() and afs_vl_get_capabilities() where it is simply ignored because it is assumed that the result of a probe will be collected asynchronously. Fileserver probing is similarly affected via afs_fs_get_capabilities(). Fix this by always issuing a notification in __rxrpc_set_call_completion() if it shifts a call to the completed state, even if an error is also returned to the caller through the function return value. Also put in a little bit of optimisation to avoid taking the call state_lock and disabling softirqs if the call is already in the completed state and remove some now redundant rxrpc_notify_socket() calls. Fixes: f5c17aaeb2ae ("rxrpc: Calls should only have one terminal state") Reported-by: Gerry Seidman Signed-off-by: David Howells Reviewed-by: Marc Dionne --- net/rxrpc/call_event.c | 1 - net/rxrpc/conn_event.c | 7 +++---- net/rxrpc/input.c | 7 ++----- net/rxrpc/peer_event.c | 4 +--- net/rxrpc/recvmsg.c | 21 +++++++++++++-------- net/rxrpc/sendmsg.c | 6 ++---- 6 files changed, 21 insertions(+), 25 deletions(-) diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c index 2a65ac41055f..61a51c251e1b 100644 --- a/net/rxrpc/call_event.c +++ b/net/rxrpc/call_event.c @@ -320,7 +320,6 @@ recheck_state: if (call->state == RXRPC_CALL_COMPLETE) { del_timer_sync(&call->timer); - rxrpc_notify_socket(call); goto out_put; } diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c index 06fcff2ebbba..447f55ca6886 100644 --- a/net/rxrpc/conn_event.c +++ b/net/rxrpc/conn_event.c @@ -173,10 +173,9 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn, else trace_rxrpc_rx_abort(call, serial, conn->abort_code); - if (rxrpc_set_call_completion(call, compl, - conn->abort_code, - conn->error)) - rxrpc_notify_socket(call); + rxrpc_set_call_completion(call, compl, + conn->abort_code, + conn->error); } } diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c index 3be4177baf70..299ac98e9754 100644 --- a/net/rxrpc/input.c +++ b/net/rxrpc/input.c @@ -275,7 +275,6 @@ static bool rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun, case RXRPC_CALL_SERVER_AWAIT_ACK: __rxrpc_call_completed(call); - rxrpc_notify_socket(call); state = call->state; break; @@ -1013,9 +1012,8 @@ static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb) _proto("Rx ABORT %%%u { %x }", sp->hdr.serial, abort_code); - if (rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED, - abort_code, -ECONNABORTED)) - rxrpc_notify_socket(call); + rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED, + abort_code, -ECONNABORTED); } /* @@ -1102,7 +1100,6 @@ static void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx, spin_lock(&rx->incoming_lock); __rxrpc_disconnect_call(conn, call); spin_unlock(&rx->incoming_lock); - rxrpc_notify_socket(call); } /* diff --git a/net/rxrpc/peer_event.c b/net/rxrpc/peer_event.c index b1449d971883..4704a8dceced 100644 --- a/net/rxrpc/peer_event.c +++ b/net/rxrpc/peer_event.c @@ -289,9 +289,7 @@ static void rxrpc_distribute_error(struct rxrpc_peer *peer, int error, hlist_for_each_entry_rcu(call, &peer->error_targets, error_link) { rxrpc_see_call(call); - if (call->state < RXRPC_CALL_COMPLETE && - rxrpc_set_call_completion(call, compl, 0, -error)) - rxrpc_notify_socket(call); + rxrpc_set_call_completion(call, compl, 0, -error); } } diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c index 6c4ba4224ddc..2989742a4aa1 100644 --- a/net/rxrpc/recvmsg.c +++ b/net/rxrpc/recvmsg.c @@ -73,6 +73,7 @@ bool __rxrpc_set_call_completion(struct rxrpc_call *call, call->state = RXRPC_CALL_COMPLETE; trace_rxrpc_call_complete(call); wake_up(&call->waitq); + rxrpc_notify_socket(call); return true; } return false; @@ -83,11 +84,13 @@ bool rxrpc_set_call_completion(struct rxrpc_call *call, u32 abort_code, int error) { - bool ret; + bool ret = false; - write_lock_bh(&call->state_lock); - ret = __rxrpc_set_call_completion(call, compl, abort_code, error); - write_unlock_bh(&call->state_lock); + if (call->state < RXRPC_CALL_COMPLETE) { + write_lock_bh(&call->state_lock); + ret = __rxrpc_set_call_completion(call, compl, abort_code, error); + write_unlock_bh(&call->state_lock); + } return ret; } @@ -101,11 +104,13 @@ bool __rxrpc_call_completed(struct rxrpc_call *call) bool rxrpc_call_completed(struct rxrpc_call *call) { - bool ret; + bool ret = false; - write_lock_bh(&call->state_lock); - ret = __rxrpc_call_completed(call); - write_unlock_bh(&call->state_lock); + if (call->state < RXRPC_CALL_COMPLETE) { + write_lock_bh(&call->state_lock); + ret = __rxrpc_call_completed(call); + write_unlock_bh(&call->state_lock); + } return ret; } diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c index 5dd9ba000c00..1304b8608f56 100644 --- a/net/rxrpc/sendmsg.c +++ b/net/rxrpc/sendmsg.c @@ -261,10 +261,8 @@ static int rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call, case -ENETUNREACH: case -EHOSTUNREACH: case -ECONNREFUSED: - if (rxrpc_set_call_completion(call, - RXRPC_CALL_LOCAL_ERROR, - 0, ret)) - rxrpc_notify_socket(call); + rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, + 0, ret); goto out; } _debug("need instant resend %d", ret);