ceph: include the initial ACL in create/mkdir/mknod MDS requests
authorYan, Zheng <zyan@redhat.com>
Tue, 16 Sep 2014 12:35:17 +0000 (20:35 +0800)
committerSage Weil <sage@redhat.com>
Tue, 14 Oct 2014 19:56:49 +0000 (12:56 -0700)
Current code set new file/directory's initial ACL in a non-atomic
manner.
Client first sends request to MDS to create new file/directory, then set
the initial ACL after the new file/directory is successfully created.

The fix is include the initial ACL in create/mkdir/mknod MDS requests.
So MDS can handle creating file/directory and setting the initial ACL in
one request.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
Reviewed-by: Sage Weil <sage@redhat.com>
fs/ceph/acl.c
fs/ceph/dir.c
fs/ceph/file.c
fs/ceph/super.h

index cebf2eb..5bd853b 100644 (file)
@@ -169,36 +169,109 @@ out:
        return ret;
 }
 
-int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir)
+int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
+                      struct ceph_acls_info *info)
 {
-       struct posix_acl *default_acl, *acl;
-       umode_t new_mode = inode->i_mode;
-       int error;
-
-       error = posix_acl_create(dir, &new_mode, &default_acl, &acl);
-       if (error)
-               return error;
-
-       if (!default_acl && !acl) {
-               cache_no_acl(inode);
-               if (new_mode != inode->i_mode) {
-                       struct iattr newattrs = {
-                               .ia_mode = new_mode,
-                               .ia_valid = ATTR_MODE,
-                       };
-                       error = ceph_setattr(dentry, &newattrs);
+       struct posix_acl *acl, *default_acl;
+       size_t val_size1 = 0, val_size2 = 0;
+       struct ceph_pagelist *pagelist = NULL;
+       void *tmp_buf = NULL;
+       int err;
+
+       err = posix_acl_create(dir, mode, &default_acl, &acl);
+       if (err)
+               return err;
+
+       if (acl) {
+               int ret = posix_acl_equiv_mode(acl, mode);
+               if (ret < 0)
+                       goto out_err;
+               if (ret == 0) {
+                       posix_acl_release(acl);
+                       acl = NULL;
                }
-               return error;
        }
 
-       if (default_acl) {
-               error = ceph_set_acl(inode, default_acl, ACL_TYPE_DEFAULT);
-               posix_acl_release(default_acl);
-       }
+       if (!default_acl && !acl)
+               return 0;
+
+       if (acl)
+               val_size1 = posix_acl_xattr_size(acl->a_count);
+       if (default_acl)
+               val_size2 = posix_acl_xattr_size(default_acl->a_count);
+
+       err = -ENOMEM;
+       tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS);
+       if (!tmp_buf)
+               goto out_err;
+       pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS);
+       if (!pagelist)
+               goto out_err;
+       ceph_pagelist_init(pagelist);
+
+       err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
+       if (err)
+               goto out_err;
+
+       ceph_pagelist_encode_32(pagelist, acl && default_acl ? 2 : 1);
+
        if (acl) {
-               if (!error)
-                       error = ceph_set_acl(inode, acl, ACL_TYPE_ACCESS);
-               posix_acl_release(acl);
+               size_t len = strlen(POSIX_ACL_XATTR_ACCESS);
+               err = ceph_pagelist_reserve(pagelist, len + val_size1 + 8);
+               if (err)
+                       goto out_err;
+               ceph_pagelist_encode_string(pagelist, POSIX_ACL_XATTR_ACCESS,
+                                           len);
+               err = posix_acl_to_xattr(&init_user_ns, acl,
+                                        tmp_buf, val_size1);
+               if (err < 0)
+                       goto out_err;
+               ceph_pagelist_encode_32(pagelist, val_size1);
+               ceph_pagelist_append(pagelist, tmp_buf, val_size1);
        }
-       return error;
+       if (default_acl) {
+               size_t len = strlen(POSIX_ACL_XATTR_DEFAULT);
+               err = ceph_pagelist_reserve(pagelist, len + val_size2 + 8);
+               if (err)
+                       goto out_err;
+               err = ceph_pagelist_encode_string(pagelist,
+                                                 POSIX_ACL_XATTR_DEFAULT, len);
+               err = posix_acl_to_xattr(&init_user_ns, default_acl,
+                                        tmp_buf, val_size2);
+               if (err < 0)
+                       goto out_err;
+               ceph_pagelist_encode_32(pagelist, val_size2);
+               ceph_pagelist_append(pagelist, tmp_buf, val_size2);
+       }
+
+       kfree(tmp_buf);
+
+       info->acl = acl;
+       info->default_acl = default_acl;
+       info->pagelist = pagelist;
+       return 0;
+
+out_err:
+       posix_acl_release(acl);
+       posix_acl_release(default_acl);
+       kfree(tmp_buf);
+       if (pagelist)
+               ceph_pagelist_release(pagelist);
+       return err;
+}
+
+void ceph_init_inode_acls(struct inode* inode, struct ceph_acls_info *info)
+{
+       if (!inode)
+               return;
+       ceph_set_cached_acl(inode, ACL_TYPE_ACCESS, info->acl);
+       ceph_set_cached_acl(inode, ACL_TYPE_DEFAULT, info->default_acl);
+}
+
+void ceph_release_acls_info(struct ceph_acls_info *info)
+{
+       posix_acl_release(info->acl);
+       posix_acl_release(info->default_acl);
+       if (info->pagelist)
+               ceph_pagelist_release(info->pagelist);
 }
