Merge branch 'nfs-for-3.2' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
author Linus Torvalds <torvalds@linux-foundation.org>
Fri, 4 Nov 2011 19:27:43 +0000 (12:27 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
Fri, 4 Nov 2011 19:27:43 +0000 (12:27 -0700)
* 'nfs-for-3.2' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (25 commits)
  nfs: set vs_hidden on nfs4_callback_version4 (try #2)
  pnfs-obj: Support for RAID5 read-4-write interface.
  pnfs-obj: move to ore 03: Remove old raid engine
  pnfs-obj: move to ore 02: move to ORE
  pnfs-obj: move to ore 01: ore_layout & ore_components
  pnfs-obj: Rename objlayout_io_state => objlayout_io_res
  pnfs-obj: Get rid of objlayout_{alloc,free}_io_state
  pnfs-obj: Return PNFS_NOT_ATTEMPTED in case of read/write_pagelist
  pnfs-obj: Remove redundant EOF from objlayout_io_state
  nfs: Remove unused variable from write.c
  nfs: Fix unused variable warning from file.c
  NFS: Remove no-op less-than-zero checks on unsigned variables.
  NFS: Clean up nfs4_xdr_dec_secinfo()
  NFS: Fix documenting comment for nfs_create_request()
  NFS4: fix cb_recallany decode error
  nfs4: serialize layoutcommit
  SUNRPC: remove rpcbind clients destruction on module cleanup
  SUNRPC: remove rpcbind clients creation during service registering
  NFSd: call svc rpcbind cleanup explicitly
  SUNRPC: cleanup service destruction
  ...

20 files changed:
fs/exofs/Kconfig
fs/nfs/callback_xdr.c
fs/nfs/file.c
fs/nfs/nfs4filelayout.c
fs/nfs/nfs4proc.c
fs/nfs/nfs4xdr.c
fs/nfs/objlayout/objio_osd.c
fs/nfs/objlayout/objlayout.c
fs/nfs/objlayout/objlayout.h
fs/nfs/pagelist.c
fs/nfs/pnfs.c
fs/nfs/write.c
fs/nfsd/nfssvc.c
include/linux/nfs_fs.h
include/linux/sunrpc/clnt.h
include/linux/sunrpc/svc.h
net/sunrpc/auth_unix.c
net/sunrpc/rpcb_clnt.c
net/sunrpc/sunrpc_syms.c
net/sunrpc/svc.c

index fa9a286..da42f32 100644 (file)
@@ -5,7 +5,7 @@
 # selected by any of the users.
 config ORE
        tristate
-       depends on EXOFS_FS
+       depends on EXOFS_FS || PNFS_OBJLAYOUT
        select ASYNC_XOR
        default SCSI_OSD_ULD
 
index 918ad64..726e59a 100644 (file)
@@ -488,17 +488,18 @@ static __be32 decode_recallany_args(struct svc_rqst *rqstp,
                                      struct xdr_stream *xdr,
                                      struct cb_recallanyargs *args)
 {
-       __be32 *p;
+       uint32_t bitmap[2];
+       __be32 *p, status;
 
        args->craa_addr = svc_addr(rqstp);
        p = read_buf(xdr, 4);
        if (unlikely(p == NULL))
                return htonl(NFS4ERR_BADXDR);
        args->craa_objs_to_keep = ntohl(*p++);
-       p = read_buf(xdr, 4);
-       if (unlikely(p == NULL))
-               return htonl(NFS4ERR_BADXDR);
-       args->craa_type_mask = ntohl(*p);
+       status = decode_bitmap(xdr, bitmap);
+       if (unlikely(status))
+               return status;
+       args->craa_type_mask = bitmap[0];
 
        return 0;
 }
@@ -986,4 +987,5 @@ struct svc_version nfs4_callback_version4 = {
        .vs_proc = nfs4_callback_procedures1,
        .vs_xdrsize = NFS4_CALLBACK_XDRSIZE,
        .vs_dispatch = NULL,
+       .vs_hidden = 1,
 };
index 91c01f0..0a1f831 100644 (file)
@@ -137,11 +137,9 @@ nfs_file_open(struct inode *inode, struct file *filp)
 static int
 nfs_file_release(struct inode *inode, struct file *filp)
 {
-       struct dentry *dentry = filp->f_path.dentry;
-
        dprintk("NFS: release(%s/%s)\n",
-                       dentry->d_parent->d_name.name,
-                       dentry->d_name.name);
+                       filp->f_path.dentry->d_parent->d_name.name,
+                       filp->f_path.dentry->d_name.name);
 
        nfs_inc_stats(inode, NFSIOS_VFSRELEASE);
        return nfs_release(inode, filp);
@@ -228,14 +226,13 @@ nfs_file_read(struct kiocb *iocb, const struct iovec *iov,
        struct dentry * dentry = iocb->ki_filp->f_path.dentry;
        struct inode * inode = dentry->d_inode;
        ssize_t result;
-       size_t count = iov_length(iov, nr_segs);
 
        if (iocb->ki_filp->f_flags & O_DIRECT)
                return nfs_file_direct_read(iocb, iov, nr_segs, pos);
 
        dprintk("NFS: read(%s/%s, %lu@%lu)\n",
                dentry->d_parent->d_name.name, dentry->d_name.name,
-               (unsigned long) count, (unsigned long) pos);
+               (unsigned long) iov_length(iov, nr_segs), (unsigned long) pos);
 
        result = nfs_revalidate_mapping(inode, iocb->ki_filp->f_mapping);
        if (!result) {
index 0911941..12185aa 100644 (file)
@@ -449,9 +449,8 @@ filelayout_check_layout(struct pnfs_layout_hdr *lo,
 
        fl->dsaddr = dsaddr;
 
-       if (fl->first_stripe_index < 0 ||
-           fl->first_stripe_index >= dsaddr->stripe_count) {
-               dprintk("%s Bad first_stripe_index %d\n",
+       if (fl->first_stripe_index >= dsaddr->stripe_count) {
+               dprintk("%s Bad first_stripe_index %u\n",
                                __func__, fl->first_stripe_index);
                goto out_put;
        }
@@ -552,7 +551,7 @@ filelayout_decode_layout(struct pnfs_layout_hdr *flo,
 
        /* Note that a zero value for num_fh is legal for STRIPE_SPARSE.
         * Futher checking is done in filelayout_check_layout */
-       if (fl->num_fh < 0 || fl->num_fh >
+       if (fl->num_fh >
            max(NFS4_PNFS_MAX_STRIPE_CNT, NFS4_PNFS_MAX_MULTI_CNT))
                goto out_err;
 
index d2ae413..b60fddf 100644 (file)
@@ -5950,6 +5950,7 @@ static void nfs4_layoutcommit_release(void *calldata)
 {
        struct nfs4_layoutcommit_data *data = calldata;
        struct pnfs_layout_segment *lseg, *tmp;
+       unsigned long *bitlock = &NFS_I(data->args.inode)->flags;
 
        pnfs_cleanup_layoutcommit(data);
        /* Matched by references in pnfs_set_layoutcommit */
@@ -5959,6 +5960,11 @@ static void nfs4_layoutcommit_release(void *calldata)
                                       &lseg->pls_flags))
                        put_lseg(lseg);
        }
+
+       clear_bit_unlock(NFS_INO_LAYOUTCOMMITTING, bitlock);
+       smp_mb__after_clear_bit();
+       wake_up_bit(bitlock, NFS_INO_LAYOUTCOMMITTING);
+
        put_rpccred(data->cred);
        kfree(data);
 }
index 1dce12f..e6161b2 100644 (file)
@@ -6602,8 +6602,6 @@ static int nfs4_xdr_dec_secinfo(struct rpc_rqst *rqstp,
        if (status)
                goto out;
        status = decode_secinfo(xdr, res);
-       if (status)
-               goto out;
 out:
        return status;
 }
index d0cda12..c807ab9 100644 (file)
  */
 
 #include <linux/module.h>
-#include <scsi/osd_initiator.h>
+#include <scsi/osd_ore.h>
 
 #include "objlayout.h"
 
 #define NFSDBG_FACILITY         NFSDBG_PNFS_LD
 
-#define _LLU(x) ((unsigned long long)x)
-
-enum { BIO_MAX_PAGES_KMALLOC =
-               (PAGE_SIZE - sizeof(struct bio)) / sizeof(struct bio_vec),
-};
-
 struct objio_dev_ent {
        struct nfs4_deviceid_node id_node;
-       struct osd_dev *od;
+       struct ore_dev od;
 };
 
 static void
@@ -60,8 +54,8 @@ objio_free_deviceid_node(struct nfs4_deviceid_node *d)
 {
        struct objio_dev_ent *de = container_of(d, struct objio_dev_ent, id_node);
 
-       dprintk("%s: free od=%p\n", __func__, de->od);
-       osduld_put_device(de->od);
+       dprintk("%s: free od=%p\n", __func__, de->od.od);
+       osduld_put_device(de->od.od);
        kfree(de);
 }
 
@@ -98,12 +92,12 @@ _dev_list_add(const struct nfs_server *nfss,
                                nfss->pnfs_curr_ld,
                                nfss->nfs_client,
                                d_id);
-       de->od = od;
+       de->od.od = od;
 
        d = nfs4_insert_deviceid_node(&de->id_node);
        n = container_of(d, struct objio_dev_ent, id_node);
        if (n != de) {
-               dprintk("%s: Race with other n->od=%p\n", __func__, n->od);
+               dprintk("%s: Race with other n->od=%p\n", __func__, n->od.od);
                objio_free_deviceid_node(&de->id_node);
                de = n;
        }
@@ -111,28 +105,11 @@ _dev_list_add(const struct nfs_server *nfss,
        return de;
 }
 
-struct caps_buffers {
-       u8 caps_key[OSD_CRYPTO_KEYID_SIZE];
-       u8 creds[OSD_CAP_LEN];
-};
-
 struct objio_segment {
        struct pnfs_layout_segment lseg;
 
-       struct pnfs_osd_object_cred *comps;
-
-       unsigned mirrors_p1;
-       unsigned stripe_unit;
-       unsigned group_width;   /* Data stripe_units without integrity comps */
-       u64 group_depth;
-       unsigned group_count;
-
-       unsigned max_io_size;
-
-       unsigned comps_index;
-       unsigned num_comps;
-       /* variable length */
-       struct objio_dev_ent *ods[];
+       struct ore_layout layout;
+       struct ore_components oc;
 };
 
 static inline struct objio_segment *
@@ -141,59 +118,44 @@ OBJIO_LSEG(struct pnfs_layout_segment *lseg)
        return container_of(lseg, struct objio_segment, lseg);
 }
 
-struct objio_state;
-typedef ssize_t (*objio_done_fn)(struct objio_state *ios);
-
 struct objio_state {
        /* Generic layer */
-       struct objlayout_io_state ol_state;
-
-       struct objio_segment *layout;
-
-       struct kref kref;
-       objio_done_fn done;
-       void *private;
-
-       unsigned long length;
-       unsigned numdevs; /* Actually used devs in this IO */
-       /* A per-device variable array of size numdevs */
-       struct _objio_per_comp {
-               struct bio *bio;
-               struct osd_request *or;
-               unsigned long length;
-               u64 offset;
-               unsigned dev;
-       } per_dev[];
+       struct objlayout_io_res oir;
+
+       bool sync;
+       /*FIXME: Support for extra_bytes at ore_get_rw_state() */
+       struct ore_io_state *ios;
 };
 
 /* Send and wait for a get_device_info of devices in the layout,
    then look them up with the osd_initiator library */
-static struct objio_dev_ent *_device_lookup(struct pnfs_layout_hdr *pnfslay,
-                               struct objio_segment *objio_seg, unsigned comp,
-                               gfp_t gfp_flags)
+static int objio_devices_lookup(struct pnfs_layout_hdr *pnfslay,
+       struct objio_segment *objio_seg, unsigned c, struct nfs4_deviceid *d_id,
+       gfp_t gfp_flags)
 {
        struct pnfs_osd_deviceaddr *deviceaddr;
-       struct nfs4_deviceid *d_id;
        struct objio_dev_ent *ode;
        struct osd_dev *od;
        struct osd_dev_info odi;
        int err;
 
-       d_id = &objio_seg->comps[comp].oc_object_id.oid_device_id;
-
        ode = _dev_list_find(NFS_SERVER(pnfslay->plh_inode), d_id);
-       if (ode)
-               return ode;
+       if (ode) {
+               objio_seg->oc.ods[c] = &ode->od; /* must use container_of */
+               return 0;
+       }
 
        err = objlayout_get_deviceinfo(pnfslay, d_id, &deviceaddr, gfp_flags);
        if (unlikely(err)) {
                dprintk("%s: objlayout_get_deviceinfo dev(%llx:%llx) =>%d\n",
                        __func__, _DEVID_LO(d_id), _DEVID_HI(d_id), err);
-               return ERR_PTR(err);
+               return err;
        }
 
        odi.systemid_len = deviceaddr->oda_systemid.len;
        if (odi.systemid_len > sizeof(odi.systemid)) {
+               dprintk("%s: odi.systemid_len > sizeof(systemid=%zd)\n",
+                       __func__, sizeof(odi.systemid));
                err = -EINVAL;
                goto out;
        } else if (odi.systemid_len)
@@ -218,96 +180,53 @@ static struct objio_dev_ent *_device_lookup(struct pnfs_layout_hdr *pnfslay,
 
        ode = _dev_list_add(NFS_SERVER(pnfslay->plh_inode), d_id, od,
                            gfp_flags);
-
+       objio_seg->oc.ods[c] = &ode->od; /* must use container_of */
+       dprintk("Adding new dev_id(%llx:%llx)\n",
+               _DEVID_LO(d_id), _DEVID_HI(d_id));
 out:
-       dprintk("%s: return=%d\n", __func__, err);
        objlayout_put_deviceinfo(deviceaddr);
-       return err ? ERR_PTR(err) : ode;
+       return err;
 }
 
-static int objio_devices_lookup(struct pnfs_layout_hdr *pnfslay,
-       struct objio_segment *objio_seg,
-       gfp_t gfp_flags)
+static void copy_single_comp(struct ore_components *oc, unsigned c,
+                            struct pnfs_osd_object_cred *src_comp)
 {
-       unsigned i;
-       int err;
+       struct ore_comp *ocomp = &oc->comps[c];
 
-       /* lookup all devices */
-       for (i = 0; i < objio_seg->num_comps; i++) {
-               struct objio_dev_ent *ode;
+       WARN_ON(src_comp->oc_cap_key.cred_len > 0); /* libosd is NO_SEC only */
+       WARN_ON(src_comp->oc_cap.cred_len > sizeof(ocomp->cred));
 
-               ode = _device_lookup(pnfslay, objio_seg, i, gfp_flags);
-               if (unlikely(IS_ERR(ode))) {
-                       err = PTR_ERR(ode);
-                       goto out;
-               }
-               objio_seg->ods[i] = ode;
-       }
-       err = 0;
+       ocomp->obj.partition = src_comp->oc_object_id.oid_partition_id;
+       ocomp->obj.id = src_comp->oc_object_id.oid_object_id;
 
-out:
-       dprintk("%s: return=%d\n", __func__, err);
-       return err;
+       memcpy(ocomp->cred, src_comp->oc_cap.cred, sizeof(ocomp->cred));
 }
 
-static int _verify_data_map(struct pnfs_osd_layout *layout)
+int __alloc_objio_seg(unsigned numdevs, gfp_t gfp_flags,
+                      struct objio_segment **pseg)
 {
-       struct pnfs_osd_data_map *data_map = &layout->olo_map;
-       u64 stripe_length;
-       u32 group_width;
-
-/* FIXME: Only raid0 for now. if not go through MDS */
-       if (data_map->odm_raid_algorithm != PNFS_OSD_RAID_0) {
-               printk(KERN_ERR "Only RAID_0 for now\n");
-               return -ENOTSUPP;
-       }
-       if (0 != (data_map->odm_num_comps % (data_map->odm_mirror_cnt + 1))) {
-               printk(KERN_ERR "Data Map wrong, num_comps=%u mirrors=%u\n",
-                         data_map->odm_num_comps, data_map->odm_mirror_cnt);
-               return -EINVAL;
-       }
+       struct __alloc_objio_segment {
+               struct objio_segment olseg;
+               struct ore_dev *ods[numdevs];
+               struct ore_comp comps[numdevs];
+       } *aolseg;
 
-       if (data_map->odm_group_width)
-               group_width = data_map->odm_group_width;
-       else
-               group_width = data_map->odm_num_comps /
-                                               (data_map->odm_mirror_cnt + 1);
-
-       stripe_length = (u64)data_map->odm_stripe_unit * group_width;
-       if (stripe_length >= (1ULL << 32)) {
-               printk(KERN_ERR "Total Stripe length(0x%llx)"
-                         " >= 32bit is not supported\n", _LLU(stripe_length));
-               return -ENOTSUPP;
+       aolseg = kzalloc(sizeof(*aolseg), gfp_flags);
+       if (unlikely(!aolseg)) {
+               dprintk("%s: Faild allocation numdevs=%d size=%zd\n", __func__,
+                       numdevs, sizeof(*aolseg));
+               return -ENOMEM;
        }
 
-       if (0 != (data_map->odm_stripe_unit & ~PAGE_MASK)) {
-               printk(KERN_ERR "Stripe Unit(0x%llx)"
-                         " must be Multples of PAGE_SIZE(0x%lx)\n",
-                         _LLU(data_map->odm_stripe_unit), PAGE_SIZE);
-               return -ENOTSUPP;
-       }
+       aolseg->olseg.oc.numdevs = numdevs;
+       aolseg->olseg.oc.single_comp = EC_MULTPLE_COMPS;
+       aolseg->olseg.oc.comps = aolseg->comps;
+       aolseg->olseg.oc.ods = aolseg->ods;
 
+       *pseg = &aolseg->olseg;
        return 0;
 }
 
-static void copy_single_comp(struct pnfs_osd_object_cred *cur_comp,
-                            struct pnfs_osd_object_cred *src_comp,
-                            struct caps_buffers *caps_p)
-{
-       WARN_ON(src_comp->oc_cap_key.cred_len > sizeof(caps_p->caps_key));
-       WARN_ON(src_comp->oc_cap.cred_len > sizeof(caps_p->creds));
-
-       *cur_comp = *src_comp;
-
-       memcpy(caps_p->caps_key, src_comp->oc_cap_key.cred,
-              sizeof(caps_p->caps_key));
-       cur_comp->oc_cap_key.cred = caps_p->caps_key;
-
-       memcpy(caps_p->creds, src_comp->oc_cap.cred,
-              sizeof(caps_p->creds));
-       cur_comp->oc_cap.cred = caps_p->creds;
-}
-
 int objio_alloc_lseg(struct pnfs_layout_segment **outp,
        struct pnfs_layout_hdr *pnfslay,
        struct pnfs_layout_range *range,
@@ -317,59 +236,43 @@ int objio_alloc_lseg(struct pnfs_layout_segment **outp,
        struct objio_segment *objio_seg;
        struct pnfs_osd_xdr_decode_layout_iter iter;
        struct pnfs_osd_layout layout;
-       struct pnfs_osd_object_cred *cur_comp, src_comp;
-       struct caps_buffers *caps_p;
+       struct pnfs_osd_object_cred src_comp;
+       unsigned cur_comp;
        int err;
 
        err = pnfs_osd_xdr_decode_layout_map(&layout, &iter, xdr);
        if (unlikely(err))
                return err;
 
-       err = _verify_data_map(&layout);
+       err = __alloc_objio_seg(layout.olo_num_comps, gfp_flags, &objio_seg);
        if (unlikely(err))
                return err;
 
-       objio_seg = kzalloc(sizeof(*objio_seg) +
-                           sizeof(objio_seg->ods[0]) * layout.olo_num_comps +
-                           sizeof(*objio_seg->comps) * layout.olo_num_comps +
-                           sizeof(struct caps_buffers) * layout.olo_num_comps,
-                           gfp_flags);
-       if (!objio_seg)
-               return -ENOMEM;
+       objio_seg->layout.stripe_unit = layout.olo_map.odm_stripe_unit;
+       objio_seg->layout.group_width = layout.olo_map.odm_group_width;
+       objio_seg->layout.group_depth = layout.olo_map.odm_group_depth;
+       objio_seg->layout.mirrors_p1 = layout.olo_map.odm_mirror_cnt + 1;
+       objio_seg->layout.raid_algorithm = layout.olo_map.odm_raid_algorithm;
 
-       objio_seg->comps = (void *)(objio_seg->ods + layout.olo_num_comps);
-       cur_comp = objio_seg->comps;
-       caps_p = (void *)(cur_comp + layout.olo_num_comps);
-       while (pnfs_osd_xdr_decode_layout_comp(&src_comp, &iter, xdr, &err))
-               copy_single_comp(cur_comp++, &src_comp, caps_p++);
+       err = ore_verify_layout(layout.olo_map.odm_num_comps,
+                                         &objio_seg->layout);
        if (unlikely(err))
                goto err;
 
-       objio_seg->num_comps = layout.olo_num_comps;
-       objio_seg->comps_index = layout.olo_comps_index;
-       err = objio_devices_lookup(pnfslay, objio_seg, gfp_flags);
-       if (err)
-               goto err;
-
-       objio_seg->mirrors_p1 = layout.olo_map.odm_mirror_cnt + 1;
-       objio_seg->stripe_unit = layout.olo_map.odm_stripe_unit;
-       if (layout.olo_map.odm_group_width) {
-               objio_seg->group_width = layout.olo_map.odm_group_width;
-               objio_seg->group_depth = layout.olo_map.odm_group_depth;
-               objio_seg->group_count = layout.olo_map.odm_num_comps /
-                                               objio_seg->mirrors_p1 /
-                                               objio_seg->group_width;
-       } else {
-               objio_seg->group_width = layout.olo_map.odm_num_comps /
-                                               objio_seg->mirrors_p1;
-               objio_seg->group_depth = -1;
-               objio_seg->group_count = 1;
+       objio_seg->oc.first_dev = layout.olo_comps_index;
+       cur_comp = 0;
+       while (pnfs_osd_xdr_decode_layout_comp(&src_comp, &iter, xdr, &err)) {
+               copy_single_comp(&objio_seg->oc, cur_comp, &src_comp);
+               err = objio_devices_lookup(pnfslay, objio_seg, cur_comp,
+                                          &src_comp.oc_object_id.oid_device_id,
+                                          gfp_flags);
+               if (err)
+                       goto err;
+               ++cur_comp;
        }
-
-       /* Cache this calculation it will hit for every page */
-       objio_seg->max_io_size = (BIO_MAX_PAGES_KMALLOC * PAGE_SIZE -
-                                 objio_seg->stripe_unit) *
-                                objio_seg->group_width;
+       /* pnfs_osd_xdr_decode_layout_comp returns false on error */
+       if (unlikely(err))
+               goto err;
 
        *outp = &objio_seg->lseg;
        return 0;
@@ -386,43 +289,63 @@ void objio_free_lseg(struct pnfs_layout_segment *lseg)
        int i;
        struct objio_segment *objio_seg = OBJIO_LSEG(lseg);
 
-       for (i = 0; i < objio_seg->num_comps; i++) {
-               if (!objio_seg->ods[i])
+       for (i = 0; i < objio_seg->oc.numdevs; i++) {
+               struct ore_dev *od = objio_seg->oc.ods[i];
+               struct objio_dev_ent *ode;
+
+               if (!od)
                        break;
-               nfs4_put_deviceid_node(&objio_seg->ods[i]->id_node);
+               ode = container_of(od, typeof(*ode), od);
+               nfs4_put_deviceid_node(&ode->id_node);
        }
        kfree(objio_seg);
 }
 
-int objio_alloc_io_state(struct pnfs_layout_segment *lseg,
-                        struct objlayout_io_state **outp,
-                        gfp_t gfp_flags)
+static int
+objio_alloc_io_state(struct pnfs_layout_hdr *pnfs_layout_type, bool is_reading,
+       struct pnfs_layout_segment *lseg, struct page **pages, unsigned pgbase,
+       loff_t offset, size_t count, void *rpcdata, gfp_t gfp_flags,
+       struct objio_state **outp)
 {
        struct objio_segment *objio_seg = OBJIO_LSEG(lseg);
-       struct objio_state *ios;
-       const unsigned first_size = sizeof(*ios) +
-                               objio_seg->num_comps * sizeof(ios->per_dev[0]);
-       const unsigned sec_size = objio_seg->num_comps *
-                                               sizeof(ios->ol_state.ioerrs[0]);
-
-       ios = kzalloc(first_size + sec_size, gfp_flags);
-       if (unlikely(!ios))
+       struct ore_io_state *ios;
+       int ret;
+       struct __alloc_objio_state {
+               struct objio_state objios;
+               struct pnfs_osd_ioerr ioerrs[objio_seg->oc.numdevs];
+       } *aos;
+
+       aos = kzalloc(sizeof(*aos), gfp_flags);
+       if (unlikely(!aos))
                return -ENOMEM;
 
-       ios->layout = objio_seg;
-       ios->ol_state.ioerrs = ((void *)ios) + first_size;
-       ios->ol_state.num_comps = objio_seg->num_comps;
+       objlayout_init_ioerrs(&aos->objios.oir, objio_seg->oc.numdevs,
+                       aos->ioerrs, rpcdata, pnfs_layout_type);
 
-       *outp = &ios->ol_state;
+       ret = ore_get_rw_state(&objio_seg->layout, &objio_seg->oc, is_reading,
+                              offset, count, &ios);
+       if (unlikely(ret)) {
+               kfree(aos);
+               return ret;
+       }
+
+       ios->pages = pages;
+       ios->pgbase = pgbase;
+       ios->private = aos;
+       BUG_ON(ios->nr_pages > (pgbase + count + PAGE_SIZE - 1) >> PAGE_SHIFT);
+
+       aos->objios.sync = 0;
+       aos->objios.ios = ios;
+       *outp = &aos->objios;
        return 0;
 }
 
-void objio_free_io_state(struct objlayout_io_state *ol_state)
+void objio_free_result(struct objlayout_io_res *oir)
 {
-       struct objio_state *ios = container_of(ol_state, struct objio_state,
-                                              ol_state);
+       struct objio_state *objios = container_of(oir, struct objio_state, oir);
 
-       kfree(ios);
+       ore_put_io_state(objios->ios);
+       kfree(objios);
 }
 
 enum pnfs_osd_errno osd_pri_2_pnfs_err(enum osd_err_priority oep)
@@ -455,539 +378,152 @@ enum pnfs_osd_errno osd_pri_2_pnfs_err(enum osd_err_priority oep)
        }
 }
 
-static void _clear_bio(struct bio *bio)
+static void __on_dev_error(struct ore_io_state *ios,
+       struct ore_dev *od, unsigned dev_index, enum osd_err_priority oep,
+       u64 dev_offset, u64  dev_len)
 {
-       struct bio_vec *bv;
-       unsigned i;
-
-       __bio_for_each_segment(bv, bio, i, 0) {
-               unsigned this_count = bv->bv_len;
-
-               if (likely(PAGE_SIZE == this_count))
-                       clear_highpage(bv->bv_page);
-               else
-                       zero_user(bv->bv_page, bv->bv_offset, this_count);
-       }
-}
-
-static int _io_check(struct objio_state *ios, bool is_write)
-{
-       enum osd_err_priority oep = OSD_ERR_PRI_NO_ERROR;
-       int lin_ret = 0;
-       int i;
-
-       for (i = 0; i <  ios->numdevs; i++) {
-               struct osd_sense_info osi;
-               struct osd_request *or = ios->per_dev[i].or;
-               int ret;
-
-               if (!or)
-                       continue;
+       struct objio_state *objios = ios->private;
+       struct pnfs_osd_objid pooid;
+       struct objio_dev_ent *ode = container_of(od, typeof(*ode), od);
+       /* FIXME: what to do with more-then-one-group layouts. We need to
+        * translate from ore_io_state index to oc->comps index
+        */
+       unsigned comp = dev_index;
 
-               ret = osd_req_decode_sense(or, &osi);
-               if (likely(!ret))
-                       continue;
+       pooid.oid_device_id = ode->id_node.deviceid;
+       pooid.oid_partition_id = ios->oc->comps[comp].obj.partition;
+       pooid.oid_object_id = ios->oc->comps[comp].obj.id;
 
-               if (OSD_ERR_PRI_CLEAR_PAGES == osi.osd_err_pri) {
-                       /* start read offset passed endof file */
-                       BUG_ON(is_write);
-                       _clear_bio(ios->per_dev[i].bio);
-                       dprintk("%s: start read offset passed end of file "
-                               "offset=0x%llx, length=0x%lx\n", __func__,
-                               _LLU(ios->per_dev[i].offset),
-                               ios->per_dev[i].length);
-
-                       continue; /* we recovered */
-               }
-               objlayout_io_set_result(&ios->ol_state, i,
-                                       &ios->layout->comps[i].oc_object_id,
-                                       osd_pri_2_pnfs_err(osi.osd_err_pri),
-                                       ios->per_dev[i].offset,
-                                       ios->per_dev[i].length,
-                                       is_write);
-
-               if (osi.osd_err_pri >= oep) {
-                       oep = osi.osd_err_pri;
-                       lin_ret = ret;
-               }
-       }
-
-       return lin_ret;
-}
-
-/*
- * Common IO state helpers.
- */
-static void _io_free(struct objio_state *ios)
-{
-       unsigned i;
-
-       for (i = 0; i < ios->numdevs; i++) {
-               struct _objio_per_comp *per_dev = &ios->per_dev[i];
-
-               if (per_dev->or) {
-                       osd_end_request(per_dev->or);
-                       per_dev->or = NULL;
-               }
-
-               if (per_dev->bio) {
-                       bio_put(per_dev->bio);
-                       per_dev->bio = NULL;
-               }
-       }
-}
-
-struct osd_dev *_io_od(struct objio_state *ios, unsigned dev)
-{
-       unsigned min_dev = ios->layout->comps_index;
-       unsigned max_dev = min_dev + ios->layout->num_comps;
-
-       BUG_ON(dev < min_dev || max_dev <= dev);
-       return ios->layout->ods[dev - min_dev]->od;
-}
-
-struct _striping_info {
-       u64 obj_offset;
-       u64 group_length;
-       unsigned dev;
-       unsigned unit_off;
-};
-
-static void _calc_stripe_info(struct objio_state *ios, u64 file_offset,
-                             struct _striping_info *si)
-{
-       u32     stripe_unit = ios->layout->stripe_unit;
-       u32     group_width = ios->layout->group_width;
-       u64     group_depth = ios->layout->group_depth;
-       u32     U = stripe_unit * group_width;
-
-       u64     T = U * group_depth;
-       u64     S = T * ios->layout->group_count;
-       u64     M = div64_u64(file_offset, S);
-
-       /*
-       G = (L - (M * S)) / T
-       H = (L - (M * S)) % T
-       */
-       u64     LmodU = file_offset - M * S;
-       u32     G = div64_u64(LmodU, T);
-       u64     H = LmodU - G * T;
-
-       u32     N = div_u64(H, U);
-
-       div_u64_rem(file_offset, stripe_unit, &si->unit_off);
-       si->obj_offset = si->unit_off + (N * stripe_unit) +
-                                 (M * group_depth * stripe_unit);
-
-       /* "H - (N * U)" is just "H % U" so it's bound to u32 */
-       si->dev = (u32)(H - (N * U)) / stripe_unit + G * group_width;
-       si->dev *= ios->layout->mirrors_p1;
-
-       si->group_length = T - H;
-}
-
-static int _add_stripe_unit(struct objio_state *ios,  unsigned *cur_pg,
-               unsigned pgbase, struct _objio_per_comp *per_dev, int len,
-               gfp_t gfp_flags)
-{
-       unsigned pg = *cur_pg;
-       int cur_len = len;
-       struct request_queue *q =
-                       osd_request_queue(_io_od(ios, per_dev->dev));
-
-       if (per_dev->bio == NULL) {
-               unsigned pages_in_stripe = ios->layout->group_width *
-                                     (ios->layout->stripe_unit / PAGE_SIZE);
-               unsigned bio_size = (ios->ol_state.nr_pages + pages_in_stripe) /
-                                   ios->layout->group_width;
-
-               if (BIO_MAX_PAGES_KMALLOC < bio_size)
-                       bio_size = BIO_MAX_PAGES_KMALLOC;
-
-               per_dev->bio = bio_kmalloc(gfp_flags, bio_size);
-               if (unlikely(!per_dev->bio)) {
-                       dprintk("Faild to allocate BIO size=%u\n", bio_size);
-                       return -ENOMEM;
-               }
-       }
-
-       while (cur_len > 0) {
-               unsigned pglen = min_t(unsigned, PAGE_SIZE - pgbase, cur_len);
-               unsigned added_len;
-
-               BUG_ON(ios->ol_state.nr_pages <= pg);
-               cur_len -= pglen;
-
-               added_len = bio_add_pc_page(q, per_dev->bio,
-                                       ios->ol_state.pages[pg], pglen, pgbase);
-               if (unlikely(pglen != added_len))
-                       return -ENOMEM;
-               pgbase = 0;
-               ++pg;
-       }
-       BUG_ON(cur_len);
-
-       per_dev->length += len;
-       *cur_pg = pg;
-       return 0;
-}
-
-static int _prepare_one_group(struct objio_state *ios, u64 length,
-                             struct _striping_info *si, unsigned *last_pg,
-                             gfp_t gfp_flags)
-{
-       unsigned stripe_unit = ios->layout->stripe_unit;
-       unsigned mirrors_p1 = ios->layout->mirrors_p1;
-       unsigned devs_in_group = ios->layout->group_width * mirrors_p1;
-       unsigned dev = si->dev;
-       unsigned first_dev = dev - (dev % devs_in_group);
-       unsigned max_comp = ios->numdevs ? ios->numdevs - mirrors_p1 : 0;
-       unsigned cur_pg = *last_pg;
-       int ret = 0;
-
-       while (length) {
-               struct _objio_per_comp *per_dev = &ios->per_dev[dev - first_dev];
-               unsigned cur_len, page_off = 0;
-
-               if (!per_dev->length) {
-                       per_dev->dev = dev;
-                       if (dev < si->dev) {
-                               per_dev->offset = si->obj_offset + stripe_unit -
-                                                                  si->unit_off;
-                               cur_len = stripe_unit;
-                       } else if (dev == si->dev) {
-                               per_dev->offset = si->obj_offset;
-                               cur_len = stripe_unit - si->unit_off;
-                               page_off = si->unit_off & ~PAGE_MASK;
-                               BUG_ON(page_off &&
-                                     (page_off != ios->ol_state.pgbase));
-                       } else { /* dev > si->dev */
-                               per_dev->offset = si->obj_offset - si->unit_off;
-                               cur_len = stripe_unit;
-                       }
-
-                       if (max_comp < dev - first_dev)
-                               max_comp = dev - first_dev;
-               } else {
-                       cur_len = stripe_unit;
-               }
-               if (cur_len >= length)
-                       cur_len = length;
-
-               ret = _add_stripe_unit(ios, &cur_pg, page_off , per_dev,
-                                      cur_len, gfp_flags);
-               if (unlikely(ret))
-                       goto out;
-
-               dev += mirrors_p1;
-               dev = (dev % devs_in_group) + first_dev;
-
-               length -= cur_len;
-               ios->length += cur_len;
-       }
-out:
-       ios->numdevs = max_comp + mirrors_p1;
-       *last_pg = cur_pg;
-       return ret;
-}
-
-static int _io_rw_pagelist(struct objio_state *ios, gfp_t gfp_flags)
-{
-       u64 length = ios->ol_state.count;
-       u64 offset = ios->ol_state.offset;
-       struct _striping_info si;
-       unsigned last_pg = 0;
-       int ret = 0;
-
-       while (length) {
-               _calc_stripe_info(ios, offset, &si);
-
-               if (length < si.group_length)
-                       si.group_length = length;
-
-               ret = _prepare_one_group(ios, si.group_length, &si, &last_pg, gfp_flags);
-               if (unlikely(ret))
-                       goto out;
-
-               offset += si.group_length;
-               length -= si.group_length;
-       }
-
-out:
-       if (!ios->length)
-               return ret;
-
-       return 0;
-}
-
-static ssize_t _sync_done(struct objio_state *ios)
-{
-       struct completion *waiting = ios->private;
-
-       complete(waiting);
-       return 0;
-}
-
-static void _last_io(struct kref *kref)
-{
-       struct objio_state *ios = container_of(kref, struct objio_state, kref);
-
-       ios->done(ios);
-}
-
-static void _done_io(struct osd_request *or, void *p)
-{
-       struct objio_state *ios = p;
-
-       kref_put(&ios->kref, _last_io);
-}
-
-static ssize_t _io_exec(struct objio_state *ios)
-{
-       DECLARE_COMPLETION_ONSTACK(wait);
-       ssize_t status = 0; /* sync status */
-       unsigned i;
-       objio_done_fn saved_done_fn = ios->done;
-       bool sync = ios->ol_state.sync;
-
-       if (sync) {
-               ios->done = _sync_done;
-               ios->private = &wait;
-       }
-
-       kref_init(&ios->kref);
-
-       for (i = 0; i < ios->numdevs; i++) {
-               struct osd_request *or = ios->per_dev[i].or;
-
-               if (!or)
-                       continue;
-
-               kref_get(&ios->kref);
-               osd_execute_request_async(or, _done_io, ios);
-       }
-
-       kref_put(&ios->kref, _last_io);
-
-       if (sync) {
-               wait_for_completion(&wait);
-               status = saved_done_fn(ios);
-       }
-
-       return status;
+       objlayout_io_set_result(&objios->oir, comp,
+                               &pooid, osd_pri_2_pnfs_err(oep),
+                               dev_offset, dev_len, !ios->reading);
 }
 
 /*
  * read
  */
-static ssize_t _read_done(struct objio_state *ios)
+static void _read_done(struct ore_io_state *ios, void *private)
 {
+       struct objio_state *objios = private;
        ssize_t status;
-       int ret = _io_check(ios, false);
+       int ret = ore_check_io(ios, &__on_dev_error);
 
-       _io_free(ios);
+       /* FIXME: _io_free(ios) can we deallocate the libosd resources? */
 
        if (likely(!ret))
                status = ios->length;
        else
                status = ret;
 
-       objlayout_read_done(&ios->ol_state, status, ios->ol_state.sync);
-       return status;
+       objlayout_read_done(&objios->oir, status, objios->sync);
 }
 
-static int _read_mirrors(struct objio_state *ios, unsigned cur_comp)
+int objio_read_pagelist(struct nfs_read_data *rdata)
 {
-       struct osd_request *or = NULL;
-       struct _objio_per_comp *per_dev = &ios->per_dev[cur_comp];
-       unsigned dev = per_dev->dev;
-       struct pnfs_osd_object_cred *cred =
-                       &ios->layout->comps[cur_comp];
-       struct osd_obj_id obj = {
-               .partition = cred->oc_object_id.oid_partition_id,
-               .id = cred->oc_object_id.oid_object_id,
-       };
+       struct objio_state *objios;
        int ret;
 
-       or = osd_start_request(_io_od(ios, dev), GFP_KERNEL);
-       if (unlikely(!or)) {
-               ret = -ENOMEM;
-               goto err;
-       }
-       per_dev->or = or;
-
-       osd_req_read(or, &obj, per_dev->offset, per_dev->bio, per_dev->length);
-
-       ret = osd_finalize_request(or, 0, cred->oc_cap.cred, NULL);
-       if (ret) {
-               dprintk("%s: Faild to osd_finalize_request() => %d\n",
-                       __func__, ret);
-               goto err;
-       }
-
-       dprintk("%s:[%d] dev=%d obj=0x%llx start=0x%llx length=0x%lx\n",
-               __func__, cur_comp, dev, obj.id, _LLU(per_dev->offset),
-               per_dev->length);
-
-err:
-       return ret;
-}
-
-static ssize_t _read_exec(struct objio_state *ios)
-{
-       unsigned i;
-       int ret;
-
-       for (i = 0; i < ios->numdevs; i += ios->layout->mirrors_p1) {
-               if (!ios->per_dev[i].length)
-                       continue;
-               ret = _read_mirrors(ios, i);
-               if (unlikely(ret))
-                       goto err;
-       }
-
-       ios->done = _read_done;
-       return _io_exec(ios); /* In sync mode exec returns the io status */
-
-err:
-       _io_free(ios);
-       return ret;
-}
-
-ssize_t objio_read_pagelist(struct objlayout_io_state *ol_state)
-{
-       struct objio_state *ios = container_of(ol_state, struct objio_state,
-                                              ol_state);
-       int ret;
-
-       ret = _io_rw_pagelist(ios, GFP_KERNEL);
+       ret = objio_alloc_io_state(NFS_I(rdata->inode)->layout, true,
+                       rdata->lseg, rdata->args.pages, rdata->args.pgbase,
+                       rdata->args.offset, rdata->args.count, rdata,
+                       GFP_KERNEL, &objios);
        if (unlikely(ret))
                return ret;
 
-       return _read_exec(ios);
+       objios->ios->done = _read_done;
+       dprintk("%s: offset=0x%llx length=0x%x\n", __func__,
+               rdata->args.offset, rdata->args.count);
+       return ore_read(objios->ios);
 }
 
 /*
  * write
  */
-static ssize_t _write_done(struct objio_state *ios)
+static void _write_done(struct ore_io_state *ios, void *private)
 {
+       struct objio_state *objios = private;
        ssize_t status;
-       int ret = _io_check(ios, true);
+       int ret = ore_check_io(ios, &__on_dev_error);
 
-       _io_free(ios);
+       /* FIXME: _io_free(ios) can we deallocate the libosd resources? */
 
        if (likely(!ret)) {
                /* FIXME: should be based on the OSD's persistence model
                 * See OSD2r05 Section 4.13 Data persistence model */
-               ios->ol_state.committed = NFS_FILE_SYNC;
+               objios->oir.committed = NFS_FILE_SYNC;
                status = ios->length;
        } else {
                status = ret;
        }
 
-       objlayout_write_done(&ios->ol_state, status, ios->ol_state.sync);
-       return status;
+       objlayout_write_done(&objios->oir, status, objios->sync);
 }
 
-static int _write_mirrors(struct objio_state *ios, unsigned cur_comp)
+static struct page *__r4w_get_page(void *priv, u64 offset, bool *uptodate)
 {
-       struct _objio_per_comp *master_dev = &ios->per_dev[cur_comp];
-       unsigned dev = ios->per_dev[cur_comp].dev;
-       unsigned last_comp = cur_comp + ios->layout->mirrors_p1;
-       int ret;
-
-       for (; cur_comp < last_comp; ++cur_comp, ++dev) {
-               struct osd_request *or = NULL;
-               struct pnfs_osd_object_cred *cred =
-                                       &ios->layout->comps[cur_comp];
-               struct osd_obj_id obj = {
-                       .partition = cred->oc_object_id.oid_partition_id,
-                       .id = cred->oc_object_id.oid_object_id,
-               };
-               struct _objio_per_comp *per_dev = &ios->per_dev[cur_comp];
-               struct bio *bio;
-
-               or = osd_start_request(_io_od(ios, dev), GFP_NOFS);
-               if (unlikely(!or)) {
-                       ret = -ENOMEM;
-                       goto err;
-               }
-               per_dev->or = or;
-
-               if (per_dev != master_dev) {
-                       bio = bio_kmalloc(GFP_NOFS,
-                                         master_dev->bio->bi_max_vecs);
-                       if (unlikely(!bio)) {
-                               dprintk("Faild to allocate BIO size=%u\n",
-                                       master_dev->bio->bi_max_vecs);
-                               ret = -ENOMEM;
-                               goto err;
-                       }
-
-                       __bio_clone(bio, master_dev->bio);
-                       bio->bi_bdev = NULL;
-                       bio->bi_next = NULL;
-                       per_dev->bio = bio;
-                       per_dev->dev = dev;
-                       per_dev->length = master_dev->length;
-                       per_dev->offset =  master_dev->offset;
-               } else {
-                       bio = master_dev->bio;
-                       bio->bi_rw |= REQ_WRITE;
-               }
-
-               osd_req_write(or, &obj, per_dev->offset, bio, per_dev->length);
+       struct objio_state *objios = priv;
+       struct nfs_write_data *wdata = objios->oir.rpcdata;
+       pgoff_t index = offset / PAGE_SIZE;
+       struct page *page = find_get_page(wdata->inode->i_mapping, index);
 
-               ret = osd_finalize_request(or, 0, cred->oc_cap.cred, NULL);
-               if (ret) {
-                       dprintk("%s: Faild to osd_finalize_request() => %d\n",
-                               __func__, ret);
-                       goto err;
+       if (!page) {
+               page = find_or_create_page(wdata->inode->i_mapping,
+                                               index, GFP_NOFS);
+               if (unlikely(!page)) {
+                       dprintk("%s: grab_cache_page Failed index=0x%lx\n",
+                               __func__, index);
+                       return NULL;
                }
-
-               dprintk("%s:[%d] dev=%d obj=0x%llx start=0x%llx length=0x%lx\n",
-                       __func__, cur_comp, dev, obj.id, _LLU(per_dev->offset),
-                       per_dev->length);
+               unlock_page(page);
        }
+       if (PageDirty(page) || PageWriteback(page))
+               *uptodate = true;
+       else
+               *uptodate = PageUptodate(page);
+       dprintk("%s: index=0x%lx uptodate=%d\n", __func__, index, *uptodate);
+       return page;
+}
 
-err:
-       return ret;
+static void __r4w_put_page(void *priv, struct page *page)
+{
+       dprintk("%s: index=0x%lx\n", __func__, page->index);
+       page_cache_release(page);
+       return;
 }
 
-static ssize_t _write_exec(struct objio_state *ios)
+static const struct _ore_r4w_op _r4w_op = {
+       .get_page = &__r4w_get_page,
+       .put_page = &__r4w_put_page,
+};
+
+int objio_write_pagelist(struct nfs_write_data *wdata, int how)
 {
-       unsigned i;
+       struct objio_state *objios;
        int ret;
 
-       for (i = 0; i < ios->numdevs; i += ios->layout->mirrors_p1) {
-               if (!ios->per_dev[i].length)
-                       continue;
-               ret = _write_mirrors(ios, i);
-               if (unlikely(ret))
-                       goto err;
-       }
-
-       ios->done = _write_done;
-       return _io_exec(ios); /* In sync mode exec returns the io->status */
+       ret = objio_alloc_io_state(NFS_I(wdata->inode)->layout, false,
+                       wdata->lseg, wdata->args.pages, wdata->args.pgbase,
+                       wdata->args.offset, wdata->args.count, wdata, GFP_NOFS,
+                       &objios);
+       if (unlikely(ret))
+               return ret;
 
-err:
-       _io_free(ios);
-       return ret;
-}
+       objios->sync = 0 != (how & FLUSH_SYNC);
+       objios->ios->r4w = &_r4w_op;
 
-ssize_t objio_write_pagelist(struct objlayout_io_state *ol_state, bool stable)
-{
-       struct objio_state *ios = container_of(ol_state, struct objio_state,
-                                              ol_state);
-       int ret;
+       if (!objios->sync)
+               objios->ios->done = _write_done;
 
-       /* TODO: ios->stable = stable; */
-       ret = _io_rw_pagelist(ios, GFP_NOFS);
+       dprintk("%s: offset=0x%llx length=0x%x\n", __func__,
+               wdata->args.offset, wdata->args.count);
+       ret = ore_write(objios->ios);
        if (unlikely(ret))
                return ret;
 
-       return _write_exec(ios);
+       if (objios->sync)
+               _write_done(objios->ios, objios);
+
+       return 0;
 }
 
 static bool objio_pg_test(struct nfs_pageio_descriptor *pgio,
@@ -997,7 +533,7 @@ static bool objio_pg_test(struct nfs_pageio_descriptor *pgio,
                return false;
 
        return pgio->pg_count + req->wb_bytes <=
-                       OBJIO_LSEG(pgio->pg_lseg)->max_io_size;
+                       OBJIO_LSEG(pgio->pg_lseg)->layout.max_io_length;
 }
 
 static const struct nfs_pageio_ops objio_pg_read_ops = {
index 1d06f8e..72074e3 100644 (file)
@@ -156,77 +156,39 @@ last_byte_offset(u64 start, u64 len)
        return end > start ? end - 1 : NFS4_MAX_UINT64;
 }
 
-static struct objlayout_io_state *
-objlayout_alloc_io_state(struct pnfs_layout_hdr *pnfs_layout_type,
-                       struct page **pages,
-                       unsigned pgbase,
-                       loff_t offset,
-                       size_t count,
-                       struct pnfs_layout_segment *lseg,
-                       void *rpcdata,
-                       gfp_t gfp_flags)
+void _fix_verify_io_params(struct pnfs_layout_segment *lseg,
+                          struct page ***p_pages, unsigned *p_pgbase,
+                          u64 offset, unsigned long count)
 {
-       struct objlayout_io_state *state;
        u64 lseg_end_offset;
 
-       dprintk("%s: allocating io_state\n", __func__);
-       if (objio_alloc_io_state(lseg, &state, gfp_flags))
-               return NULL;
-
        BUG_ON(offset < lseg->pls_range.offset);
        lseg_end_offset = end_offset(lseg->pls_range.offset,
                                     lseg->pls_range.length);
        BUG_ON(offset >= lseg_end_offset);
-       if (offset + count > lseg_end_offset) {
-               count = lseg->pls_range.length -
-                               (offset - lseg->pls_range.offset);
-               dprintk("%s: truncated count %Zd\n", __func__, count);
-       }
+       WARN_ON(offset + count > lseg_end_offset);
 
-       if (pgbase > PAGE_SIZE) {
-               pages += pgbase >> PAGE_SHIFT;
-               pgbase &= ~PAGE_MASK;
+       if (*p_pgbase > PAGE_SIZE) {
+               dprintk("%s: pgbase(0x%x) > PAGE_SIZE\n", __func__, *p_pgbase);
+               *p_pages += *p_pgbase >> PAGE_SHIFT;
+               *p_pgbase &= ~PAGE_MASK;
        }
-
-       INIT_LIST_HEAD(&state->err_list);
-       state->lseg = lseg;
-       state->rpcdata = rpcdata;
-       state->pages = pages;
-       state->pgbase = pgbase;
-       state->nr_pages = (pgbase + count + PAGE_SIZE - 1) >> PAGE_SHIFT;
-       state->offset = offset;
-       state->count = count;
-       state->sync = 0;
-
-       return state;
-}
-
-static void
-objlayout_free_io_state(struct objlayout_io_state *state)
-{
-       dprintk("%s: freeing io_state\n", __func__);
-       if (unlikely(!state))
-               return;
-
-       objio_free_io_state(state);
 }
 
 /*
  * I/O done common code
  */
 static void
-objlayout_iodone(struct objlayout_io_state *state)
+objlayout_iodone(struct objlayout_io_res *oir)
 {
-       dprintk("%s: state %p status\n", __func__, state);
-
-       if (likely(state->status >= 0)) {
-               objlayout_free_io_state(state);
+       if (likely(oir->status >= 0)) {
+               objio_free_result(oir);
        } else {
-               struct objlayout *objlay = OBJLAYOUT(state->lseg->pls_layout);
+               struct objlayout *objlay = oir->objlay;
 
                spin_lock(&objlay->lock);
                objlay->delta_space_valid = OBJ_DSU_INVALID;
-               list_add(&objlay->err_list, &state->err_list);
+               list_add(&objlay->err_list, &oir->err_list);
                spin_unlock(&objlay->lock);
        }
 }
@@ -238,13 +200,13 @@ objlayout_iodone(struct objlayout_io_state *state)
  * the error for later reporting at layout-return.
  */
 void
-objlayout_io_set_result(struct objlayout_io_state *state, unsigned index,
+objlayout_io_set_result(struct objlayout_io_res *oir, unsigned index,
                        struct pnfs_osd_objid *pooid, int osd_error,
                        u64 offset, u64 length, bool is_write)
 {
-       struct pnfs_osd_ioerr *ioerr = &state->ioerrs[index];
+       struct pnfs_osd_ioerr *ioerr = &oir->ioerrs[index];
 
-       BUG_ON(index >= state->num_comps);
+       BUG_ON(index >= oir->num_comps);
        if (osd_error) {
                ioerr->oer_component = *pooid;
                ioerr->oer_comp_offset = offset;
@@ -285,21 +247,18 @@ static void _rpc_read_complete(struct work_struct *work)
 }
 
 void
-objlayout_read_done(struct objlayout_io_state *state, ssize_t status, bool sync)
+objlayout_read_done(struct objlayout_io_res *oir, ssize_t status, bool sync)
 {
-       int eof = state->eof;
-       struct nfs_read_data *rdata;
+       struct nfs_read_data *rdata = oir->rpcdata;
 
-       state->status = status;
-       dprintk("%s: Begin status=%zd eof=%d\n", __func__, status, eof);
-       rdata = state->rpcdata;
-       rdata->task.tk_status = status;
-       if (status >= 0) {
+       oir->status = rdata->task.tk_status = status;
+       if (status >= 0)
                rdata->res.count = status;
-               rdata->res.eof = eof;
-       }
-       objlayout_iodone(state);
-       /* must not use state after this point */
+       objlayout_iodone(oir);
+       /* must not use oir after this point */
+
+       dprintk("%s: Return status=%zd eof=%d sync=%d\n", __func__,
+               status, rdata->res.eof, sync);
 
        if (sync)
                pnfs_ld_read_done(rdata);
@@ -317,40 +276,36 @@ objlayout_read_pagelist(struct nfs_read_data *rdata)
 {
        loff_t offset = rdata->args.offset;
        size_t count = rdata->args.count;
-       struct objlayout_io_state *state;
-       ssize_t status = 0;
+       int err;
        loff_t eof;
 
-       dprintk("%s: Begin inode %p offset %llu count %d\n",
-               __func__, rdata->inode, offset, (int)count);
-
        eof = i_size_read(rdata->inode);
        if (unlikely(offset + count > eof)) {
                if (offset >= eof) {
-                       status = 0;
+                       err = 0;
                        rdata->res.count = 0;
                        rdata->res.eof = 1;
+                       /* FIXME: do we need to call pnfs_ld_read_done()? */
                        goto out;
                }
                count = eof - offset;
        }
 
-       state = objlayout_alloc_io_state(NFS_I(rdata->inode)->layout,
-                                        rdata->args.pages, rdata->args.pgbase,
-                                        offset, count,
-                                        rdata->lseg, rdata,
-                                        GFP_KERNEL);
-       if (unlikely(!state)) {
-               status = -ENOMEM;
-               goto out;
-       }
+       rdata->res.eof = (offset + count) >= eof;
+       _fix_verify_io_params(rdata->lseg, &rdata->args.pages,
+                             &rdata->args.pgbase,
+                             rdata->args.offset, rdata->args.count);
 
-       state->eof = state->offset + state->count >= eof;
+       dprintk("%s: inode(%lx) offset 0x%llx count 0x%Zx eof=%d\n",
+               __func__, rdata->inode->i_ino, offset, count, rdata->res.eof);
 
-       status = objio_read_pagelist(state);
+       err = objio_read_pagelist(rdata);
  out:
-       dprintk("%s: Return status %Zd\n", __func__, status);
-       rdata->pnfs_error = status;
+       if (unlikely(err)) {
+               rdata->pnfs_error = err;
+               dprintk("%s: Returned Error %d\n", __func__, err);
+               return PNFS_NOT_ATTEMPTED;
+       }
        return PNFS_ATTEMPTED;
 }
 
@@ -371,26 +326,20 @@ static void _rpc_write_complete(struct work_struct *work)
 }
 
 void
-objlayout_write_done(struct objlayout_io_state *state, ssize_t status,
-                    bool sync)
+objlayout_write_done(struct objlayout_io_res *oir, ssize_t status, bool sync)
 {
-       struct nfs_write_data *wdata;
+       struct nfs_write_data *wdata = oir->rpcdata;
 
-       dprintk("%s: Begin\n", __func__);
-       wdata = state->rpcdata;
-       state->status = status;
-       wdata->task.tk_status = status;
+       oir->status = wdata->task.tk_status = status;
        if (status >= 0) {
                wdata->res.count = status;
-               wdata->verf.committed = state->committed;
-               dprintk("%s: Return status %d committed %d\n",
-                       __func__, wdata->task.tk_status,
-                       wdata->verf.committed);
-       } else
-               dprintk("%s: Return status %d\n",
-                       __func__, wdata->task.tk_status);
-       objlayout_iodone(state);
-       /* must not use state after this point */
+               wdata->verf.committed = oir->committed;
+       }
+       objlayout_iodone(oir);
+       /* must not use oir after this point */
+
+       dprintk("%s: Return status %zd committed %d sync=%d\n", __func__,
+               status, wdata->verf.committed, sync);
 
        if (sync)
                pnfs_ld_write_done(wdata);
@@ -407,30 +356,18 @@ enum pnfs_try_status
 objlayout_write_pagelist(struct nfs_write_data *wdata,
                         int how)
 {
-       struct objlayout_io_state *state;
-       ssize_t status;
-
-       dprintk("%s: Begin inode %p offset %llu count %u\n",
-               __func__, wdata->inode, wdata->args.offset, wdata->args.count);
-
-       state = objlayout_alloc_io_state(NFS_I(wdata->inode)->layout,
-                                        wdata->args.pages,
-                                        wdata->args.pgbase,
-                                        wdata->args.offset,
-                                        wdata->args.count,
-                                        wdata->lseg, wdata,
-                                        GFP_NOFS);
-       if (unlikely(!state)) {
-               status = -ENOMEM;
-               goto out;
-       }
+       int err;
 
-       state->sync = how & FLUSH_SYNC;
+       _fix_verify_io_params(wdata->lseg, &wdata->args.pages,
+                             &wdata->args.pgbase,
+                             wdata->args.offset, wdata->args.count);
 
-       status = objio_write_pagelist(state, how & FLUSH_STABLE);
- out:
-       dprintk("%s: Return status %Zd\n", __func__, status);
-       wdata->pnfs_error = status;
+       err = objio_write_pagelist(wdata, how);
+       if (unlikely(err)) {
+               wdata->pnfs_error = err;
+               dprintk("%s: Returned Error %d\n", __func__, err);
+               return PNFS_NOT_ATTEMPTED;
+       }
        return PNFS_ATTEMPTED;
 }
 
@@ -537,14 +474,14 @@ merge_ioerr(struct pnfs_osd_ioerr *dest_err,
 static void
 encode_accumulated_error(struct objlayout *objlay, __be32 *p)
 {
-       struct objlayout_io_state *state, *tmp;
+       struct objlayout_io_res *oir, *tmp;
        struct pnfs_osd_ioerr accumulated_err = {.oer_errno = 0};
 
-       list_for_each_entry_safe(state, tmp, &objlay->err_list, err_list) {
+       list_for_each_entry_safe(oir, tmp, &objlay->err_list, err_list) {
                unsigned i;
 
-               for (i = 0; i < state->num_comps; i++) {
-                       struct pnfs_osd_ioerr *ioerr = &state->ioerrs[i];
+               for (i = 0; i < oir->num_comps; i++) {
+                       struct pnfs_osd_ioerr *ioerr = &oir->ioerrs[i];
 
                        if (!ioerr->oer_errno)
                                continue;
@@ -563,8 +500,8 @@ encode_accumulated_error(struct objlayout *objlay, __be32 *p)
 
                        merge_ioerr(&accumulated_err, ioerr);
                }
-               list_del(&state->err_list);
-               objlayout_free_io_state(state);
+               list_del(&oir->err_list);
+               objio_free_result(oir);
        }
 
        pnfs_osd_xdr_encode_ioerr(p, &accumulated_err);
@@ -576,7 +513,7 @@ objlayout_encode_layoutreturn(struct pnfs_layout_hdr *pnfslay,
                              const struct nfs4_layoutreturn_args *args)
 {
        struct objlayout *objlay = OBJLAYOUT(pnfslay);
-       struct objlayout_io_state *state, *tmp;
+       struct objlayout_io_res *oir, *tmp;
        __be32 *start;
 
        dprintk("%s: Begin\n", __func__);
@@ -585,13 +522,13 @@ objlayout_encode_layoutreturn(struct pnfs_layout_hdr *pnfslay,
 
        spin_lock(&objlay->lock);
 
-       list_for_each_entry_safe(state, tmp, &objlay->err_list, err_list) {
+       list_for_each_entry_safe(oir, tmp, &objlay->err_list, err_list) {
                __be32 *last_xdr = NULL, *p;
                unsigned i;
                int res = 0;
 
-               for (i = 0; i < state->num_comps; i++) {
-                       struct pnfs_osd_ioerr *ioerr = &state->ioerrs[i];
+               for (i = 0; i < oir->num_comps; i++) {
+                       struct pnfs_osd_ioerr *ioerr = &oir->ioerrs[i];
 
                        if (!ioerr->oer_errno)
                                continue;
@@ -615,7 +552,7 @@ objlayout_encode_layoutreturn(struct pnfs_layout_hdr *pnfslay,
                        }
 
                        last_xdr = p;
-                       pnfs_osd_xdr_encode_ioerr(p, &state->ioerrs[i]);
+                       pnfs_osd_xdr_encode_ioerr(p, &oir->ioerrs[i]);
                }
 
                /* TODO: use xdr_write_pages */
@@ -631,8 +568,8 @@ objlayout_encode_layoutreturn(struct pnfs_layout_hdr *pnfslay,
                        encode_accumulated_error(objlay, last_xdr);
                        goto loop_done;
                }
-               list_del(&state->err_list);
-               objlayout_free_io_state(state);
+               list_del(&oir->err_list);
+               objio_free_result(oir);
        }
 loop_done:
        spin_unlock(&objlay->lock);
index a8244c8..8ec3472 100644 (file)
@@ -74,19 +74,11 @@ OBJLAYOUT(struct pnfs_layout_hdr *lo)
  * per-I/O operation state
  * embedded in objects provider io_state data structure
  */
-struct objlayout_io_state {
-       struct pnfs_layout_segment *lseg;
-
-       struct page **pages;
-       unsigned pgbase;
-       unsigned nr_pages;
-       unsigned long count;
-       loff_t offset;
-       bool sync;
+struct objlayout_io_res {
+       struct objlayout *objlay;
 
        void *rpcdata;
        int status;             /* res */
-       int eof;                /* res */
        int committed;          /* res */
 
        /* Error reporting (layout_return) */
@@ -100,6 +92,18 @@ struct objlayout_io_state {
        struct pnfs_osd_ioerr *ioerrs;
 };
 
+static inline
+void objlayout_init_ioerrs(struct objlayout_io_res *oir, unsigned num_comps,
+                       struct pnfs_osd_ioerr *ioerrs, void *rpcdata,
+                       struct pnfs_layout_hdr *pnfs_layout_type)
+{
+       oir->objlay = OBJLAYOUT(pnfs_layout_type);
+       oir->rpcdata = rpcdata;
+       INIT_LIST_HEAD(&oir->err_list);
+       oir->num_comps = num_comps;
+       oir->ioerrs = ioerrs;
+}
+
 /*
  * Raid engine I/O API
  */
@@ -110,28 +114,24 @@ extern int objio_alloc_lseg(struct pnfs_layout_segment **outp,
        gfp_t gfp_flags);
 extern void objio_free_lseg(struct pnfs_layout_segment *lseg);
 
-extern int objio_alloc_io_state(
-       struct pnfs_layout_segment *lseg,
-       struct objlayout_io_state **outp,
-       gfp_t gfp_flags);
-extern void objio_free_io_state(struct objlayout_io_state *state);
+/* objio_free_result will free these @oir structs received from
+ * objlayout_{read,write}_done
+ */
+extern void objio_free_result(struct objlayout_io_res *oir);
 
-extern ssize_t objio_read_pagelist(struct objlayout_io_state *ol_state);
-extern ssize_t objio_write_pagelist(struct objlayout_io_state *ol_state,
-                                   bool stable);
+extern int objio_read_pagelist(struct nfs_read_data *rdata);
+extern int objio_write_pagelist(struct nfs_write_data *wdata, int how);
 
 /*
  * callback API
  */
-extern void objlayout_io_set_result(struct objlayout_io_state *state,
+extern void objlayout_io_set_result(struct objlayout_io_res *oir,
                        unsigned index, struct pnfs_osd_objid *pooid,
                        int osd_error, u64 offset, u64 length, bool is_write);
 
 static inline void
-objlayout_add_delta_space_used(struct objlayout_io_state *state, s64 space_used)
+objlayout_add_delta_space_used(struct objlayout *objlay, s64 space_used)
 {
-       struct objlayout *objlay = OBJLAYOUT(state->lseg->pls_layout);
-
        /* If one of the I/Os errored out and the delta_space_used was
         * invalid we render the complete report as invalid. Protocol mandate
         * the DSU be accurate or not reported.
@@ -144,9 +144,9 @@ objlayout_add_delta_space_used(struct objlayout_io_state *state, s64 space_used)
        spin_unlock(&objlay->lock);
 }
 
-extern void objlayout_read_done(struct objlayout_io_state *state,
+extern void objlayout_read_done(struct objlayout_io_res *oir,
                                ssize_t status, bool sync);
-extern void objlayout_write_done(struct objlayout_io_state *state,
+extern void objlayout_write_done(struct objlayout_io_res *oir,
                                 ssize_t status, bool sync);
 
 extern int objlayout_get_deviceinfo(struct pnfs_layout_hdr *pnfslay,
index b60970c..0a5ff5c 100644 (file)
@@ -41,7 +41,7 @@ nfs_page_free(struct nfs_page *p)
 
 /**
  * nfs_create_request - Create an NFS read/write request.
- * @file: file descriptor to use
+ * @ctx: open context to use
  * @inode: inode to which the request is attached
  * @page: page to write
  * @offset: starting offset within the page for the write
index ee73d9a..a2478bc 100644 (file)
@@ -1443,17 +1443,31 @@ pnfs_layoutcommit_inode(struct inode *inode, bool sync)
        /* Note kzalloc ensures data->res.seq_res.sr_slot == NULL */
        data = kzalloc(sizeof(*data), GFP_NOFS);
        if (!data) {
-               mark_inode_dirty_sync(inode);
                status = -ENOMEM;
                goto out;
        }
 
+       if (!test_bit(NFS_INO_LAYOUTCOMMIT, &nfsi->flags))
+               goto out_free;
+
+       if (test_and_set_bit(NFS_INO_LAYOUTCOMMITTING, &nfsi->flags)) {
+               if (!sync) {
+                       status = -EAGAIN;
+                       goto out_free;
+               }
+               status = wait_on_bit_lock(&nfsi->flags, NFS_INO_LAYOUTCOMMITTING,
+                                       nfs_wait_bit_killable, TASK_KILLABLE);
+               if (status)
+                       goto out_free;
+       }
+
        INIT_LIST_HEAD(&data->lseg_list);
        spin_lock(&inode->i_lock);
        if (!test_and_clear_bit(NFS_INO_LAYOUTCOMMIT, &nfsi->flags)) {
+               clear_bit(NFS_INO_LAYOUTCOMMITTING, &nfsi->flags);
                spin_unlock(&inode->i_lock);
-               kfree(data);
-               goto out;
+               wake_up_bit(&nfsi->flags, NFS_INO_LAYOUTCOMMITTING);
+               goto out_free;
        }
 
        pnfs_list_write_lseg(inode, &data->lseg_list);
@@ -1475,6 +1489,11 @@ pnfs_layoutcommit_inode(struct inode *inode, bool sync)
 
        status = nfs4_proc_layoutcommit(data, sync);
 out:
+       if (status)
+               mark_inode_dirty_sync(inode);
        dprintk("<-- %s status %d\n", __func__, status);
        return status;
+out_free:
+       kfree(data);
+       goto out;
 }
index 2219c88..b016b8a 100644 (file)
@@ -1243,7 +1243,6 @@ void nfs_writeback_done(struct rpc_task *task, struct nfs_write_data *data)
 {
        struct nfs_writeargs    *argp = &data->args;
        struct nfs_writeres     *resp = &data->res;
-       struct nfs_server       *server = NFS_SERVER(data->inode);
        int status;
 
        dprintk("NFS: %5u nfs_writeback_done (status %d)\n",
@@ -1277,7 +1276,7 @@ void nfs_writeback_done(struct rpc_task *task, struct nfs_write_data *data)
                if (time_before(complain, jiffies)) {
                        dprintk("NFS:       faulty NFS server %s:"
                                " (committed = %d) != (stable = %d)\n",
-                               server->nfs_client->cl_hostname,
+                               NFS_SERVER(data->inode)->nfs_client->cl_hostname,
                                resp->verf->committed, argp->stable);
                        complain = jiffies + 300 * HZ;
                }
index dc5a1bf..52cd976 100644 (file)
@@ -256,6 +256,8 @@ static void nfsd_last_thread(struct svc_serv *serv)
        nfsd_serv = NULL;
        nfsd_shutdown();
 
+       svc_rpcb_cleanup(serv);
+
        printk(KERN_WARNING "nfsd: last server has exited, flushing export "
                            "cache\n");
        nfsd_export_flush();
index 60a137b..ab2c634 100644 (file)
@@ -229,6 +229,7 @@ struct nfs_inode {
 #define NFS_INO_COMMIT         (7)             /* inode is committing unstable writes */
 #define NFS_INO_PNFS_COMMIT    (8)             /* use pnfs code for commit */
 #define NFS_INO_LAYOUTCOMMIT   (9)             /* layoutcommit required */
+#define NFS_INO_LAYOUTCOMMITTING (10)          /* layoutcommit inflight */
 
 static inline struct nfs_inode *NFS_I(const struct inode *inode)
 {
index 492486a..3d8f9c4 100644 (file)
@@ -136,6 +136,8 @@ void                rpc_shutdown_client(struct rpc_clnt *);
 void           rpc_release_client(struct rpc_clnt *);
 void           rpc_task_release_client(struct rpc_task *);
 
+int            rpcb_create_local(void);
+void           rpcb_put_local(void);
 int            rpcb_register(u32, u32, int, unsigned short);
 int            rpcb_v4_register(const u32 program, const u32 version,
                                 const struct sockaddr *address,
index d8d5d93..35b37b1 100644 (file)
@@ -413,6 +413,7 @@ struct svc_procedure {
 /*
  * Function prototypes.
  */
+void svc_rpcb_cleanup(struct svc_serv *serv);
 struct svc_serv *svc_create(struct svc_program *, unsigned int,
                            void (*shutdown)(struct svc_serv *));
 struct svc_rqst *svc_prepare_thread(struct svc_serv *serv,
index 4cb70dc..e50502d 100644 (file)
@@ -129,6 +129,9 @@ unx_match(struct auth_cred *acred, struct rpc_cred *rcred, int flags)
        for (i = 0; i < groups ; i++)
                if (cred->uc_gids[i] != GROUP_AT(acred->group_info, i))
                        return 0;
+       if (groups < NFS_NGROUPS &&
+           cred->uc_gids[groups] != NOGROUP)
+               return 0;
        return 1;
 }
 
index f588b85..8761bf8 100644 (file)
@@ -114,6 +114,9 @@ static struct rpc_program   rpcb_program;
 static struct rpc_clnt *       rpcb_local_clnt;
 static struct rpc_clnt *       rpcb_local_clnt4;
 
+DEFINE_SPINLOCK(rpcb_clnt_lock);       /* taken around rpcb_users updates in rpcb_get_local()/rpcb_put_local() */
+unsigned int                   rpcb_users;     /* users of the local rpcbind clients; NOTE(review): neither symbol is static - confirm external linkage is intended */
+
 struct rpcbind_args {
        struct rpc_xprt *       r_xprt;
 
@@ -161,6 +164,56 @@ static void rpcb_map_release(void *data)
        kfree(map);
 }
 
+static int rpcb_get_local(void)        /* take one more user reference, but only if the count is already non-zero */
+{
+       int cnt;
+
+       spin_lock(&rpcb_clnt_lock);
+       if (rpcb_users)         /* a zero count is left at zero; rpcb_set_local() is what sets it to 1 */
+               rpcb_users++;
+       cnt = rpcb_users;       /* unsigned counter copied into an int for the return value */
+       spin_unlock(&rpcb_clnt_lock);
+
+       return cnt;             /* 0: no reference was taken; otherwise the count after the increment */
+}
+
+void rpcb_put_local(void)      /* drop one user reference; the last put shuts both local clients down */
+{
+       struct rpc_clnt *clnt = rpcb_local_clnt;        /* snapshot taken before rpcb_clnt_lock is acquired */
+       struct rpc_clnt *clnt4 = rpcb_local_clnt4;      /* may be NULL, hence the checks below */
+       int shutdown;
+
+       spin_lock(&rpcb_clnt_lock);
+       if (--rpcb_users == 0) {        /* NOTE(review): unsigned, no guard against an unbalanced put wrapping the count */
+               rpcb_local_clnt = NULL;
+               rpcb_local_clnt4 = NULL;
+       }
+       shutdown = !rpcb_users;
+       spin_unlock(&rpcb_clnt_lock);
+
+       if (shutdown) {
+               /*
+                * Last user gone: shut down the snapshotted clients after the spinlock is dropped.
+                */
+               if (clnt4)
+                       rpc_shutdown_client(clnt4);
+               if (clnt)
+                       rpc_shutdown_client(clnt);
+       }
+}
+
+static void rpcb_set_local(struct rpc_clnt *clnt, struct rpc_clnt *clnt4)     /* install freshly created clients and start the user count at 1 */
+{
+       /* Protected by rpcb_create_local_mutex; note rpcb_clnt_lock is not taken here */
+       rpcb_local_clnt = clnt;
+       rpcb_local_clnt4 = clnt4;
+       smp_wmb();      /* presumably orders the pointer stores before the count store; pairing read side not visible here - confirm */
+       rpcb_users = 1;
+       dprintk("RPC:       created new rpcb local clients (rpcb_local_clnt: "
+                       "%p, rpcb_local_clnt4: %p)\n", rpcb_local_clnt,
+                       rpcb_local_clnt4);
+}
+
 /*
  * Returns zero on success, otherwise a negative errno value
  * is returned.
@@ -205,9 +258,7 @@ static int rpcb_create_local_unix(void)
                clnt4 = NULL;
        }
 
-       /* Protected by rpcb_create_local_mutex */
-       rpcb_local_clnt = clnt;
-       rpcb_local_clnt4 = clnt4;
+       rpcb_set_local(clnt, clnt4);
 
 out:
        return result;
@@ -259,9 +310,7 @@ static int rpcb_create_local_net(void)
                clnt4 = NULL;
        }
 
-       /* Protected by rpcb_create_local_mutex */
-       rpcb_local_clnt = clnt;
-       rpcb_local_clnt4 = clnt4;
+       rpcb_set_local(clnt, clnt4);
 
 out:
        return result;
@@ -271,16 +320,16 @@ out:
  * Returns zero on success, otherwise a negative errno value
  * is returned.
  */
-static int rpcb_create_local(void)
+int rpcb_create_local(void)
 {
        static DEFINE_MUTEX(rpcb_create_local_mutex);
        int result = 0;
 
-       if (rpcb_local_clnt)
+       if (rpcb_get_local())
                return result;
 
        mutex_lock(&rpcb_create_local_mutex);
-       if (rpcb_local_clnt)
+       if (rpcb_get_local())
                goto out;
 
        if (rpcb_create_local_unix() != 0)
@@ -382,11 +431,6 @@ int rpcb_register(u32 prog, u32 vers, int prot, unsigned short port)
        struct rpc_message msg = {
                .rpc_argp       = &map,
        };
-       int error;
-
-       error = rpcb_create_local();
-       if (error)
-               return error;
 
        dprintk("RPC:       %sregistering (%u, %u, %d, %u) with local "
                        "rpcbind\n", (port ? "" : "un"),
@@ -522,11 +566,7 @@ int rpcb_v4_register(const u32 program, const u32 version,
        struct rpc_message msg = {
                .rpc_argp       = &map,
        };
-       int error;
 
-       error = rpcb_create_local();
-       if (error)
-               return error;
        if (rpcb_local_clnt4 == NULL)
                return -EPROTONOSUPPORT;
 
@@ -1060,15 +1100,3 @@ static struct rpc_program rpcb_program = {
        .version        = rpcb_version,
        .stats          = &rpcb_stats,
 };
-
-/**
- * cleanup_rpcb_clnt - remove xprtsock's sysctls, unregister
- *
- */
-void cleanup_rpcb_clnt(void)
-{
-       if (rpcb_local_clnt4)
-               rpc_shutdown_client(rpcb_local_clnt4);
-       if (rpcb_local_clnt)
-               rpc_shutdown_client(rpcb_local_clnt);
-}
index 9d08091..8ec9778 100644 (file)
@@ -61,8 +61,6 @@ static struct pernet_operations sunrpc_net_ops = {
 
 extern struct cache_detail unix_gid_cache;
 
-extern void cleanup_rpcb_clnt(void);
-
 static int __init
 init_sunrpc(void)
 {
@@ -102,7 +100,6 @@ out:
 static void __exit
 cleanup_sunrpc(void)
 {
-       cleanup_rpcb_clnt();
        rpcauth_remove_module();
        cleanup_socket_xprt();
        svc_cleanup_xprt_sock();
index dd5cc00..6e03888 100644 (file)
@@ -366,6 +366,42 @@ svc_pool_for_cpu(struct svc_serv *serv, int cpu)
        return &serv->sv_pools[pidx % serv->sv_nrpools];
 }
 
+static int svc_rpcb_setup(struct svc_serv *serv)       /* returns 0, or the error from rpcb_create_local() */
+{
+       int err;
+
+       err = rpcb_create_local();      /* released again by rpcb_put_local() in svc_rpcb_cleanup() */
+       if (err)
+               return err;             /* nothing was unregistered on this path */
+
+       /* Remove any stale portmap registrations */
+       svc_unregister(serv);
+       return 0;
+}
+
+void svc_rpcb_cleanup(struct svc_serv *serv)   /* undo svc_rpcb_setup(): unregister serv, then drop the rpcbind client reference */
+{
+       svc_unregister(serv);   /* done before the put, which may shut the local clients down */
+       rpcb_put_local();
+}
+EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);   /* exported for GPL modules */
+
+static int svc_uses_rpcbind(struct svc_serv *serv)     /* 1 if any program version of serv is not hidden, else 0 */
+{
+       struct svc_program      *progp;
+       unsigned int            i;
+
+       for (progp = serv->sv_program; progp; progp = progp->pg_next) {        /* walk the pg_next-linked program list */
+               for (i = 0; i < progp->pg_nvers; i++) {
+                       if (progp->pg_vers[i] == NULL)  /* empty version slot */
+                               continue;
+                       if (progp->pg_vers[i]->vs_hidden == 0)  /* one non-hidden version is enough */
+                               return 1;
+               }
+       }
+
+       return 0;       /* every version is hidden, or there are none */
+}
 
 /*
  * Create an RPC service
@@ -431,8 +467,15 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
                spin_lock_init(&pool->sp_lock);
        }
 
-       /* Remove any stale portmap registrations */
-       svc_unregister(serv);
+       if (svc_uses_rpcbind(serv)) {
+               if (svc_rpcb_setup(serv) < 0) {
+                       kfree(serv->sv_pools);
+                       kfree(serv);
+                       return NULL;
+               }
+               if (!serv->sv_shutdown)
+                       serv->sv_shutdown = svc_rpcb_cleanup;
+       }
 
        return serv;
 }
@@ -500,7 +543,6 @@ svc_destroy(struct svc_serv *serv)
        if (svc_serv_is_pooled(serv))
                svc_pool_map_put();
 
-       svc_unregister(serv);
        kfree(serv->sv_pools);
        kfree(serv);
 }