rcu: Add multiple in-flight batches of kfree_rcu() work

During testing, it was observed that amount of memory consumed due
kfree_rcu() batching is 300-400MB. Previously we had only a single
head_free pointer pointing to the list of rcu_head(s) that are to be
freed after a grace period. Until this list is drained, we cannot queue
any more objects on it since such objects may not be ready to be
reclaimed when the worker thread eventually gets to drainin g the
head_free list.

We can do better by maintaining multiple lists as done by this patch.
Testing shows that memory consumption came down by around 100-150MB with
just adding another list. Adding more than 1 additional list did not
show any improvement.

Suggested-by: Paul E. McKenney <paulmck@linux.ibm.com>
Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
[ paulmck: Code style and initialization handling. ]
[ paulmck: Fix field name, reported by kbuild test robot <lkp@intel.com>. ]
Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
This commit is contained in:
Joel Fernandes (Google) 2019-09-19 14:58:26 -07:00 committed by Paul E. McKenney
parent 569d767087
commit 0392bebebf

View File

@ -2686,12 +2686,25 @@ EXPORT_SYMBOL_GPL(call_rcu);
/* Maximum number of jiffies to wait before draining a batch. */
#define KFREE_DRAIN_JIFFIES (HZ / 50)
#define KFREE_N_BATCHES 2
/**
* struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
* @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
* @head_free: List of kfree_rcu() objects waiting for a grace period
* @krcp: Pointer to @kfree_rcu_cpu structure
*/
struct kfree_rcu_cpu_work {
struct rcu_work rcu_work;
struct rcu_head *head_free;
struct kfree_rcu_cpu *krcp;
};
/**
* struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
* @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
* @head: List of kfree_rcu() objects not yet waiting for a grace period
* @head_free: List of kfree_rcu() objects already waiting for a grace period
* @krw_arr: Array of batches of kfree_rcu() objects waiting for a grace period
* @lock: Synchronize access to this structure
* @monitor_work: Promote @head to @head_free after KFREE_DRAIN_JIFFIES
* @monitor_todo: Tracks whether a @monitor_work delayed work is pending
@ -2703,9 +2716,8 @@ EXPORT_SYMBOL_GPL(call_rcu);
* the interactions with the slab allocators.
*/
struct kfree_rcu_cpu {
struct rcu_work rcu_work;
struct rcu_head *head;
struct rcu_head *head_free;
struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES];
spinlock_t lock;
struct delayed_work monitor_work;
bool monitor_todo;
@ -2723,11 +2735,14 @@ static void kfree_rcu_work(struct work_struct *work)
unsigned long flags;
struct rcu_head *head, *next;
struct kfree_rcu_cpu *krcp;
struct kfree_rcu_cpu_work *krwp;
krcp = container_of(to_rcu_work(work), struct kfree_rcu_cpu, rcu_work);
krwp = container_of(to_rcu_work(work),
struct kfree_rcu_cpu_work, rcu_work);
krcp = krwp->krcp;
spin_lock_irqsave(&krcp->lock, flags);
head = krcp->head_free;
krcp->head_free = NULL;
head = krwp->head_free;
krwp->head_free = NULL;
spin_unlock_irqrestore(&krcp->lock, flags);
// List "head" is now private, so traverse locklessly.
@ -2747,17 +2762,25 @@ static void kfree_rcu_work(struct work_struct *work)
*/
static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
{
int i;
struct kfree_rcu_cpu_work *krwp = NULL;
lockdep_assert_held(&krcp->lock);
for (i = 0; i < KFREE_N_BATCHES; i++)
if (!krcp->krw_arr[i].head_free) {
krwp = &(krcp->krw_arr[i]);
break;
}
// If a previous RCU batch is in progress, we cannot immediately
// queue another one, so return false to tell caller to retry.
if (krcp->head_free)
if (!krwp)
return false;
krcp->head_free = krcp->head;
krwp->head_free = krcp->head;
krcp->head = NULL;
INIT_RCU_WORK(&krcp->rcu_work, kfree_rcu_work);
queue_rcu_work(system_wq, &krcp->rcu_work);
INIT_RCU_WORK(&krwp->rcu_work, kfree_rcu_work);
queue_rcu_work(system_wq, &krwp->rcu_work);
return true;
}
@ -2863,7 +2886,8 @@ void __init kfree_rcu_scheduler_running(void)
continue;
}
krcp->monitor_todo = true;
schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
schedule_delayed_work_on(cpu, &krcp->monitor_work,
KFREE_DRAIN_JIFFIES);
spin_unlock_irqrestore(&krcp->lock, flags);
}
}
@ -3732,11 +3756,14 @@ struct workqueue_struct *rcu_par_gp_wq;
static void __init kfree_rcu_batch_init(void)
{
int cpu;
int i;
for_each_possible_cpu(cpu) {
struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
spin_lock_init(&krcp->lock);
for (i = 0; i < KFREE_N_BATCHES; i++)
krcp->krw_arr[i].krcp = krcp;
INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
krcp->initialized = true;
}