ocfs2: Fix quota locking

OCFS2 had three issues with quota locking:
a) When reading dquot from global quota file, we started a transaction while
   holding dqio_mutex which is prone to deadlocks because other paths do it
   the other way around
b) During ocfs2_sync_dquot we were not protected against concurrent writers
   on the same node. Because we first copy data to local buffer, a race
   could happen resulting in old data being written to global quota file and
   thus causing quota inconsistency after a crash.
c) ip_alloc_sem of quota files was acquired while a transaction is started
   in ocfs2_quota_write which can deadlock because we first get ip_alloc_sem
   and then start a transaction when extending quota files.

We fix the problem a) by pulling all necessary code to ocfs2_acquire_dquot
and ocfs2_release_dquot. Thus we no longer depend on generic dquot_acquire
to do the locking and can force proper lock ordering.

Problems b) and c) are fixed by locking i_mutex and ip_alloc_sem of
global quota file in ocfs2_lock_global_qf and removing ip_alloc_sem from
ocfs2_quota_read and ocfs2_quota_write.

Acked-by: Joel Becker <Joel.Becker@oracle.com>
Signed-off-by: Jan Kara <jack@suse.cz>
This commit is contained in:
Jan Kara 2010-03-31 16:25:37 +02:00
parent ae4f6ef134
commit fb8dd8d780
3 changed files with 214 additions and 186 deletions

View File

@ -104,10 +104,11 @@ static inline int ocfs2_global_release_dquot(struct dquot *dquot)
int ocfs2_lock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex); int ocfs2_lock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex);
void ocfs2_unlock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex); void ocfs2_unlock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex);
int ocfs2_read_quota_block(struct inode *inode, u64 v_block, int ocfs2_validate_quota_block(struct super_block *sb, struct buffer_head *bh);
struct buffer_head **bh);
int ocfs2_read_quota_phys_block(struct inode *inode, u64 p_block, int ocfs2_read_quota_phys_block(struct inode *inode, u64 p_block,
struct buffer_head **bh); struct buffer_head **bh);
int ocfs2_create_local_dquot(struct dquot *dquot);
int ocfs2_local_release_dquot(handle_t *handle, struct dquot *dquot);
extern const struct dquot_operations ocfs2_quota_operations; extern const struct dquot_operations ocfs2_quota_operations;
extern struct quota_format_type ocfs2_quota_format; extern struct quota_format_type ocfs2_quota_format;

View File

