diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index eae417d71136..e7e2ebac330d 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -856,6 +856,10 @@ static int ceph_mknod(struct user_namespace *mnt_userns, struct inode *dir, if (ceph_snap(dir) != CEPH_NOSNAP) return -EROFS; + err = ceph_wait_on_conflict_unlink(dentry); + if (err) + return err; + if (ceph_quota_is_max_files_exceeded(dir)) { err = -EDQUOT; goto out; @@ -918,6 +922,10 @@ static int ceph_symlink(struct user_namespace *mnt_userns, struct inode *dir, if (ceph_snap(dir) != CEPH_NOSNAP) return -EROFS; + err = ceph_wait_on_conflict_unlink(dentry); + if (err) + return err; + if (ceph_quota_is_max_files_exceeded(dir)) { err = -EDQUOT; goto out; @@ -968,9 +976,13 @@ static int ceph_mkdir(struct user_namespace *mnt_userns, struct inode *dir, struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb); struct ceph_mds_request *req; struct ceph_acl_sec_ctx as_ctx = {}; - int err = -EROFS; + int err; int op; + err = ceph_wait_on_conflict_unlink(dentry); + if (err) + return err; + if (ceph_snap(dir) == CEPH_SNAPDIR) { /* mkdir .snap/foo is a MKSNAP */ op = CEPH_MDS_OP_MKSNAP; @@ -980,6 +992,7 @@ static int ceph_mkdir(struct user_namespace *mnt_userns, struct inode *dir, dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode); op = CEPH_MDS_OP_MKDIR; } else { + err = -EROFS; goto out; } @@ -1037,6 +1050,10 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, struct ceph_mds_request *req; int err; + err = ceph_wait_on_conflict_unlink(dentry); + if (err) + return err; + if (ceph_snap(dir) != CEPH_NOSNAP) return -EROFS; @@ -1071,9 +1088,27 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) { + struct dentry *dentry = req->r_dentry; + struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); + struct ceph_dentry_info *di = ceph_dentry(dentry); int result = req->r_err ? req->r_err : le32_to_cpu(req->r_reply_info.head->result); + if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags)) + pr_warn("%s dentry %p:%pd async unlink bit is not set\n", + __func__, dentry, dentry); + + spin_lock(&fsc->async_unlink_conflict_lock); + hash_del_rcu(&di->hnode); + spin_unlock(&fsc->async_unlink_conflict_lock); + + spin_lock(&dentry->d_lock); + di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK; + wake_up_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT); + spin_unlock(&dentry->d_lock); + + synchronize_rcu(); + if (result == -EJUKEBOX) goto out; @@ -1081,7 +1116,7 @@ static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc, if (result) { int pathlen = 0; u64 base = 0; - char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen, + char *path = ceph_mdsc_build_path(dentry, &pathlen, &base, 0); /* mark error on parent + clear complete */ @@ -1089,13 +1124,13 @@ static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc, ceph_dir_clear_complete(req->r_parent); /* drop the dentry -- we don't know its status */ - if (!d_unhashed(req->r_dentry)) - d_drop(req->r_dentry); + if (!d_unhashed(dentry)) + d_drop(dentry); /* mark inode itself for an error (since metadata is bogus) */ mapping_set_error(req->r_old_inode->i_mapping, result); - pr_warn("ceph: async unlink failure path=(%llx)%s result=%d!\n", + pr_warn("async unlink failure path=(%llx)%s result=%d!\n", base, IS_ERR(path) ? "<>" : path, result); ceph_mdsc_free_path(path, pathlen); } @@ -1180,6 +1215,8 @@ retry: if (try_async && op == CEPH_MDS_OP_UNLINK && (req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) { + struct ceph_dentry_info *di = ceph_dentry(dentry); + dout("async unlink on %llu/%.*s caps=%s", ceph_ino(dir), dentry->d_name.len, dentry->d_name.name, ceph_cap_string(req->r_dir_caps)); @@ -1187,6 +1224,16 @@ retry: req->r_callback = ceph_async_unlink_cb; req->r_old_inode = d_inode(dentry); ihold(req->r_old_inode); + + spin_lock(&dentry->d_lock); + di->flags |= CEPH_DENTRY_ASYNC_UNLINK; + spin_unlock(&dentry->d_lock); + + spin_lock(&fsc->async_unlink_conflict_lock); + hash_add_rcu(fsc->async_unlink_conflict, &di->hnode, + dentry->d_name.hash); + spin_unlock(&fsc->async_unlink_conflict_lock); + err = ceph_mdsc_submit_request(mdsc, dir, req); if (!err) { /* @@ -1195,10 +1242,20 @@ retry: */ drop_nlink(inode); d_delete(dentry); - } else if (err == -EJUKEBOX) { - try_async = false; - ceph_mdsc_put_request(req); - goto retry; + } else { + spin_lock(&fsc->async_unlink_conflict_lock); + hash_del_rcu(&di->hnode); + spin_unlock(&fsc->async_unlink_conflict_lock); + + spin_lock(&dentry->d_lock); + di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK; + spin_unlock(&dentry->d_lock); + + if (err == -EJUKEBOX) { + try_async = false; + ceph_mdsc_put_request(req); + goto retry; + } } } else { set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); @@ -1237,6 +1294,10 @@ static int ceph_rename(struct user_namespace *mnt_userns, struct inode *old_dir, (!ceph_quota_is_same_realm(old_dir, new_dir))) return -EXDEV; + err = ceph_wait_on_conflict_unlink(new_dentry); + if (err) + return err; + dout("rename dir %p dentry %p to dir %p dentry %p\n", old_dir, old_dentry, new_dir, new_dentry); req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index da59e836a06e..0f3424dc618b 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -569,7 +569,7 @@ static void ceph_async_create_cb(struct ceph_mds_client *mdsc, char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen, &base, 0); - pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n", + pr_warn("async create failure path=(%llx)%s result=%d!\n", base, IS_ERR(path) ? "<>" : path, result); ceph_mdsc_free_path(path, pathlen); @@ -740,6 +740,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, if (dentry->d_name.len > NAME_MAX) return -ENAMETOOLONG; + err = ceph_wait_on_conflict_unlink(dentry); + if (err) + return err; + if (flags & O_CREAT) { if (ceph_quota_is_max_files_exceeded(dir)) return -EDQUOT; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 0aded10375fd..f6da80d110dc 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -456,7 +456,7 @@ static int ceph_parse_deleg_inos(void **p, void *end, dout("added delegated inode 0x%llx\n", start - 1); } else if (err == -EBUSY) { - pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", + pr_warn("MDS delegated inode 0x%llx more than once.\n", start - 1); } else { return err; @@ -655,6 +655,79 @@ static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); } +/* + * In async unlink case the kclient won't wait for the first reply + * from MDS and just drop all the links and unhash the dentry and then + * succeeds immediately. + * + * For any new create/link/rename,etc requests followed by using the + * same file names we must wait for the first reply of the inflight + * unlink request, or the MDS possibly will fail these following + * requests with -EEXIST if the inflight async unlink request was + * delayed for some reasons. + * + * And the worst case is that for the none async openc request it will + * successfully open the file if the CDentry hasn't been unlinked yet, + * but later the previous delayed async unlink request will remove the + * CDenty. That means the just created file is possiblly deleted later + * by accident. + * + * We need to wait for the inflight async unlink requests to finish + * when creating new files/directories by using the same file names. + */ +int ceph_wait_on_conflict_unlink(struct dentry *dentry) +{ + struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); + struct dentry *pdentry = dentry->d_parent; + struct dentry *udentry, *found = NULL; + struct ceph_dentry_info *di; + struct qstr dname; + u32 hash = dentry->d_name.hash; + int err; + + dname.name = dentry->d_name.name; + dname.len = dentry->d_name.len; + + rcu_read_lock(); + hash_for_each_possible_rcu(fsc->async_unlink_conflict, di, + hnode, hash) { + udentry = di->dentry; + + spin_lock(&udentry->d_lock); + if (udentry->d_name.hash != hash) + goto next; + if (unlikely(udentry->d_parent != pdentry)) + goto next; + if (!hash_hashed(&di->hnode)) + goto next; + + if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags)) + pr_warn("%s dentry %p:%pd async unlink bit is not set\n", + __func__, dentry, dentry); + + if (!d_same_name(udentry, pdentry, &dname)) + goto next; + + spin_unlock(&udentry->d_lock); + found = dget(udentry); + break; +next: + spin_unlock(&udentry->d_lock); + } + rcu_read_unlock(); + + if (likely(!found)) + return 0; + + dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__, + dentry, dentry, found, found); + + err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT, + TASK_KILLABLE); + dput(found); + return err; +} + /* * sessions diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 4620167f58eb..d8ec2ac93da3 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -575,6 +575,7 @@ static inline int ceph_wait_on_async_create(struct inode *inode) TASK_KILLABLE); } +extern int ceph_wait_on_conflict_unlink(struct dentry *dentry); extern u64 ceph_get_deleg_ino(struct ceph_mds_session *session); extern int ceph_restore_deleg_ino(struct ceph_mds_session *session, u64 ino); #endif diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 40140805bdcf..5539f6c87a45 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -816,6 +816,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, if (!fsc->cap_wq) goto fail_inode_wq; + hash_init(fsc->async_unlink_conflict); + spin_lock_init(&fsc->async_unlink_conflict_lock); + spin_lock(&ceph_fsc_lock); list_add_tail(&fsc->metric_wakeup, &ceph_fsc_list); spin_unlock(&ceph_fsc_lock); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index f59dac66955b..59469253592b 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -19,6 +19,7 @@ #include #include #include +#include #include @@ -99,6 +100,8 @@ struct ceph_mount_options { char *mon_addr; }; +#define CEPH_ASYNC_CREATE_CONFLICT_BITS 8 + struct ceph_fs_client { struct super_block *sb; @@ -124,6 +127,9 @@ struct ceph_fs_client { struct workqueue_struct *inode_wq; struct workqueue_struct *cap_wq; + DECLARE_HASHTABLE(async_unlink_conflict, CEPH_ASYNC_CREATE_CONFLICT_BITS); + spinlock_t async_unlink_conflict_lock; + #ifdef CONFIG_DEBUG_FS struct dentry *debugfs_dentry_lru, *debugfs_caps; struct dentry *debugfs_congestion_kb; @@ -280,7 +286,8 @@ struct ceph_dentry_info { struct dentry *dentry; struct ceph_mds_session *lease_session; struct list_head lease_list; - unsigned flags; + struct hlist_node hnode; + unsigned long flags; int lease_shared_gen; u32 lease_gen; u32 lease_seq; @@ -289,10 +296,12 @@ struct ceph_dentry_info { u64 offset; }; -#define CEPH_DENTRY_REFERENCED 1 -#define CEPH_DENTRY_LEASE_LIST 2 -#define CEPH_DENTRY_SHRINK_LIST 4 -#define CEPH_DENTRY_PRIMARY_LINK 8 +#define CEPH_DENTRY_REFERENCED (1 << 0) +#define CEPH_DENTRY_LEASE_LIST (1 << 1) +#define CEPH_DENTRY_SHRINK_LIST (1 << 2) +#define CEPH_DENTRY_PRIMARY_LINK (1 << 3) +#define CEPH_DENTRY_ASYNC_UNLINK_BIT (4) +#define CEPH_DENTRY_ASYNC_UNLINK (1 << CEPH_DENTRY_ASYNC_UNLINK_BIT) struct ceph_inode_xattrs_info { /*