index c29d6ae..26be849 100644 (file)
@@ -682,17 +682,22 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
+       struct ceph_acls_info acls = {};
        int err;
 
        if (ceph_snap(dir) != CEPH_NOSNAP)
                return -EROFS;
 
+       err = ceph_pre_init_acls(dir, &mode, &acls);
+       if (err < 0)
+               return err;
+
        dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n",
             dir, dentry, mode, rdev);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
        if (IS_ERR(req)) {
-               d_drop(dentry);
-               return PTR_ERR(req);
+               err = PTR_ERR(req);
+               goto out;
        }
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
@@ -701,15 +706,20 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
        req->r_args.mknod.rdev = cpu_to_le32(rdev);
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+       if (acls.pagelist) {
+               req->r_pagelist = acls.pagelist;
+               acls.pagelist = NULL;
+       }
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
        ceph_mdsc_put_request(req);
-
+out:
        if (!err)
-               ceph_init_acl(dentry, dentry->d_inode, dir);
+               ceph_init_inode_acls(dentry->d_inode, &acls);
        else
                d_drop(dentry);
+       ceph_release_acls_info(&acls);
        return err;
 }
 
@@ -733,8 +743,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
        dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
        if (IS_ERR(req)) {
-               d_drop(dentry);
-               return PTR_ERR(req);
+               err = PTR_ERR(req);
+               goto out;
        }
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
@@ -746,9 +756,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
        if (!err && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
        ceph_mdsc_put_request(req);
-       if (!err)
-               ceph_init_acl(dentry, dentry->d_inode, dir);
-       else
+out:
+       if (err)
                d_drop(dentry);
        return err;
 }
@@ -758,6 +767,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
+       struct ceph_acls_info acls = {};
        int err = -EROFS;
        int op;
 
@@ -772,6 +782,12 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
        } else {
                goto out;
        }
+
+       mode |= S_IFDIR;
+       err = ceph_pre_init_acls(dir, &mode, &acls);
+       if (err < 0)
+               goto out;
+
        req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
@@ -784,15 +800,20 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
        req->r_args.mkdir.mode = cpu_to_le32(mode);
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+       if (acls.pagelist) {
+               req->r_pagelist = acls.pagelist;
+               acls.pagelist = NULL;
+       }
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
        ceph_mdsc_put_request(req);
 out:
        if (!err)