@ -28,6 +28,41 @@
#include "buffer_head_io.h" #include "buffer_head_io.h"
#include "quota.h" #include "quota.h"
/*
* Locking of quotas with OCFS2 is rather complex. Here are rules that
* should be obeyed by all the functions:
* - any write of quota structure (either to local or global file) is protected
* by dqio_mutex or dquot->dq_lock.
* - any modification of global quota file holds inode cluster lock, i_mutex,
* and ip_alloc_sem of the global quota file (achieved by
* ocfs2_lock_global_qf). It also has to hold qinfo_lock.
* - an allocation of new blocks for local quota file is protected by
* its ip_alloc_sem
*
* A rough sketch of locking dependencies (lf = local file, gf = global file):
* Normal filesystem operation:
* start_trans -> dqio_mutex -> write to lf
* Syncing of local and global file:
* ocfs2_lock_global_qf -> start_trans -> dqio_mutex -> qinfo_lock ->
* write to gf
* -> write to lf
* Acquire dquot for the first time:
* dq_lock -> ocfs2_lock_global_qf -> qinfo_lock -> read from gf
* -> alloc space for gf
* -> start_trans -> qinfo_lock -> write to gf
* -> ip_alloc_sem of lf -> alloc space for lf
* -> write to lf
* Release last reference to dquot:
* dq_lock -> ocfs2_lock_global_qf -> start_trans -> qinfo_lock -> write to gf
* -> write to lf
* Note that all the above operations also hold the inode cluster lock of lf.
* Recovery:
* inode cluster lock of recovered lf
* -> read bitmaps -> ip_alloc_sem of lf
* -> ocfs2_lock_global_qf -> start_trans -> dqio_mutex -> qinfo_lock ->
* write to gf
*/
static struct workqueue_struct *ocfs2_quota_wq = NULL; static struct workqueue_struct *ocfs2_quota_wq = NULL;
static void qsync_work_fn(struct work_struct *work); static void qsync_work_fn(struct work_struct *work);
@ -92,8 +127,7 @@ struct qtree_fmt_operations ocfs2_global_ops = {
.is_id = ocfs2_global_is_id, .is_id = ocfs2_global_is_id,
}; };
static int ocfs2_validate_quota_block(struct super_block *sb, int ocfs2_validate_quota_block(struct super_block *sb, struct buffer_head *bh)
struct buffer_head *bh)
{ {
struct ocfs2_disk_dqtrailer *dqt = struct ocfs2_disk_dqtrailer *dqt =
ocfs2_block_dqtrailer(sb->s_blocksize, bh->b_data); ocfs2_block_dqtrailer(sb->s_blocksize, bh->b_data);
@ -111,33 +145,6 @@ static int ocfs2_validate_quota_block(struct super_block *sb,
return ocfs2_validate_meta_ecc(sb, bh->b_data, &dqt->dq_check); return ocfs2_validate_meta_ecc(sb, bh->b_data, &dqt->dq_check);
} }
int ocfs2_read_quota_block(struct inode *inode, u64 v_block,
struct buffer_head **bh)
{
int rc = 0;
struct buffer_head *tmp = *bh;
if (i_size_read(inode) >> inode->i_sb->s_blocksize_bits <= v_block) {
ocfs2_error(inode->i_sb,
"Quota file %llu is probably corrupted! Requested "
"to read block %Lu but file has size only %Lu\n",
(unsigned long long)OCFS2_I(inode)->ip_blkno,
(unsigned long long)v_block,
(unsigned long long)i_size_read(inode));
return -EIO;
}
rc = ocfs2_read_virt_blocks(inode, v_block, 1, &tmp, 0,
ocfs2_validate_quota_block);
if (rc)
mlog_errno(rc);
/* If ocfs2_read_virt_blocks() got us a new bh, pass it up. */
if (!rc && !*bh)
*bh = tmp;
return rc;
}
int ocfs2_read_quota_phys_block(struct inode *inode, u64 p_block, int ocfs2_read_quota_phys_block(struct inode *inode, u64 p_block,
struct buffer_head **bhp) struct buffer_head **bhp)
{ {
@ -151,27 +158,6 @@ int ocfs2_read_quota_phys_block(struct inode *inode, u64 p_block,
return rc; return rc;
} }
static int ocfs2_get_quota_block(struct inode *inode, int block,
struct buffer_head **bh)
{
u64 pblock, pcount;
int err;
down_read(&OCFS2_I(inode)->ip_alloc_sem);
err = ocfs2_extent_map_get_blocks(inode, block, &pblock, &pcount, NULL);
up_read(&OCFS2_I(inode)->ip_alloc_sem);
if (err) {
mlog_errno(err);
return err;
}
*bh = sb_getblk(inode->i_sb, pblock);
if (!*bh) {
err = -EIO;
mlog_errno(err);
}
return err;
}
/* Read data from global quotafile - avoid pagecache and such because we cannot /* Read data from global quotafile - avoid pagecache and such because we cannot
* afford acquiring the locks... We use quota cluster lock to serialize * afford acquiring the locks... We use quota cluster lock to serialize
* operations. Caller is responsible for acquiring it. */ * operations. Caller is responsible for acquiring it. */
@ -186,6 +172,7 @@ ssize_t ocfs2_quota_read(struct super_block *sb, int type, char *data,
int err = 0; int err = 0;
struct buffer_head *bh; struct buffer_head *bh;
size_t toread, tocopy; size_t toread, tocopy;
u64 pblock = 0, pcount = 0;
if (off > i_size) if (off > i_size)
return 0; return 0;
@ -194,8 +181,19 @@ ssize_t ocfs2_quota_read(struct super_block *sb, int type, char *data,
toread = len; toread = len;
while (toread > 0) { while (toread > 0) {
tocopy = min_t(size_t, (sb->s_blocksize - offset), toread); tocopy = min_t(size_t, (sb->s_blocksize - offset), toread);
if (!pcount) {
err = ocfs2_extent_map_get_blocks(gqinode, blk, &pblock,
&pcount, NULL);
if (err) {
mlog_errno(err);
return err;
}
} else {
pcount--;
pblock++;
}
bh = NULL; bh = NULL;
err = ocfs2_read_quota_block(gqinode, blk, &bh); err = ocfs2_read_quota_phys_block(gqinode, pblock, &bh);
if (err) { if (err) {
mlog_errno(err); mlog_errno(err);
return err; return err;
@ -223,6 +221,7 @@ ssize_t ocfs2_quota_write(struct super_block *sb, int type,
int err = 0, new = 0, ja_type; int err = 0, new = 0, ja_type;
struct buffer_head *bh = NULL; struct buffer_head *bh = NULL;
handle_t *handle = journal_current_handle(); handle_t *handle = journal_current_handle();
u64 pblock, pcount;
if (!handle) { if (!handle) {
mlog(ML_ERROR, "Quota write (off=%llu, len=%llu) cancelled " mlog(ML_ERROR, "Quota write (off=%llu, len=%llu) cancelled "
@ -235,12 +234,11 @@ ssize_t ocfs2_quota_write(struct super_block *sb, int type,
len = sb->s_blocksize - OCFS2_QBLK_RESERVED_SPACE - offset; len = sb->s_blocksize - OCFS2_QBLK_RESERVED_SPACE - offset;
} }
mutex_lock_nested(&gqinode->i_mutex, I_MUTEX_QUOTA);
if (gqinode->i_size < off + len) { if (gqinode->i_size < off + len) {
loff_t rounded_end = loff_t rounded_end =
ocfs2_align_bytes_to_blocks(sb, off + len); ocfs2_align_bytes_to_blocks(sb, off + len);
/* Space is already allocated in ocfs2_global_read_dquot() */ /* Space is already allocated in ocfs2_acquire_dquot() */
err = ocfs2_simple_size_update(gqinode, err = ocfs2_simple_size_update(gqinode,
oinfo->dqi_gqi_bh, oinfo->dqi_gqi_bh,
rounded_end); rounded_end);
@ -248,13 +246,20 @@ ssize_t ocfs2_quota_write(struct super_block *sb, int type,
goto out; goto out;
new = 1; new = 1;
} }
err = ocfs2_extent_map_get_blocks(gqinode, blk, &pblock, &pcount, NULL);
if (err) {
mlog_errno(err);
goto out;
}
/* Not rewriting whole block? */ /* Not rewriting whole block? */
if ((offset || len < sb->s_blocksize - OCFS2_QBLK_RESERVED_SPACE) && if ((offset || len < sb->s_blocksize - OCFS2_QBLK_RESERVED_SPACE) &&
!new) { !new) {
err = ocfs2_read_quota_block(gqinode, blk, &bh); err = ocfs2_read_quota_phys_block(gqinode, pblock, &bh);
ja_type = OCFS2_JOURNAL_ACCESS_WRITE; ja_type = OCFS2_JOURNAL_ACCESS_WRITE;
} else { } else {
err = ocfs2_get_quota_block(gqinode, blk, &bh); bh = sb_getblk(sb, pblock);
if (!bh)
err = -ENOMEM;
ja_type = OCFS2_JOURNAL_ACCESS_CREATE; ja_type = OCFS2_JOURNAL_ACCESS_CREATE;
} }
if (err) { if (err) {
@ -279,13 +284,11 @@ ssize_t ocfs2_quota_write(struct super_block *sb, int type,
brelse(bh); brelse(bh);
out: out:
if (err) { if (err) {
mutex_unlock(&gqinode->i_mutex);
mlog_errno(err); mlog_errno(err);
return err; return err;
} }
gqinode->i_version++; gqinode->i_version++;
ocfs2_mark_inode_dirty(handle, gqinode, oinfo->dqi_gqi_bh); ocfs2_mark_inode_dirty(handle, gqinode, oinfo->dqi_gqi_bh);
mutex_unlock(&gqinode->i_mutex);
return len; return len;
} }
@ -303,11 +306,23 @@ int ocfs2_lock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex)
else else
WARN_ON(bh != oinfo->dqi_gqi_bh); WARN_ON(bh != oinfo->dqi_gqi_bh);
spin_unlock(&dq_data_lock); spin_unlock(&dq_data_lock);
if (ex) {
mutex_lock(&oinfo->dqi_gqinode->i_mutex);
down_write(&OCFS2_I(oinfo->dqi_gqinode)->ip_alloc_sem);
} else {
down_read(&OCFS2_I(oinfo->dqi_gqinode)->ip_alloc_sem);
}
return 0; return 0;
} }
void ocfs2_unlock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex) void ocfs2_unlock_global_qf(struct ocfs2_mem_dqinfo *oinfo, int ex)
{ {
if (ex) {
up_write(&OCFS2_I(oinfo->dqi_gqinode)->ip_alloc_sem);
mutex_unlock(&oinfo->dqi_gqinode->i_mutex);
} else {
up_read(&OCFS2_I(oinfo->dqi_gqinode)->ip_alloc_sem);
}
ocfs2_inode_unlock(oinfo->dqi_gqinode, ex); ocfs2_inode_unlock(oinfo->dqi_gqinode, ex);
brelse(oinfo->dqi_gqi_bh); brelse(oinfo->dqi_gqi_bh);
spin_lock(&dq_data_lock); spin_lock(&dq_data_lock);
@ -458,75 +473,6 @@ static int ocfs2_calc_global_qinit_credits(struct super_block *sb, int type)
OCFS2_QUOTA_BLOCK_UPDATE_CREDITS; OCFS2_QUOTA_BLOCK_UPDATE_CREDITS;
} }
/* Read in information from global quota file and acquire a reference to it.
* dquot_acquire() has already started the transaction and locked quota file */
int ocfs2_global_read_dquot(struct dquot *dquot)
{
int err, err2, ex = 0;
struct super_block *sb = dquot->dq_sb;
int type = dquot->dq_type;
struct ocfs2_mem_dqinfo *info = sb_dqinfo(sb, type)->dqi_priv;
struct ocfs2_super *osb = OCFS2_SB(sb);
struct inode *gqinode = info->dqi_gqinode;
int need_alloc = ocfs2_global_qinit_alloc(sb, type);
handle_t *handle = NULL;
err = ocfs2_qinfo_lock(info, 0);
if (err < 0)
goto out;
err = qtree_read_dquot(&info->dqi_gi, dquot);
if (err < 0)
goto out_qlock;
OCFS2_DQUOT(dquot)->dq_use_count++;
OCFS2_DQUOT(dquot)->dq_origspace = dquot->dq_dqb.dqb_curspace;
OCFS2_DQUOT(dquot)->dq_originodes = dquot->dq_dqb.dqb_curinodes;
ocfs2_qinfo_unlock(info, 0);
if (!dquot->dq_off) { /* No real quota entry? */
ex = 1;
/*
* Add blocks to quota file before we start a transaction since
* locking allocators ranks above a transaction start
*/
WARN_ON(journal_current_handle());
down_write(&OCFS2_I(gqinode)->ip_alloc_sem);
err = ocfs2_extend_no_holes(gqinode,
gqinode->i_size + (need_alloc << sb->s_blocksize_bits),
gqinode->i_size);
up_write(&OCFS2_I(gqinode)->ip_alloc_sem);
if (err < 0)
goto out;
}
handle = ocfs2_start_trans(osb,
ocfs2_calc_global_qinit_credits(sb, type));
if (IS_ERR(handle)) {
err = PTR_ERR(handle);
goto out;
}
err = ocfs2_qinfo_lock(info, ex);
if (err < 0)
goto out_trans;
err = qtree_write_dquot(&info->dqi_gi, dquot);
if (ex && info_dirty(sb_dqinfo(dquot->dq_sb, dquot->dq_type))) {
err2 = __ocfs2_global_write_info(dquot->dq_sb, dquot->dq_type);
if (!err)
err = err2;
}
out_qlock:
if (ex)
ocfs2_qinfo_unlock(info, 1);
else
ocfs2_qinfo_unlock(info, 0);
out_trans:
if (handle)
ocfs2_commit_trans(osb, handle);
out:
if (err < 0)
mlog_errno(err);
return err;
}
/* Sync local information about quota modifications with global quota file. /* Sync local information about quota modifications with global quota file.
* Caller must have started the transaction and obtained exclusive lock for * Caller must have started the transaction and obtained exclusive lock for
* global quota file inode */ * global quota file inode */
@ -742,6 +688,10 @@ static int ocfs2_release_dquot(struct dquot *dquot)
mlog_entry("id=%u, type=%d", dquot->dq_id, dquot->dq_type); mlog_entry("id=%u, type=%d", dquot->dq_id, dquot->dq_type);
mutex_lock(&dquot->dq_lock);
/* Check whether we are not racing with some other dqget() */
if (atomic_read(&dquot->dq_count) > 1)
goto out;
status = ocfs2_lock_global_qf(oinfo, 1); status = ocfs2_lock_global_qf(oinfo, 1);
if (status < 0) if (status < 0)
goto out; goto out;
@ -752,30 +702,113 @@ static int ocfs2_release_dquot(struct dquot *dquot)
mlog_errno(status); mlog_errno(status);
goto out_ilock; goto out_ilock;
} }
status = dquot_release(dquot);
status = ocfs2_global_release_dquot(dquot);
if (status < 0) {
mlog_errno(status);
goto out_trans;
}
status = ocfs2_local_release_dquot(handle, dquot);
/*
* If we fail here, we cannot do much as global structure is
* already released. So just complain...
*/
if (status < 0)
mlog_errno(status);
clear_bit(DQ_ACTIVE_B, &dquot->dq_flags);
out_trans:
ocfs2_commit_trans(osb, handle); ocfs2_commit_trans(osb, handle);
out_ilock: out_ilock:
ocfs2_unlock_global_qf(oinfo, 1); ocfs2_unlock_global_qf(oinfo, 1);
out: out:
mutex_unlock(&dquot->dq_lock);
mlog_exit(status); mlog_exit(status);
return status; return status;
} }
/*
* Read global dquot structure from disk or create it if it does
* not exist. Also update use count of the global structure and
* create structure in node-local quota file.
*/
static int ocfs2_acquire_dquot(struct dquot *dquot) static int ocfs2_acquire_dquot(struct dquot *dquot)
{ {
struct ocfs2_mem_dqinfo *oinfo = int status = 0, err;
sb_dqinfo(dquot->dq_sb, dquot->dq_type)->dqi_priv; int ex = 0;
int status = 0; struct super_block *sb = dquot->dq_sb;
struct ocfs2_super *osb = OCFS2_SB(sb);
int type = dquot->dq_type;
struct ocfs2_mem_dqinfo *info = sb_dqinfo(sb, type)->dqi_priv;
struct inode *gqinode = info->dqi_gqinode;
int need_alloc = ocfs2_global_qinit_alloc(sb, type);
handle_t *handle;
mlog_entry("id=%u, type=%d", dquot->dq_id, dquot->dq_type); mlog_entry("id=%u, type=%d", dquot->dq_id, type);
/* We need an exclusive lock, because we're going to update use count mutex_lock(&dquot->dq_lock);
* and instantiate possibly new dquot structure */ /*
status = ocfs2_lock_global_qf(oinfo, 1); * We need an exclusive lock, because we're going to update use count
* and instantiate possibly new dquot structure
*/
status = ocfs2_lock_global_qf(info, 1);
if (status < 0) if (status < 0)
goto out; goto out;
status = dquot_acquire(dquot); if (!test_bit(DQ_READ_B, &dquot->dq_flags)) {
ocfs2_unlock_global_qf(oinfo, 1); status = ocfs2_qinfo_lock(info, 0);
if (status < 0)
goto out_dq;
status = qtree_read_dquot(&info->dqi_gi, dquot);
ocfs2_qinfo_unlock(info, 0);
if (status < 0)
goto out_dq;
}
set_bit(DQ_READ_B, &dquot->dq_flags);
OCFS2_DQUOT(dquot)->dq_use_count++;
OCFS2_DQUOT(dquot)->dq_origspace = dquot->dq_dqb.dqb_curspace;
OCFS2_DQUOT(dquot)->dq_originodes = dquot->dq_dqb.dqb_curinodes;
if (!dquot->dq_off) { /* No real quota entry? */
ex = 1;
/*
* Add blocks to quota file before we start a transaction since
* locking allocators ranks above a transaction start
*/
WARN_ON(journal_current_handle());
status = ocfs2_extend_no_holes(gqinode,
gqinode->i_size + (need_alloc << sb->s_blocksize_bits),
gqinode->i_size);
if (status < 0)
goto out_dq;
}
handle = ocfs2_start_trans(osb,
ocfs2_calc_global_qinit_credits(sb, type));
if (IS_ERR(handle)) {
status = PTR_ERR(handle);
goto out_dq;
}
status = ocfs2_qinfo_lock(info, ex);
if (status < 0)
goto out_trans;
status = qtree_write_dquot(&info->dqi_gi, dquot);
if (ex && info_dirty(sb_dqinfo(sb, type))) {
err = __ocfs2_global_write_info(sb, type);
if (!status)
status = err;
}
ocfs2_qinfo_unlock(info, ex);
out_trans:
ocfs2_commit_trans(osb, handle);
out_dq:
ocfs2_unlock_global_qf(info, 1);
if (status < 0)
goto out;
status = ocfs2_create_local_dquot(dquot);
if (status < 0)
goto out;
set_bit(DQ_ACTIVE_B, &dquot->dq_flags);
out: out:
mutex_unlock(&dquot->dq_lock);
mlog_exit(status); mlog_exit(status);
return status; return status;
} }
@ -820,7 +853,9 @@ static int ocfs2_mark_dquot_dirty(struct dquot *dquot)
mlog_errno(status); mlog_errno(status);
goto out_ilock; goto out_ilock;
} }
mutex_lock(&sb_dqopt(sb)->dqio_mutex);
status = ocfs2_sync_dquot(dquot); status = ocfs2_sync_dquot(dquot);
mutex_unlock(&sb_dqopt(sb)->dqio_mutex);
if (status < 0) { if (status < 0) {
mlog_errno(status); mlog_errno(status);
goto out_trans; goto out_trans;

View File

@ -22,6 +22,7 @@
#include "dlmglue.h" #include "dlmglue.h"
#include "quota.h" #include "quota.h"
#include "uptodate.h" #include "uptodate.h"
#include "super.h"
/* Number of local quota structures per block */ /* Number of local quota structures per block */
static inline unsigned int ol_quota_entries_per_block(struct super_block *sb) static inline unsigned int ol_quota_entries_per_block(struct super_block *sb)
@ -129,6 +130,39 @@ static int ocfs2_modify_bh(struct inode *inode, struct buffer_head *bh,
return 0; return 0;
} }
/*
* Read quota block from a given logical offset.
*
* This function acquires ip_alloc_sem and thus it must not be called with a
* transaction started.
*/
static int ocfs2_read_quota_block(struct inode *inode, u64 v_block,
struct buffer_head **bh)
{
int rc = 0;
struct buffer_head *tmp = *bh;
if (i_size_read(inode) >> inode->i_sb->s_blocksize_bits <= v_block) {
ocfs2_error(inode->i_sb,
"Quota file %llu is probably corrupted! Requested "
"to read block %Lu but file has size only %Lu\n",
(unsigned long long)OCFS2_I(inode)->ip_blkno,
(unsigned long long)v_block,
(unsigned long long)i_size_read(inode));
return -EIO;
}
rc = ocfs2_read_virt_blocks(inode, v_block, 1, &tmp, 0,
ocfs2_validate_quota_block);
if (rc)
mlog_errno(rc);
/* If ocfs2_read_virt_blocks() got us a new bh, pass it up. */
if (!rc && !*bh)
*bh = tmp;
return rc;
}
/* Check whether we understand format of quota files */ /* Check whether we understand format of quota files */
static int ocfs2_local_check_quota_file(struct super_block *sb, int type) static int ocfs2_local_check_quota_file(struct super_block *sb, int type)
{ {
@ -972,10 +1006,8 @@ static struct ocfs2_quota_chunk *ocfs2_local_quota_add_chunk(
} }
/* Initialize chunk header */ /* Initialize chunk header */
down_read(&OCFS2_I(lqinode)->ip_alloc_sem);
status = ocfs2_extent_map_get_blocks(lqinode, oinfo->dqi_blocks, status = ocfs2_extent_map_get_blocks(lqinode, oinfo->dqi_blocks,
&p_blkno, NULL, NULL); &p_blkno, NULL, NULL);
up_read(&OCFS2_I(lqinode)->ip_alloc_sem);
if (status < 0) { if (status < 0) {
mlog_errno(status); mlog_errno(status);
goto out_trans; goto out_trans;
@ -1003,10 +1035,8 @@ static struct ocfs2_quota_chunk *ocfs2_local_quota_add_chunk(
ocfs2_journal_dirty(handle, bh); ocfs2_journal_dirty(handle, bh);
/* Initialize new block with structures */ /* Initialize new block with structures */
down_read(&OCFS2_I(lqinode)->ip_alloc_sem);
status = ocfs2_extent_map_get_blocks(lqinode, oinfo->dqi_blocks + 1, status = ocfs2_extent_map_get_blocks(lqinode, oinfo->dqi_blocks + 1,
&p_blkno, NULL, NULL); &p_blkno, NULL, NULL);
up_read(&OCFS2_I(lqinode)->ip_alloc_sem);
if (status < 0) { if (status < 0) {
mlog_errno(status); mlog_errno(status);
goto out_trans; goto out_trans;
@ -1103,10 +1133,8 @@ static struct ocfs2_quota_chunk *ocfs2_extend_local_quota_file(
} }
/* Get buffer from the just added block */ /* Get buffer from the just added block */
down_read(&OCFS2_I(lqinode)->ip_alloc_sem);
status = ocfs2_extent_map_get_blocks(lqinode, oinfo->dqi_blocks, status = ocfs2_extent_map_get_blocks(lqinode, oinfo->dqi_blocks,
&p_blkno, NULL, NULL); &p_blkno, NULL, NULL);
up_read(&OCFS2_I(lqinode)->ip_alloc_sem);
if (status < 0) { if (status < 0) {
mlog_errno(status); mlog_errno(status);
goto out; goto out;
@ -1187,7 +1215,7 @@ static void olq_alloc_dquot(struct buffer_head *bh, void *private)
} }
/* Create dquot in the local file for given id */ /* Create dquot in the local file for given id */
static int ocfs2_create_local_dquot(struct dquot *dquot) int ocfs2_create_local_dquot(struct dquot *dquot)
{ {
struct super_block *sb = dquot->dq_sb; struct super_block *sb = dquot->dq_sb;
int type = dquot->dq_type; int type = dquot->dq_type;
@ -1237,36 +1265,11 @@ out:
return status; return status;
} }
/* Create entry in local file for dquot, load data from the global file */ /*
static int ocfs2_local_read_dquot(struct dquot *dquot) * Release dquot structure from local quota file. ocfs2_release_dquot() has
{ * already started a transaction and written all changes to global quota file
int status; */
int ocfs2_local_release_dquot(handle_t *handle, struct dquot *dquot)
mlog_entry("id=%u, type=%d\n", dquot->dq_id, dquot->dq_type);
status = ocfs2_global_read_dquot(dquot);
if (status < 0) {
mlog_errno(status);
goto out_err;
}
/* Now create entry in the local quota file */
status = ocfs2_create_local_dquot(dquot);
if (status < 0) {
mlog_errno(status);
goto out_err;
}
mlog_exit(0);
return 0;
out_err:
mlog_exit(status);
return status;
}
/* Release dquot structure from local quota file. ocfs2_release_dquot() has
* already started a transaction and obtained exclusive lock for global
* quota file. */
static int ocfs2_local_release_dquot(struct dquot *dquot)
{ {
int status; int status;
int type = dquot->dq_type; int type = dquot->dq_type;
@ -1274,15 +1277,6 @@ static int ocfs2_local_release_dquot(struct dquot *dquot)
struct super_block *sb = dquot->dq_sb; struct super_block *sb = dquot->dq_sb;
struct ocfs2_local_disk_chunk *dchunk; struct ocfs2_local_disk_chunk *dchunk;
int offset; int offset;
handle_t *handle = journal_current_handle();
BUG_ON(!handle);
/* First write all local changes to global file */
status = ocfs2_global_release_dquot(dquot);
if (status < 0) {
mlog_errno(status);
goto out;
}
status = ocfs2_journal_access_dq(handle, status = ocfs2_journal_access_dq(handle,
INODE_CACHE(sb_dqopt(sb)->files[type]), INODE_CACHE(sb_dqopt(sb)->files[type]),
@ -1315,9 +1309,7 @@ static const struct quota_format_ops ocfs2_format_ops = {
.read_file_info = ocfs2_local_read_info, .read_file_info = ocfs2_local_read_info,
.write_file_info = ocfs2_global_write_info, .write_file_info = ocfs2_global_write_info,
.free_file_info = ocfs2_local_free_info, .free_file_info = ocfs2_local_free_info,
.read_dqblk = ocfs2_local_read_dquot,
.commit_dqblk = ocfs2_local_write_dquot, .commit_dqblk = ocfs2_local_write_dquot,
.release_dqblk = ocfs2_local_release_dquot,
}; };
struct quota_format_type ocfs2_quota_format = { struct quota_format_type ocfs2_quota_format = {