dlm: move dlm_search_rsb_tree() out of lock

The rhashtable structure is lockless for readers such as
rhashtable_lookup_fast(). It should be save to call this lookup
functionality out of holding ls_rsbtbl_lock to get the rsb pointer out
of the hash. This reduce the contention time of ls_rsbtbl_lock in some
cases. We still need to check if the rsb is part of the check as this
state can be changed while ls_rsbtbl_lock is not held. If its part of
the rhashtable data structure we take a reference to be sure it will not
be freed after we drop the ls_rsbtbl_lock read lock.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
Alexander Aring 2024-08-02 13:26:45 -04:00 committed by David Teigland
parent 98ff7d95d9
commit 5be323b0c6

View File

@ -733,11 +733,13 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
}
retry:
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error)
goto do_new;
/* check if the rsb is active under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error) {
if (!rsb_flag(r, RSB_HASHED)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_new;
}
@ -918,11 +920,13 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
int error;
retry:
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error)
goto do_new;
/* check if the rsb is in active state under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error) {
if (!rsb_flag(r, RSB_HASHED)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_new;
}
@ -1276,37 +1280,39 @@ static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *na
}
retry:
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error)
goto not_found;
/* check if the rsb is active under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error) {
if (rsb_flag(r, RSB_INACTIVE)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_inactive;
}
/* because the rsb is active, we need to lock_rsb before
* checking/changing re_master_nodeid
*/
hold_rsb(r);
read_unlock_bh(&ls->ls_rsbtbl_lock);
lock_rsb(r);
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
flags, r_nodeid, result);
/* the rsb was active */
unlock_rsb(r);
put_rsb(r);
return 0;
} else {
if (!rsb_flag(r, RSB_HASHED)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
goto not_found;
}
if (rsb_flag(r, RSB_INACTIVE)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
goto do_inactive;
}
/* because the rsb is active, we need to lock_rsb before
* checking/changing re_master_nodeid
*/
hold_rsb(r);
read_unlock_bh(&ls->ls_rsbtbl_lock);
lock_rsb(r);
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
flags, r_nodeid, result);
/* the rsb was active */
unlock_rsb(r);
put_rsb(r);
return 0;
do_inactive:
/* unlikely path - check if still part of ls_rsbtbl */
write_lock_bh(&ls->ls_rsbtbl_lock);
@ -1403,14 +1409,14 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
struct dlm_rsb *r = NULL;
int error;
read_lock_bh(&ls->ls_rsbtbl_lock);
rcu_read_lock();
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error)
goto out;
dlm_dump_rsb(r);
out:
read_unlock_bh(&ls->ls_rsbtbl_lock);
rcu_read_unlock();
}
static void deactivate_rsb(struct kref *kref)
@ -4309,17 +4315,28 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
memset(name, 0, sizeof(name));
memcpy(name, ms->m_extra, len);
write_lock_bh(&ls->ls_rsbtbl_lock);
rcu_read_lock();
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (rv) {
rcu_read_unlock();
/* should not happen */
log_error(ls, "%s from %d not found %s", __func__,
from_nodeid, name);
write_unlock_bh(&ls->ls_rsbtbl_lock);
return;
}
write_lock_bh(&ls->ls_rsbtbl_lock);
if (!rsb_flag(r, RSB_HASHED)) {
rcu_read_unlock();
write_unlock_bh(&ls->ls_rsbtbl_lock);
/* should not happen */
log_error(ls, "%s from %d got removed during removal %s",
__func__, from_nodeid, name);
return;
}
/* at this stage the rsb can only being freed here */
rcu_read_unlock();
if (!rsb_flag(r, RSB_INACTIVE)) {
if (r->res_master_nodeid != from_nodeid) {
/* should not happen */