Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
authorLinus Torvalds <torvalds@linux-foundation.org>
Thu, 13 Jan 2011 18:25:24 +0000 (10:25 -0800)
committerLinus Torvalds <torvalds@linux-foundation.org>
Thu, 13 Jan 2011 18:25:24 +0000 (10:25 -0800)
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  rbd: fix cleanup when trying to mount inexistent image
  net/ceph: make ceph_msgr_wq non-reentrant
  ceph: fsc->*_wq's aren't used in memory reclaim path
  ceph: Always free allocated memory in osdmap_decode()
  ceph: Makefile: Remove unnessary code
  ceph: associate requests with opening sessions
  ceph: drop redundant r_mds field
  ceph: implement DIRLAYOUTHASH feature to get dir layout from MDS
  ceph: add dir_layout to inode

15 files changed:
drivers/block/rbd.c
fs/ceph/Makefile
fs/ceph/debugfs.c
fs/ceph/dir.c
fs/ceph/export.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.c
fs/ceph/super.h
include/linux/ceph/ceph_fs.h
include/linux/ceph/messenger.h
net/ceph/ceph_hash.c
net/ceph/messenger.c
net/ceph/osdmap.c

index 008d4a0..e1e38b1 100644 (file)
@@ -1790,18 +1790,29 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
 
        rc = rbd_bus_add_dev(rbd_dev);
        if (rc)
-               goto err_out_disk;
+               goto err_out_blkdev;
+
        /* set up and announce blkdev mapping */
        rc = rbd_init_disk(rbd_dev);
        if (rc)
-               goto err_out_blkdev;
+               goto err_out_bus;
 
        return count;
 
+err_out_bus:
+       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+       list_del_init(&rbd_dev->node);
+       mutex_unlock(&ctl_mutex);
+
+       /* this will also clean up rest of rbd_dev stuff */
+
+       rbd_bus_del_dev(rbd_dev);
+       kfree(options);
+       kfree(mon_dev_name);
+       return rc;
+
 err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
-err_out_disk:
-       rbd_free_disk(rbd_dev);
 err_out_client:
        rbd_put_client(rbd_dev);
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
index 9e6c4f2..bd35212 100644 (file)
@@ -2,31 +2,10 @@
 # Makefile for CEPH filesystem.
 #
 
-ifneq ($(KERNELRELEASE),)
-
 obj-$(CONFIG_CEPH_FS) += ceph.o
 
-ceph-objs := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
+ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
        export.o caps.o snap.o xattr.o \
        mds_client.o mdsmap.o strings.o ceph_frag.o \
        debugfs.o
 
-else
-#Otherwise we were called directly from the command
-# line; invoke the kernel build system.
-
-KERNELDIR ?= /lib/modules/$(shell uname -r)/build
-PWD := $(shell pwd)
-
-default: all
-
-all:
-       $(MAKE) -C $(KERNELDIR) M=$(PWD) CONFIG_CEPH_FS=m modules
-
-modules_install:
-       $(MAKE) -C $(KERNELDIR) M=$(PWD) CONFIG_CEPH_FS=m modules_install
-
-clean:
-       $(MAKE) -C $(KERNELDIR) M=$(PWD) clean
-
-endif
index 7ae1b3d..08f65fa 100644 (file)
@@ -60,10 +60,13 @@ static int mdsc_show(struct seq_file *s, void *p)
        for (rp = rb_first(&mdsc->request_tree); rp; rp = rb_next(rp)) {
                req = rb_entry(rp, struct ceph_mds_request, r_node);
 
-               if (req->r_request)
-                       seq_printf(s, "%lld\tmds%d\t", req->r_tid, req->r_mds);
-               else
+               if (req->r_request && req->r_session)
+                       seq_printf(s, "%lld\tmds%d\t", req->r_tid,
+                                  req->r_session->s_mds);
+               else if (!req->r_request)
                        seq_printf(s, "%lld\t(no request)\t", req->r_tid);
+               else
+                       seq_printf(s, "%lld\t(no session)\t", req->r_tid);
 
                seq_printf(s, "%s", ceph_mds_op_name(req->r_op));
 
index fa7ca04..0bc68de 100644 (file)
@@ -1224,6 +1224,26 @@ void ceph_dentry_lru_del(struct dentry *dn)
        }
 }
 