-               ceph_init_acl(dentry, dentry->d_inode, dir);
+               ceph_init_inode_acls(dentry->d_inode, &acls);
        else
                d_drop(dentry);
+       ceph_release_acls_info(&acls);
        return err;
 }
 
index d190650..d7e0da8 100644 (file)
@@ -235,6 +235,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        struct dentry *dn;
+       struct ceph_acls_info acls = {};
        int err;
 
        dout("atomic_open %p dentry %p '%.*s' %s flags %d mode 0%o\n",
@@ -248,22 +249,34 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
        if (err < 0)
                return err;
 
+       if (flags & O_CREAT) {
+               err = ceph_pre_init_acls(dir, &mode, &acls);
+               if (err < 0)
+                       return err;
+       }
+
        /* do the open */
        req = prepare_open_request(dir->i_sb, flags, mode);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
+       if (IS_ERR(req)) {
+               err = PTR_ERR(req);
+               goto out_acl;
+       }
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        if (flags & O_CREAT) {
                req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
                req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+               if (acls.pagelist) {
+                       req->r_pagelist = acls.pagelist;
+                       acls.pagelist = NULL;
+               }
        }
        req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
        err = ceph_mdsc_do_request(mdsc,
                                   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
                                   req);
        if (err)
-               goto out_err;
+               goto out_req;
 
        err = ceph_handle_snapdir(req, dentry, err);
        if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
@@ -278,7 +291,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
                dn = NULL;
        }
        if (err)
-               goto out_err;
+               goto out_req;
        if (dn || dentry->d_inode == NULL || S_ISLNK(dentry->d_inode->i_mode)) {
                /* make vfs retry on splice, ENOENT, or symlink */
                dout("atomic_open finish_no_open on dn %p\n", dn);
@@ -286,15 +299,17 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
        } else {
                dout("atomic_open finish_open on dn %p\n", dn);
                if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
-                       ceph_init_acl(dentry, dentry->d_inode, dir);
+                       ceph_init_inode_acls(dentry->d_inode, &acls);
                        *opened |= FILE_CREATED;
                }
                err = finish_open(file, dentry, ceph_open, opened);
        }
-out_err:
+out_req:
        if (!req->r_err && req->r_target_inode)
                ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
        ceph_mdsc_put_request(req);
+out_acl:
+       ceph_release_acls_info(&acls);
        dout("atomic_open result=%d\n", err);
        return err;
 }
index bbb44cd..f62a098 100644 (file)
@@ -733,15 +733,23 @@ extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci);
 extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
 extern void __init ceph_xattr_init(void);
 extern void ceph_xattr_exit(void);
+extern const struct xattr_handler *ceph_xattr_handlers[];
 
 /* acl.c */
-extern const struct xattr_handler *ceph_xattr_handlers[];
+struct ceph_acls_info {
+       void *default_acl;
+       void *acl;
+       struct ceph_pagelist *pagelist;
+};
 
 #ifdef CONFIG_CEPH_FS_POSIX_ACL
 
 struct posix_acl *ceph_get_acl(struct inode *, int);
 int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type);
-int ceph_init_acl(struct dentry *, struct inode *, struct inode *);
+int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
+                      struct ceph_acls_info *info);
+void ceph_init_inode_acls(struct inode *inode, struct ceph_acls_info *info);
+void ceph_release_acls_info(struct ceph_acls_info *info);
 
 static inline void ceph_forget_all_cached_acls(struct inode *inode)
 {
@@ -753,12 +761,18 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode)
 #define ceph_get_acl NULL
 #define ceph_set_acl NULL
 
-static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode,
-                               struct inode *dir)
+static inline int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
+                                    struct ceph_acls_info *info)
 {
        return 0;
 }
-
+static inline void ceph_init_inode_acls(struct inode *inode,
+                                       struct ceph_acls_info *info)
+{
+}
+static inline void ceph_release_acls_info(struct ceph_acls_info *info)
+{
+}
 static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode)
 {
        return 0;