xprtrdma: Replace rpcrdma_receive_wq with a per-xprt workqueue
To address a connection-close ordering problem, we need the ability
to drain the RPC completions running on rpcrdma_receive_wq for just
one transport. Give each transport its own RPC completion workqueue,
and drain that workqueue when disconnecting the transport.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent 6ceea36890
commit 6d2d0ee27c
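
The pattern this patch adopts (one completion workqueue per transport
instance, allocated and torn down with the transport's buffers, and
drained on disconnect) can be sketched in isolation as follows. This
is an illustrative sketch only: the struct and function names here
(my_transport and friends) are invented for the example and do not
appear in xprtrdma.

#include <linux/workqueue.h>

struct my_transport {
	struct workqueue_struct *completion_wq;
	struct work_struct reply_work;
};

/* Setup: one workqueue per transport, named after its peer address */
static int my_transport_setup(struct my_transport *t, const char *addr)
{
	t->completion_wq = alloc_workqueue("my-xprt-%s",
					   WQ_MEM_RECLAIM | WQ_HIGHPRI,
					   0, addr);
	return t->completion_wq ? 0 : -ENOMEM;
}

/* Receive completion: defer Reply processing to this transport's wq */
static void my_transport_receive(struct my_transport *t)
{
	queue_work(t->completion_wq, &t->reply_work);
}

/* Disconnect: wait for this transport's deferred work only, leaving
 * other transports' completions running undisturbed
 */
static void my_transport_quiesce(struct my_transport *t)
{
	drain_workqueue(t->completion_wq);
}

/* Teardown */
static void my_transport_destroy(struct my_transport *t)
{
	if (t->completion_wq) {
		destroy_workqueue(t->completion_wq);
		t->completion_wq = NULL;
	}
}
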
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1356,7 +1356,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 
 	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
-	queue_work(rpcrdma_receive_wq, &rep->rr_work);
+	queue_work(buf->rb_completion_wq, &rep->rr_work);
 	return;
 
 out_badversion:
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -444,10 +444,14 @@ xprt_rdma_close(struct rpc_xprt *xprt)
 	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
+	might_sleep();
+
 	dprintk("RPC:       %s: closing xprt %p\n", __func__, xprt);
 
-	if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
-		xprt_clear_connected(xprt);
+	/* Prevent marshaling and sending of new requests */
+	xprt_clear_connected(xprt);
+
+	if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
 		rpcrdma_ia_remove(ia);
 		return;
 	}
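
A note on the added might_sleep(): xprt_rdma_close() can now block in
drain_workqueue() via rpcrdma_ia_remove() and rpcrdma_ep_disconnect(),
so the annotation makes any caller in atomic context warn loudly
(under CONFIG_DEBUG_ATOMIC_SLEEP) rather than deadlock silently. A
minimal sketch of the pattern, with an invented function name:

#include <linux/kernel.h>
#include <linux/workqueue.h>

/* example_close() is illustrative, not patch code */
static void example_close(struct workqueue_struct *wq)
{
	might_sleep();		/* warn early if the caller is atomic */
	drain_workqueue(wq);	/* may block until queued work finishes */
}
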
@@ -858,8 +862,6 @@ void xprt_rdma_cleanup(void)
 	dprintk("RPC:       %s: xprt_unregister returned %i\n",
 		__func__, rc);
 
-	rpcrdma_destroy_wq();
-
 	rc = xprt_unregister_transport(&xprt_rdma_bc);
 	if (rc)
 		dprintk("RPC:       %s: xprt_unregister(bc) returned %i\n",
@@ -870,20 +872,13 @@ int xprt_rdma_init(void)
 {
 	int rc;
 
-	rc = rpcrdma_alloc_wq();
+	rc = xprt_register_transport(&xprt_rdma);
 	if (rc)
 		return rc;
 
-	rc = xprt_register_transport(&xprt_rdma);
-	if (rc) {
-		rpcrdma_destroy_wq();
-		return rc;
-	}
-
 	rc = xprt_register_transport(&xprt_rdma_bc);
 	if (rc) {
 		xprt_unregister_transport(&xprt_rdma);
-		rpcrdma_destroy_wq();
 		return rc;
 	}
 
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -80,33 +80,23 @@ static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
 static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
-struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
-
-int
-rpcrdma_alloc_wq(void)
-{
-	struct workqueue_struct *recv_wq;
-
-	recv_wq = alloc_workqueue("xprtrdma_receive",
-				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
-				  0);
-	if (!recv_wq)
-		return -ENOMEM;
-
-	rpcrdma_receive_wq = recv_wq;
-	return 0;
-}
-
-void
-rpcrdma_destroy_wq(void)
-{
-	struct workqueue_struct *wq;
-
-	if (rpcrdma_receive_wq) {
-		wq = rpcrdma_receive_wq;
-		rpcrdma_receive_wq = NULL;
-		destroy_workqueue(wq);
-	}
-}
+/* Wait for outstanding transport work to finish.
+ */
+static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
+{
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+
+	/* Flush Receives, then wait for deferred Reply work
+	 * to complete.
+	 */
+	ib_drain_qp(ia->ri_id->qp);
+	drain_workqueue(buf->rb_completion_wq);
+
+	/* Deferred Reply processing might have scheduled
+	 * local invalidations.
+	 */
+	ib_drain_sq(ia->ri_id->qp);
+}
 
 /**
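
For reference, ib_drain_qp() flushes both the send and the receive
queue of a QP, while ib_drain_sq() flushes the send queue alone. The
second send-queue drain is needed because the Reply handlers drained
in the middle step may themselves post fresh Local Invalidate
requests. An annotated restatement of the new function (the calls are
the patch's own; only the comments are expanded):

static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	/* Step 1: flush the QP so that no further Receive
	 * completions can queue Reply work on rb_completion_wq.
	 */
	ib_drain_qp(ia->ri_id->qp);

	/* Step 2: wait for Reply work already queued on this
	 * transport's workqueue to run to completion.
	 */
	drain_workqueue(buf->rb_completion_wq);

	/* Step 3: those Reply handlers may have posted LOCAL_INV
	 * work requests; flush the send queue once more.
	 */
	ib_drain_sq(ia->ri_id->qp);
}
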
@@ -483,7 +473,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
 	 * connection is already gone.
 	 */
 	if (ia->ri_id->qp) {
-		ib_drain_qp(ia->ri_id->qp);
+		rpcrdma_xprt_drain(r_xprt);
 		rdma_destroy_qp(ia->ri_id);
 		ia->ri_id->qp = NULL;
 	}
@@ -825,8 +815,10 @@ out_noupdate:
 	return rc;
 }
 
-/*
- * rpcrdma_ep_disconnect
+/**
+ * rpcrdma_ep_disconnect - Disconnect underlying transport
+ * @ep: endpoint to disconnect
+ * @ia: associated interface adapter
  *
  * This is separate from destroy to facilitate the ability
  * to reconnect without recreating the endpoint.
@@ -837,19 +829,20 @@ out_noupdate:
 void
 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
+	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
+						   rx_ep);
 	int rc;
 
+	/* returns without wait if ID is not connected */
 	rc = rdma_disconnect(ia->ri_id);
 	if (!rc)
-		/* returns without wait if not connected */
 		wait_event_interruptible(ep->rep_connect_wait,
 					 ep->rep_connected != 1);
 	else
 		ep->rep_connected = rc;
-	trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
-					       rx_ep), rc);
+	trace_xprtrdma_disconnect(r_xprt, rc);
 
-	ib_drain_qp(ia->ri_id->qp);
+	rpcrdma_xprt_drain(r_xprt);
 }
 
 /* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -1183,6 +1176,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 	if (rc)
 		goto out;
 
+	buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
+						WQ_MEM_RECLAIM | WQ_HIGHPRI,
+						0,
+			r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
+	if (!buf->rb_completion_wq)
+		goto out;
+
 	return 0;
 out:
 	rpcrdma_buffer_destroy(buf);
@@ -1241,6 +1241,11 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
 	cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
+	if (buf->rb_completion_wq) {
+		destroy_workqueue(buf->rb_completion_wq);
+		buf->rb_completion_wq = NULL;
+	}
+
 	rpcrdma_sendctxs_destroy(buf);
 
 	while (!list_empty(&buf->rb_recv_bufs)) {
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -412,6 +412,7 @@ struct rpcrdma_buffer {
 
 	u32			rb_bc_max_requests;
 
+	struct workqueue_struct *rb_completion_wq;
 	struct delayed_work	rb_refresh_worker;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -547,8 +548,6 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);
 bool frwr_is_supported(struct rpcrdma_ia *);
 bool fmr_is_supported(struct rpcrdma_ia *);
 
-extern struct workqueue_struct *rpcrdma_receive_wq;
-
 /*
  * Endpoint calls - xprtrdma/verbs.c
  */
@@ -603,9 +602,6 @@ rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
 	return __rpcrdma_dma_map_regbuf(ia, rb);
 }
 
-int rpcrdma_alloc_wq(void);
-void rpcrdma_destroy_wq(void);
-
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */