aio: v4 ensure access to ctx->ring_pages is correctly serialised for migration

As reported by Tang Chen, Gu Zheng and Yasuaki Isimatsu, the following issues
exist in the aio ring page migration support.

As a result, for example, we have the following problem:

            thread 1                      |              thread 2
                                          |
aio_migratepage()                         |
 |-> take ctx->completion_lock            |
 |-> migrate_page_copy(new, old)          |
 |   *NOW*, ctx->ring_pages[idx] == old   |
                                          |
                                          |    *NOW*, ctx->ring_pages[idx] == old
                                          |    aio_read_events_ring()
                                          |     |-> ring = kmap_atomic(ctx->ring_pages[0])
                                          |     |-> ring->head = head;          *HERE, write to the old ring page*
                                          |     |-> kunmap_atomic(ring);
                                          |
 |-> ctx->ring_pages[idx] = new           |
 |   *BUT NOW*, the content of            |
 |    ring_pages[idx] is old.             |
 |-> release ctx->completion_lock         |

As above, the new ring page will not be updated.

Fix this issue, as well as prevent races in aio_ring_setup() by holding
the ring_lock mutex during kioctx setup and page migration.  This avoids
the overhead of taking another spinlock in aio_read_events_ring() as Tang's
and Gu's original fix did, pushing the overhead into the migration code.

Note that to handle the nesting of ring_lock inside of mmap_sem, the
migratepage operation uses mutex_trylock().  Page migration is not a 100%
critical operation in this case, so the ocassional failure can be
tolerated.  This issue was reported by Sasha Levin.

Based on feedback from Linus, avoid the extra taking of ctx->completion_lock.
Instead, make page migration fully serialised by mapping->private_lock, and
have aio_free_ring() simply disconnect the kioctx from the mapping by calling
put_aio_ring_file() before touching ctx->ring_pages[].  This simplifies the
error handling logic in aio_migratepage(), and should improve robustness.

v4: always do mutex_unlock() in cases when kioctx setup fails.

Reported-by: Yasuaki Ishimatsu <isimatu.yasuaki@jp.fujitsu.com>
Reported-by: Sasha Levin <sasha.levin@oracle.com>
Signed-off-by: Benjamin LaHaise <bcrl@kvack.org>
Cc: Tang Chen <tangchen@cn.fujitsu.com>
Cc: Gu Zheng <guz.fnst@cn.fujitsu.com>
Cc: stable@vger.kernel.org
This commit is contained in:
Benjamin LaHaise 2014-03-28 10:14:45 -04:00
parent 8712a00514
commit fa8a53c39f

124
fs/aio.c
View File

@ -52,7 +52,8 @@
struct aio_ring { struct aio_ring {
unsigned id; /* kernel internal index number */ unsigned id; /* kernel internal index number */
unsigned nr; /* number of io_events */ unsigned nr; /* number of io_events */
unsigned head; unsigned head; /* Written to by userland or under ring_lock
* mutex by aio_read_events_ring(). */
unsigned tail; unsigned tail;
unsigned magic; unsigned magic;
@ -243,6 +244,11 @@ static void aio_free_ring(struct kioctx *ctx)
{ {
int i; int i;
/* Disconnect the kiotx from the ring file. This prevents future
* accesses to the kioctx from page migration.
*/
put_aio_ring_file(ctx);
for (i = 0; i < ctx->nr_pages; i++) { for (i = 0; i < ctx->nr_pages; i++) {
struct page *page; struct page *page;
pr_debug("pid(%d) [%d] page->count=%d\n", current->pid, i, pr_debug("pid(%d) [%d] page->count=%d\n", current->pid, i,
@ -254,8 +260,6 @@ static void aio_free_ring(struct kioctx *ctx)
put_page(page); put_page(page);
} }
put_aio_ring_file(ctx);
if (ctx->ring_pages && ctx->ring_pages != ctx->internal_pages) { if (ctx->ring_pages && ctx->ring_pages != ctx->internal_pages) {
kfree(ctx->ring_pages); kfree(ctx->ring_pages);
ctx->ring_pages = NULL; ctx->ring_pages = NULL;
@ -283,29 +287,38 @@ static int aio_migratepage(struct address_space *mapping, struct page *new,
{ {
struct kioctx *ctx; struct kioctx *ctx;
unsigned long flags; unsigned long flags;
pgoff_t idx;
int rc; int rc;
rc = 0; rc = 0;
/* Make sure the old page hasn't already been changed */ /* mapping->private_lock here protects against the kioctx teardown. */
spin_lock(&mapping->private_lock); spin_lock(&mapping->private_lock);
ctx = mapping->private_data; ctx = mapping->private_data;
if (ctx) { if (!ctx) {
pgoff_t idx; rc = -EINVAL;
spin_lock_irqsave(&ctx->completion_lock, flags); goto out;
idx = old->index; }
if (idx < (pgoff_t)ctx->nr_pages) {
if (ctx->ring_pages[idx] != old) /* The ring_lock mutex. The prevents aio_read_events() from writing
rc = -EAGAIN; * to the ring's head, and prevents page migration from mucking in
} else * a partially initialized kiotx.
rc = -EINVAL; */
spin_unlock_irqrestore(&ctx->completion_lock, flags); if (!mutex_trylock(&ctx->ring_lock)) {
rc = -EAGAIN;
goto out;
}
idx = old->index;
if (idx < (pgoff_t)ctx->nr_pages) {
/* Make sure the old page hasn't already been changed */
if (ctx->ring_pages[idx] != old)
rc = -EAGAIN;
} else } else
rc = -EINVAL; rc = -EINVAL;
spin_unlock(&mapping->private_lock);
if (rc != 0) if (rc != 0)
return rc; goto out_unlock;
/* Writeback must be complete */ /* Writeback must be complete */
BUG_ON(PageWriteback(old)); BUG_ON(PageWriteback(old));
@ -314,38 +327,26 @@ static int aio_migratepage(struct address_space *mapping, struct page *new,
rc = migrate_page_move_mapping(mapping, new, old, NULL, mode, 1); rc = migrate_page_move_mapping(mapping, new, old, NULL, mode, 1);
if (rc != MIGRATEPAGE_SUCCESS) { if (rc != MIGRATEPAGE_SUCCESS) {
put_page(new); put_page(new);
return rc; goto out_unlock;
} }
/* We can potentially race against kioctx teardown here. Use the /* Take completion_lock to prevent other writes to the ring buffer
* address_space's private data lock to protect the mapping's * while the old page is copied to the new. This prevents new
* private_data. * events from being lost.
*/ */
spin_lock(&mapping->private_lock); spin_lock_irqsave(&ctx->completion_lock, flags);
ctx = mapping->private_data; migrate_page_copy(new, old);
if (ctx) { BUG_ON(ctx->ring_pages[idx] != old);
pgoff_t idx; ctx->ring_pages[idx] = new;
spin_lock_irqsave(&ctx->completion_lock, flags); spin_unlock_irqrestore(&ctx->completion_lock, flags);
migrate_page_copy(new, old);
idx = old->index; /* The old page is no longer accessible. */
if (idx < (pgoff_t)ctx->nr_pages) { put_page(old);
/* And only do the move if things haven't changed */
if (ctx->ring_pages[idx] == old) out_unlock:
ctx->ring_pages[idx] = new; mutex_unlock(&ctx->ring_lock);
else out:
rc = -EAGAIN;
} else
rc = -EINVAL;
spin_unlock_irqrestore(&ctx->completion_lock, flags);
} else
rc = -EBUSY;
spin_unlock(&mapping->private_lock); spin_unlock(&mapping->private_lock);
if (rc == MIGRATEPAGE_SUCCESS)
put_page(old);
else
put_page(new);
return rc; return rc;
} }
#endif #endif
@ -380,7 +381,7 @@ static int aio_setup_ring(struct kioctx *ctx)
file = aio_private_file(ctx, nr_pages); file = aio_private_file(ctx, nr_pages);
if (IS_ERR(file)) { if (IS_ERR(file)) {
ctx->aio_ring_file = NULL; ctx->aio_ring_file = NULL;
return -EAGAIN; return -ENOMEM;
} }
ctx->aio_ring_file = file; ctx->aio_ring_file = file;
@ -415,7 +416,7 @@ static int aio_setup_ring(struct kioctx *ctx)
if (unlikely(i != nr_pages)) { if (unlikely(i != nr_pages)) {
aio_free_ring(ctx); aio_free_ring(ctx);
return -EAGAIN; return -ENOMEM;
} }
ctx->mmap_size = nr_pages * PAGE_SIZE; ctx->mmap_size = nr_pages * PAGE_SIZE;
@ -429,7 +430,7 @@ static int aio_setup_ring(struct kioctx *ctx)
if (IS_ERR((void *)ctx->mmap_base)) { if (IS_ERR((void *)ctx->mmap_base)) {
ctx->mmap_size = 0; ctx->mmap_size = 0;
aio_free_ring(ctx); aio_free_ring(ctx);
return -EAGAIN; return -ENOMEM;
} }
pr_debug("mmap address: 0x%08lx\n", ctx->mmap_base); pr_debug("mmap address: 0x%08lx\n", ctx->mmap_base);
@ -556,6 +557,10 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
rcu_read_unlock(); rcu_read_unlock();
spin_unlock(&mm->ioctx_lock); spin_unlock(&mm->ioctx_lock);
/* While kioctx setup is in progress,
* we are protected from page migration
* changes ring_pages by ->ring_lock.
*/
ring = kmap_atomic(ctx->ring_pages[0]); ring = kmap_atomic(ctx->ring_pages[0]);
ring->id = ctx->id; ring->id = ctx->id;
kunmap_atomic(ring); kunmap_atomic(ring);
@ -640,24 +645,28 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
ctx->max_reqs = nr_events; ctx->max_reqs = nr_events;
spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->completion_lock);
mutex_init(&ctx->ring_lock);
/* Protect against page migration throughout kiotx setup by keeping
* the ring_lock mutex held until setup is complete. */
mutex_lock(&ctx->ring_lock);
init_waitqueue_head(&ctx->wait);
INIT_LIST_HEAD(&ctx->active_reqs);
if (percpu_ref_init(&ctx->users, free_ioctx_users)) if (percpu_ref_init(&ctx->users, free_ioctx_users))
goto err; goto err;
if (percpu_ref_init(&ctx->reqs, free_ioctx_reqs)) if (percpu_ref_init(&ctx->reqs, free_ioctx_reqs))
goto err; goto err;
spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->completion_lock);
mutex_init(&ctx->ring_lock);
init_waitqueue_head(&ctx->wait);
INIT_LIST_HEAD(&ctx->active_reqs);
ctx->cpu = alloc_percpu(struct kioctx_cpu); ctx->cpu = alloc_percpu(struct kioctx_cpu);
if (!ctx->cpu) if (!ctx->cpu)
goto err; goto err;
if (aio_setup_ring(ctx) < 0) err = aio_setup_ring(ctx);
if (err < 0)
goto err; goto err;
atomic_set(&ctx->reqs_available, ctx->nr_events - 1); atomic_set(&ctx->reqs_available, ctx->nr_events - 1);
@ -683,6 +692,9 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
if (err) if (err)
goto err_cleanup; goto err_cleanup;
/* Release the ring_lock mutex now that all setup is complete. */
mutex_unlock(&ctx->ring_lock);
pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n", pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
ctx, ctx->user_id, mm, ctx->nr_events); ctx, ctx->user_id, mm, ctx->nr_events);
return ctx; return ctx;
@ -692,6 +704,7 @@ err_cleanup:
err_ctx: err_ctx:
aio_free_ring(ctx); aio_free_ring(ctx);
err: err:
mutex_unlock(&ctx->ring_lock);
free_percpu(ctx->cpu); free_percpu(ctx->cpu);
free_percpu(ctx->reqs.pcpu_count); free_percpu(ctx->reqs.pcpu_count);
free_percpu(ctx->users.pcpu_count); free_percpu(ctx->users.pcpu_count);
@ -1024,6 +1037,7 @@ static long aio_read_events_ring(struct kioctx *ctx,
mutex_lock(&ctx->ring_lock); mutex_lock(&ctx->ring_lock);
/* Access to ->ring_pages here is protected by ctx->ring_lock. */
ring = kmap_atomic(ctx->ring_pages[0]); ring = kmap_atomic(ctx->ring_pages[0]);
head = ring->head; head = ring->head;
tail = ring->tail; tail = ring->tail;