+/*
+ * Return name hash for a given dentry.  This is dependent on
+ * the parent directory's hash function.
+ */
+unsigned ceph_dentry_hash(struct dentry *dn)
+{
+       struct inode *dir = dn->d_parent->d_inode;
+       struct ceph_inode_info *dci = ceph_inode(dir);
+
+       switch (dci->i_dir_layout.dl_dir_hash) {
+       case 0: /* for backward compat */
+       case CEPH_STR_HASH_LINUX:
+               return dn->d_name.hash;
+
+       default:
+               return ceph_str_hash(dci->i_dir_layout.dl_dir_hash,
+                                    dn->d_name.name, dn->d_name.len);
+       }
+}
+
 const struct file_operations ceph_dir_fops = {
        .read = ceph_read_dir,
        .readdir = ceph_readdir,
index 2297d94..e410561 100644 (file)
@@ -59,7 +59,7 @@ static int ceph_encode_fh(struct dentry *dentry, u32 *rawfh, int *max_len,
                dout("encode_fh %p connectable\n", dentry);
                cfh->ino = ceph_ino(dentry->d_inode);
                cfh->parent_ino = ceph_ino(parent->d_inode);
-               cfh->parent_name_hash = parent->d_name.hash;
+               cfh->parent_name_hash = ceph_dentry_hash(parent);
                *max_len = connected_handle_length;
                type = 2;
        } else if (*max_len >= handle_length) {
index e61de4f..e835eff 100644 (file)
@@ -297,6 +297,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        ci->i_release_count = 0;
        ci->i_symlink = NULL;
 
+       memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
+
        ci->i_fragtree = RB_ROOT;
        mutex_init(&ci->i_fragtree_mutex);
 
@@ -689,6 +691,8 @@ static int fill_inode(struct inode *inode,
                inode->i_op = &ceph_dir_iops;
                inode->i_fop = &ceph_dir_fops;
 
+               ci->i_dir_layout = iinfo->dir_layout;
+
                ci->i_files = le64_to_cpu(info->files);
                ci->i_subdirs = le64_to_cpu(info->subdirs);
                ci->i_rbytes = le64_to_cpu(info->rbytes);
index a50fca1..1e30d19 100644 (file)
@@ -60,7 +60,8 @@ static const struct ceph_connection_operations mds_con_ops;
  * parse individual inode info
  */
 static int parse_reply_info_in(void **p, void *end,
-                              struct ceph_mds_reply_info_in *info)
+                              struct ceph_mds_reply_info_in *info,
+                              int features)
 {
        int err = -EIO;
 
@@ -74,6 +75,12 @@ static int parse_reply_info_in(void **p, void *end,
        info->symlink = *p;
        *p += info->symlink_len;
 
+       if (features & CEPH_FEATURE_DIRLAYOUTHASH)
+               ceph_decode_copy_safe(p, end, &info->dir_layout,
+                                     sizeof(info->dir_layout), bad);
+       else
+               memset(&info->dir_layout, 0, sizeof(info->dir_layout));
+
        ceph_decode_32_safe(p, end, info->xattr_len, bad);
        ceph_decode_need(p, end, info->xattr_len, bad);
        info->xattr_data = *p;
@@ -88,12 +95,13 @@ bad:
  * target inode.
  */
 static int parse_reply_info_trace(void **p, void *end,
-                                 struct ceph_mds_reply_info_parsed *info)
+                                 struct ceph_mds_reply_info_parsed *info,
+                                 int features)
 {
        int err;
 
        if (info->head->is_dentry) {
-               err = parse_reply_info_in(p, end, &info->diri);
+               err = parse_reply_info_in(p, end, &info->diri, features);
                if (err < 0)
                        goto out_bad;
 
@@ -114,7 +122,7 @@ static int parse_reply_info_trace(void **p, void *end,
        }
 
        if (info->head->is_target) {
-               err = parse_reply_info_in(p, end, &info->targeti);
+               err = parse_reply_info_in(p, end, &info->targeti, features);
                if (err < 0)
                        goto out_bad;
        }
@@ -134,7 +142,8 @@ out_bad:
  * parse readdir results
  */
 static int parse_reply_info_dir(void **p, void *end,
-                               struct ceph_mds_reply_info_parsed *info)
+                               struct ceph_mds_reply_info_parsed *info,
+                               int features)
 {
        u32 num, i = 0;
        int err;
@@ -182,7 +191,7 @@ static int parse_reply_info_dir(void **p, void *end,
                *p += sizeof(struct ceph_mds_reply_lease);
 
                /* inode */
-               err = parse_reply_info_in(p, end, &info->dir_in[i]);
+               err = parse_reply_info_in(p, end, &info->dir_in[i], features);
                if (err < 0)
                        goto out_bad;
                i++;
@@ -205,7 +214,8 @@ out_bad:
  * parse fcntl F_GETLK results
  */
 static int parse_reply_info_filelock(void **p, void *end,
-                struct ceph_mds_reply_info_parsed *info)
+                                    struct ceph_mds_reply_info_parsed *info,
+                                    int features)
 {
        if (*p + sizeof(*info->filelock_reply) > end)
                goto bad;
@@ -225,19 +235,21 @@ bad:
  * parse extra results
  */
 static int parse_reply_info_extra(void **p, void *end,
-                struct ceph_mds_reply_info_parsed *info)
+                                 struct ceph_mds_reply_info_parsed *info,
+                                 int features)
 {
        if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
-               return parse_reply_info_filelock(p, end, info);
+               return parse_reply_info_filelock(p, end, info, features);
        else
-               return parse_reply_info_dir(p, end, info);
+               return parse_reply_info_dir(p, end, info, features);
 }
 
 /*
  * parse entire mds reply
  */
 static int parse_reply_info(struct ceph_msg *msg,
-                           struct ceph_mds_reply_info_parsed *info)
+                           struct ceph_mds_reply_info_parsed *info,
+                           int features)
 {
        void *p, *end;
        u32 len;
@@ -250,7 +262,7 @@ static int parse_reply_info(struct ceph_msg *msg,
        /* trace */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
-               err = parse_reply_info_trace(&p, p+len, info);
+               err = parse_reply_info_trace(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }
@@ -258,7 +270,7 @@ static int parse_reply_info(struct ceph_msg *msg,
        /* extra */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
-               err = parse_reply_info_extra(&p, p+len, info);
+               err = parse_reply_info_extra(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }
@@ -654,7 +666,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
                } else {
                        /* dir + name */
                        inode = dir;
-                       hash = req->r_dentry->d_name.hash;
+                       hash = ceph_dentry_hash(req->r_dentry);
                        is_hash = true;
                }
        }
@@ -1693,7 +1705,6 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
        struct ceph_msg *msg;
        int flags = 0;
 
-       req->r_mds = mds;
        req->r_attempts++;
        if (req->r_inode) {
                struct ceph_cap *cap =
@@ -1780,6 +1791,8 @@ static int __do_request(struct ceph_mds_client *mdsc,
                goto finish;
        }
 
+       put_request_session(req);
+
        mds = __choose_mds(mdsc, req);
        if (mds < 0 ||
            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
@@ -1797,6 +1810,8 @@ static int __do_request(struct ceph_mds_client *mdsc,
                        goto finish;
                }
        }
+       req->r_session = get_session(session);
+
        dout("do_request mds%d session %p state %s\n", mds, session,
             session_state_name(session->s_state));
        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
@@ -1809,7 +1824,6 @@ static int __do_request(struct ceph_mds_client *mdsc,
        }
 
        /* send request */
-       req->r_session = get_session(session);
        req->r_resend_mds = -1;   /* forget any previous mds hint */
 
        if (req->r_request_started == 0)   /* note request start time */
@@ -1863,7 +1877,6 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
                if (req->r_session &&
                    req->r_session->s_mds == mds) {
                        dout(" kicking tid %llu\n", req->r_tid);
-                       put_request_session(req);
                        __do_request(mdsc, req);
                }
        }
@@ -2056,8 +2069,11 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
                        goto out;
                } else  {
                        struct ceph_inode_info *ci = ceph_inode(req->r_inode);
-                       struct ceph_cap *cap =
-                               ceph_get_cap_for_mds(ci, req->r_mds);;
+                       struct ceph_cap *cap = NULL;
+
+                       if (req->r_session)
+                               cap = ceph_get_cap_for_mds(ci,
+                                                  req->r_session->s_mds);
 
                        dout("already using auth");
                        if ((!cap || cap != ci->i_auth_cap) ||
@@ -2101,7 +2117,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 
        dout("handle_reply tid %lld result %d\n", tid, result);
        rinfo = &req->r_reply_info;
-       err = parse_reply_info(msg, rinfo);
+       err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
        mutex_unlock(&mdsc->mutex);
 
        mutex_lock(&session->s_mutex);
index aabe563..4e3a9cc 100644 (file)
@@ -35,6 +35,7 @@ struct ceph_cap;
  */
 struct ceph_mds_reply_info_in {
        struct ceph_mds_reply_inode *in;
+       struct ceph_dir_layout dir_layout;
        u32 symlink_len;
        char *symlink;
        u32 xattr_len;
@@ -165,7 +166,6 @@ struct ceph_mds_request {
        struct ceph_mds_client *r_mdsc;
 
        int r_op;                    /* mds op code */
-       int r_mds;
 
        /* operation on what? */
        struct inode *r_inode;              /* arg1 */
index 08b460a..bf6f0f3 100644 (file)
@@ -428,7 +428,8 @@ struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
                goto fail;
        }
        fsc->client->extra_mon_dispatch = extra_mon_dispatch;
-       fsc->client->supported_features |= CEPH_FEATURE_FLOCK;
+       fsc->client->supported_features |= CEPH_FEATURE_FLOCK |
+               CEPH_FEATURE_DIRLAYOUTHASH;
        fsc->client->monc.want_mdsmap = 1;
 
        fsc->mount_options = fsopt;
@@ -443,13 +444,17 @@ struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
                goto fail_client;
 
        err = -ENOMEM;
-       fsc->wb_wq = create_workqueue("ceph-writeback");
+       /*
+        * The number of concurrent works can be high but they don't need
+        * to be processed in parallel, limit concurrency.
+        */
+       fsc->wb_wq = alloc_workqueue("ceph-writeback", 0, 1);
        if (fsc->wb_wq == NULL)
                goto fail_bdi;
-       fsc->pg_inv_wq = create_singlethread_workqueue("ceph-pg-invalid");
+       fsc->pg_inv_wq = alloc_workqueue("ceph-pg-invalid", 0, 1);
        if (fsc->pg_inv_wq == NULL)
                goto fail_wb_wq;
-       fsc->trunc_wq = create_singlethread_workqueue("ceph-trunc");
+       fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
        if (fsc->trunc_wq == NULL)
                goto fail_pg_inv_wq;
 
index 4553d88..20b907d 100644 (file)
@@ -239,6 +239,7 @@ struct ceph_inode_info {
        unsigned i_ceph_flags;
        unsigned long i_release_count;
 
+       struct ceph_dir_layout i_dir_layout;
        struct ceph_file_layout i_layout;
        char *i_symlink;
 
@@ -768,6 +769,7 @@ extern void ceph_dentry_lru_add(struct dentry *dn);
 extern void ceph_dentry_lru_touch(struct dentry *dn);
 extern void ceph_dentry_lru_del(struct dentry *dn);
 extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
+extern unsigned ceph_dentry_hash(struct dentry *dn);
 
 /*
  * our d_ops vary depending on whether the inode is live,
index c3c74ae..09dcc0c 100644 (file)
 #define CEPH_FEATURE_NOSRCADDR      (1<<1)
 #define CEPH_FEATURE_MONCLOCKCHECK  (1<<2)
 #define CEPH_FEATURE_FLOCK          (1<<3)
+#define CEPH_FEATURE_SUBSCRIBE2     (1<<4)
+#define CEPH_FEATURE_MONNAMES       (1<<5)
+#define CEPH_FEATURE_RECONNECT_SEQ  (1<<6)
+#define CEPH_FEATURE_DIRLAYOUTHASH  (1<<7)
 
 
 /*
@@ -55,10 +59,10 @@ struct ceph_file_layout {
        __le32 fl_stripe_count;    /* over this many objects */
        __le32 fl_object_size;     /* until objects are this big, then move to
                                      new objects */
-       __le32 fl_cas_hash;        /* 0 = none; 1 = sha256 */
+       __le32 fl_cas_hash;        /* UNUSED.  0 = none; 1 = sha256 */
 
        /* pg -> disk layout */
-       __le32 fl_object_stripe_unit;  /* for per-object parity, if any */
+       __le32 fl_object_stripe_unit;  /* UNUSED.  for per-object parity, if any */
 
        /* object -> pg layout */
        __le32 fl_pg_preferred; /* preferred primary for pg (-1 for none) */
@@ -69,6 +73,12 @@ struct ceph_file_layout {
 
 int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
 
+struct ceph_dir_layout {
+       __u8   dl_dir_hash;   /* see ceph_hash.h for ids */
+       __u8   dl_unused1;
+       __u16  dl_unused2;
+       __u32  dl_unused3;
+} __attribute__ ((packed));
 
 /* crypto algorithms */
 #define CEPH_CRYPTO_NONE 0x0
@@ -457,7 +467,7 @@ struct ceph_mds_reply_inode {
        struct ceph_timespec rctime;
        struct ceph_frag_tree_head fragtree;  /* (must be at end of struct) */
 } __attribute__ ((packed));
-/* followed by frag array, then symlink string, then xattr blob */
+/* followed by frag array, symlink string, dir layout, xattr blob */
 
 /* reply_lease follows dname, and reply_inode */
 struct ceph_mds_reply_lease {
index a108b42..c3011be 100644 (file)
@@ -110,17 +110,12 @@ struct ceph_msg_pos {
 
 /*
  * ceph_connection state bit flags
- *
- * QUEUED and BUSY are used together to ensure that only a single
- * thread is currently opening, reading or writing data to the socket.
  */
 #define LOSSYTX         0  /* we can close channel or drop messages on errors */
 #define CONNECTING     1
 #define NEGOTIATING    2
 #define KEEPALIVE_PENDING      3
 #define WRITE_PENDING  4  /* we have data ready to send */
-#define QUEUED          5  /* there is work queued on this connection */
-#define BUSY            6  /* work is being done */
 #define STANDBY                8  /* no outgoing messages, socket closed.  we keep
                            * the ceph_connection around to maintain shared
                            * state with the peer. */
index 815ef88..0a1b53b 100644 (file)
@@ -1,5 +1,6 @@
 
 #include <linux/ceph/types.h>
+#include <linux/module.h>
 
 /*
  * Robert Jenkin's hash function.
@@ -104,6 +105,7 @@ unsigned ceph_str_hash(int type, const char *s, unsigned len)
                return -1;
        }
 }
+EXPORT_SYMBOL(ceph_str_hash);
 
 const char *ceph_str_hash_name(int type)
 {
@@ -116,3 +118,4 @@ const char *ceph_str_hash_name(int type)
                return "unknown";
        }
 }
+EXPORT_SYMBOL(ceph_str_hash_name);
index b6ff4a1..dff633d 100644 (file)
@@ -96,7 +96,7 @@ struct workqueue_struct *ceph_msgr_wq;
 
 int ceph_msgr_init(void)
 {
-       ceph_msgr_wq = create_workqueue("ceph-msgr");
+       ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
        if (!ceph_msgr_wq) {
                pr_err("msgr_init failed to create workqueue\n");
                return -ENOMEM;
@@ -1920,20 +1920,6 @@ bad_tag:
 /*
  * Atomically queue work on a connection.  Bump @con reference to
  * avoid races with connection teardown.
- *
- * There is some trickery going on with QUEUED and BUSY because we
- * only want a _single_ thread operating on each connection at any
- * point in time, but we want to use all available CPUs.
- *
- * The worker thread only proceeds if it can atomically set BUSY.  It
- * clears QUEUED and does it's thing.  When it thinks it's done, it
- * clears BUSY, then rechecks QUEUED.. if it's set again, it loops
- * (tries again to set BUSY).
- *
- * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
- * try to queue work.  If that fails (work is already queued, or BUSY)
- * we give up (work also already being done or is queued) but leave QUEUED
- * set so that the worker thread will loop if necessary.
  */
 static void queue_con(struct ceph_connection *con)
 {
@@ -1948,11 +1934,7 @@ static void queue_con(struct ceph_connection *con)
                return;
        }
 
-       set_bit(QUEUED, &con->state);
-       if (test_bit(BUSY, &con->state)) {
-               dout("queue_con %p - already BUSY\n", con);
-               con->ops->put(con);
-       } else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
+       if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
                dout("queue_con %p - already queued\n", con);
                con->ops->put(con);
        } else {
@@ -1967,15 +1949,6 @@ static void con_work(struct work_struct *work)
 {
        struct ceph_connection *con = container_of(work, struct ceph_connection,
                                                   work.work);
-       int backoff = 0;
-
-more:
-       if (test_and_set_bit(BUSY, &con->state) != 0) {
-               dout("con_work %p BUSY already set\n", con);
-               goto out;
-       }
-       dout("con_work %p start, clearing QUEUED\n", con);
-       clear_bit(QUEUED, &con->state);
 
        mutex_lock(&con->mutex);
 
@@ -1994,28 +1967,13 @@ more:
            try_read(con) < 0 ||
            try_write(con) < 0) {
                mutex_unlock(&con->mutex);
-               backoff = 1;
                ceph_fault(con);     /* error/fault path */
                goto done_unlocked;
        }
 
 done:
        mutex_unlock(&con->mutex);
-
 done_unlocked:
-       clear_bit(BUSY, &con->state);
-       dout("con->state=%lu\n", con->state);
-       if (test_bit(QUEUED, &con->state)) {
-               if (!backoff || test_bit(OPENING, &con->state)) {
-                       dout("con_work %p QUEUED reset, looping\n", con);
-                       goto more;
-               }
-               dout("con_work %p QUEUED reset, but just faulted\n", con);
-               clear_bit(QUEUED, &con->state);
-       }
-       dout("con_work %p done\n", con);
-
-out:
        con->ops->put(con);
 }
 
index d73f3f6..71603ac 100644 (file)
@@ -605,8 +605,10 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
                        goto bad;
                }
                err = __decode_pool(p, end, pi);
-               if (err < 0)
+               if (err < 0) {
+                       kfree(pi);
                        goto bad;
+               }
                __insert_pg_pool(&map->pg_pools, pi);
        }