Merge branch 'pnfs-submit' of git://git.open-osd.org/linux-open-osd
author Linus Torvalds <torvalds@linux-foundation.org>
Sun, 29 May 2011 21:10:13 +0000 (14:10 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
Sun, 29 May 2011 21:10:13 +0000 (14:10 -0700)
* 'pnfs-submit' of git://git.open-osd.org/linux-open-osd: (32 commits)
  pnfs-obj: pg_test check for max_io_size
  NFSv4.1: define nfs_generic_pg_test
  NFSv4.1: use pnfs_generic_pg_test directly by layout driver
  NFSv4.1: change pg_test return type to bool
  NFSv4.1: unify pnfs_pageio_init functions
  pnfs-obj: objlayout_encode_layoutcommit implementation
  pnfs: encode_layoutcommit
  pnfs-obj: report errors and .encode_layoutreturn Implementation.
  pnfs: encode_layoutreturn
  pnfs: layoutret_on_setattr
  pnfs: layoutreturn
  pnfs-obj: osd raid engine read/write implementation
  pnfs: support for non-rpc layout drivers
  pnfs-obj: define per-inode private structure
  pnfs: alloc and free layout_hdr layoutdriver methods
  pnfs-obj: objio_osd device information retrieval and caching
  pnfs-obj: decode layout, alloc/free lseg
  pnfs-obj: pnfs_osd XDR client implementation
  pnfs-obj: pnfs_osd XDR definitions
  pnfs-obj: objlayoutdriver module skeleton
  ...

1  2 
fs/nfs/dir.c
fs/nfs/inode.c
fs/nfs/internal.h
fs/nfs/nfs4proc.c
fs/nfs/pnfs.c
include/linux/nfs4.h

diff --combined fs/nfs/dir.c
@@@ -512,12 -512,7 +512,7 @@@ int nfs_readdir_page_filler(nfs_readdir
                                struct page **xdr_pages, struct page *page, unsigned int buflen)
  {
        struct xdr_stream stream;
-       struct xdr_buf buf = {
-               .pages = xdr_pages,
-               .page_len = buflen,
-               .buflen = buflen,
-               .len = buflen,
-       };
+       struct xdr_buf buf;
        struct page *scratch;
        struct nfs_cache_array *array;
        unsigned int count = 0;
        if (scratch == NULL)
                return -ENOMEM;
  
-       xdr_init_decode(&stream, &buf, NULL);
+       xdr_init_decode_pages(&stream, &buf, xdr_pages, buflen);
        xdr_set_scratch_buffer(&stream, page_address(scratch), PAGE_SIZE);
  
        do {
@@@ -2042,14 -2037,11 +2037,14 @@@ static void nfs_access_free_list(struc
        }
  }
  
 -int nfs_access_cache_shrinker(struct shrinker *shrink, int nr_to_scan, gfp_t gfp_mask)
 +int nfs_access_cache_shrinker(struct shrinker *shrink,
 +                            struct shrink_control *sc)
  {
        LIST_HEAD(head);
        struct nfs_inode *nfsi, *next;
        struct nfs_access_entry *cache;
 +      int nr_to_scan = sc->nr_to_scan;
 +      gfp_t gfp_mask = sc->gfp_mask;
  
        if ((gfp_mask & GFP_KERNEL) != GFP_KERNEL)
                return (nr_to_scan == 0) ? 0 : -1;
diff --combined fs/nfs/inode.c
@@@ -1298,12 -1298,8 +1298,12 @@@ static int nfs_update_inode(struct inod
                                i_size_write(inode, new_isize);
                                invalid |= NFS_INO_INVALID_ATTR|NFS_INO_INVALID_DATA;
                        }
 -                      dprintk("NFS: isize change on server for file %s/%ld\n",
 -                                      inode->i_sb->s_id, inode->i_ino);
 +                      dprintk("NFS: isize change on server for file %s/%ld "
 +                                      "(%Ld to %Ld)\n",
 +                                      inode->i_sb->s_id,
 +                                      inode->i_ino,
 +                                      (long long)cur_isize,
 +                                      (long long)new_isize);
                }
        } else
                invalid |= save_cache_validity & (NFS_INO_INVALID_ATTR
   */
  void nfs4_evict_inode(struct inode *inode)
  {
-       pnfs_destroy_layout(NFS_I(inode));
        truncate_inode_pages(&inode->i_data, 0);
        end_writeback(inode);
+       pnfs_return_layout(inode);
+       pnfs_destroy_layout(NFS_I(inode));
        /* If we are holding a delegation, return it! */
        nfs_inode_return_delegation_noreclaim(inode);
        /* First call standard NFS clear_inode() code */
diff --combined fs/nfs/internal.h
@@@ -234,7 -234,7 +234,7 @@@ extern int nfs_init_client(struct nfs_c
  
  /* dir.c */
  extern int nfs_access_cache_shrinker(struct shrinker *shrink,
 -                                      int nr_to_scan, gfp_t gfp_mask);
 +                                      struct shrink_control *sc);
  
  /* inode.c */
  extern struct workqueue_struct *nfsiod_workqueue;
@@@ -310,6 -310,7 +310,7 @@@ extern int nfs_migrate_page(struct addr
  #endif
  
  /* nfs4proc.c */
+ extern void __nfs4_read_done_cb(struct nfs_read_data *);
  extern void nfs4_reset_read(struct rpc_task *task, struct nfs_read_data *data);
  extern int nfs4_init_client(struct nfs_client *clp,
                            const struct rpc_timeout *timeparms,
diff --combined fs/nfs/nfs4proc.c
@@@ -267,11 -267,9 +267,11 @@@ static int nfs4_handle_exception(struc
                                break;
                        nfs4_schedule_stateid_recovery(server, state);
                        goto wait_on_recovery;
 +              case -NFS4ERR_EXPIRED:
 +                      if (state != NULL)
 +                              nfs4_schedule_stateid_recovery(server, state);
                case -NFS4ERR_STALE_STATEID:
                case -NFS4ERR_STALE_CLIENTID:
 -              case -NFS4ERR_EXPIRED:
                        nfs4_schedule_lease_recovery(clp);
                        goto wait_on_recovery;
  #if defined(CONFIG_NFS_V4_1)
@@@ -2363,6 -2361,9 +2363,9 @@@ nfs4_proc_setattr(struct dentry *dentry
        struct nfs4_state *state = NULL;
        int status;
  
+       if (pnfs_ld_layoutret_on_setattr(inode))
+               pnfs_return_layout(inode);
        nfs_fattr_init(fattr);
        
        /* Search for an existing open(O_WRITE) file */
@@@ -3177,6 -3178,11 +3180,11 @@@ static int nfs4_proc_pathconf(struct nf
        return err;
  }
  
+ void __nfs4_read_done_cb(struct nfs_read_data *data)
+ {
+       nfs_invalidate_atime(data->inode);
+ }
  static int nfs4_read_done_cb(struct rpc_task *task, struct nfs_read_data *data)
  {
        struct nfs_server *server = NFS_SERVER(data->inode);
                return -EAGAIN;
        }
  
-       nfs_invalidate_atime(data->inode);
+       __nfs4_read_done_cb(data);
        if (task->tk_status > 0)
                renew_lease(server, data->timestamp);
        return 0;
@@@ -3200,7 -3206,8 +3208,8 @@@ static int nfs4_read_done(struct rpc_ta
        if (!nfs4_sequence_done(task, &data->res.seq_res))
                return -EAGAIN;
  
-       return data->read_done_cb(task, data);
+       return data->read_done_cb ? data->read_done_cb(task, data) :
+                                   nfs4_read_done_cb(task, data);
  }
  
  static void nfs4_proc_read_setup(struct nfs_read_data *data, struct rpc_message *msg)
@@@ -3245,7 -3252,8 +3254,8 @@@ static int nfs4_write_done(struct rpc_t
  {
        if (!nfs4_sequence_done(task, &data->res.seq_res))
                return -EAGAIN;
-       return data->write_done_cb(task, data);
+       return data->write_done_cb ? data->write_done_cb(task, data) :
+               nfs4_write_done_cb(task, data);
  }
  
  /* Reset the the nfs_write_data to send the write to the MDS. */
@@@ -3672,11 -3680,9 +3682,11 @@@ nfs4_async_handle_error(struct rpc_tas
                                break;
                        nfs4_schedule_stateid_recovery(server, state);
                        goto wait_on_recovery;
 +              case -NFS4ERR_EXPIRED:
 +                      if (state != NULL)
 +                              nfs4_schedule_stateid_recovery(server, state);
                case -NFS4ERR_STALE_STATEID:
                case -NFS4ERR_STALE_CLIENTID:
 -              case -NFS4ERR_EXPIRED:
                        nfs4_schedule_lease_recovery(clp);
                        goto wait_on_recovery;
  #if defined(CONFIG_NFS_V4_1)
@@@ -4547,7 -4553,6 +4557,7 @@@ int nfs4_lock_delegation_recall(struct 
                        case -ESTALE:
                                goto out;
                        case -NFS4ERR_EXPIRED:
 +                              nfs4_schedule_stateid_recovery(server, state);
                        case -NFS4ERR_STALE_CLIENTID:
                        case -NFS4ERR_STALE_STATEID:
                                nfs4_schedule_lease_recovery(server->nfs_client);
@@@ -5671,6 -5676,88 +5681,88 @@@ int nfs4_proc_layoutget(struct nfs4_lay
        return status;
  }
  
+ static void
+ nfs4_layoutreturn_prepare(struct rpc_task *task, void *calldata)
+ {
+       struct nfs4_layoutreturn *lrp = calldata;
+       dprintk("--> %s\n", __func__);
+       if (nfs41_setup_sequence(lrp->clp->cl_session, &lrp->args.seq_args,
+                               &lrp->res.seq_res, 0, task))
+               return;
+       rpc_call_start(task);
+ }
+ static void nfs4_layoutreturn_done(struct rpc_task *task, void *calldata)
+ {
+       struct nfs4_layoutreturn *lrp = calldata;
+       struct nfs_server *server;
+       dprintk("--> %s\n", __func__);
+       if (!nfs4_sequence_done(task, &lrp->res.seq_res))
+               return;
+       server = NFS_SERVER(lrp->args.inode);
+       if (nfs4_async_handle_error(task, server, NULL) == -EAGAIN) {
+               nfs_restart_rpc(task, lrp->clp);
+               return;
+       }
+       if (task->tk_status == 0) {
+               struct pnfs_layout_hdr *lo = NFS_I(lrp->args.inode)->layout;
+               if (lrp->res.lrs_present) {
+                       spin_lock(&lo->plh_inode->i_lock);
+                       pnfs_set_layout_stateid(lo, &lrp->res.stateid, true);
+                       spin_unlock(&lo->plh_inode->i_lock);
+               } else
+                       BUG_ON(!list_empty(&lo->plh_segs));
+       }
+       dprintk("<-- %s\n", __func__);
+ }
+ static void nfs4_layoutreturn_release(void *calldata)
+ {
+       struct nfs4_layoutreturn *lrp = calldata;
+       dprintk("--> %s\n", __func__);
+       put_layout_hdr(NFS_I(lrp->args.inode)->layout);
+       kfree(calldata);
+       dprintk("<-- %s\n", __func__);
+ }
+ static const struct rpc_call_ops nfs4_layoutreturn_call_ops = {
+       .rpc_call_prepare = nfs4_layoutreturn_prepare,
+       .rpc_call_done = nfs4_layoutreturn_done,
+       .rpc_release = nfs4_layoutreturn_release,
+ };
+ int nfs4_proc_layoutreturn(struct nfs4_layoutreturn *lrp)
+ {
+       struct rpc_task *task;
+       struct rpc_message msg = {
+               .rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_LAYOUTRETURN],
+               .rpc_argp = &lrp->args,
+               .rpc_resp = &lrp->res,
+       };
+       struct rpc_task_setup task_setup_data = {
+               .rpc_client = lrp->clp->cl_rpcclient,
+               .rpc_message = &msg,
+               .callback_ops = &nfs4_layoutreturn_call_ops,
+               .callback_data = lrp,
+       };
+       int status;
+       dprintk("--> %s\n", __func__);
+       task = rpc_run_task(&task_setup_data);
+       if (IS_ERR(task))
+               return PTR_ERR(task);
+       status = task->tk_status;
+       dprintk("<-- %s status=%d\n", __func__, status);
+       rpc_put_task(task);
+       return status;
+ }
  static int
  _nfs4_proc_getdeviceinfo(struct nfs_server *server, struct pnfs_device *pdev)
  {
diff --combined fs/nfs/pnfs.c
@@@ -177,13 -177,28 +177,28 @@@ get_layout_hdr(struct pnfs_layout_hdr *
        atomic_inc(&lo->plh_refcount);
  }
  
+ static struct pnfs_layout_hdr *
+ pnfs_alloc_layout_hdr(struct inode *ino, gfp_t gfp_flags)
+ {
+       struct pnfs_layoutdriver_type *ld = NFS_SERVER(ino)->pnfs_curr_ld;
+       return ld->alloc_layout_hdr ? ld->alloc_layout_hdr(ino, gfp_flags) :
+               kzalloc(sizeof(struct pnfs_layout_hdr), gfp_flags);
+ }
+ static void
+ pnfs_free_layout_hdr(struct pnfs_layout_hdr *lo)
+ {
+       struct pnfs_layoutdriver_type *ld = NFS_SERVER(lo->plh_inode)->pnfs_curr_ld;
+       return ld->alloc_layout_hdr ? ld->free_layout_hdr(lo) : kfree(lo);
+ }
  static void
  destroy_layout_hdr(struct pnfs_layout_hdr *lo)
  {
        dprintk("%s: freeing layout cache %p\n", __func__, lo);
        BUG_ON(!list_empty(&lo->plh_layouts));
        NFS_I(lo->plh_inode)->layout = NULL;
-       kfree(lo);
+       pnfs_free_layout_hdr(lo);
  }
  
  static void
@@@ -228,7 -243,7 +243,7 @@@ put_lseg_common(struct pnfs_layout_segm
  {
        struct inode *inode = lseg->pls_layout->plh_inode;
  
-       BUG_ON(test_bit(NFS_LSEG_VALID, &lseg->pls_flags));
+       WARN_ON(test_bit(NFS_LSEG_VALID, &lseg->pls_flags));
        list_del_init(&lseg->pls_list);
        if (list_empty(&lseg->pls_layout->plh_segs)) {
                set_bit(NFS_LAYOUT_DESTROYED, &lseg->pls_layout->plh_flags);
@@@ -261,11 -276,72 +276,72 @@@ put_lseg(struct pnfs_layout_segment *ls
  }
  EXPORT_SYMBOL_GPL(put_lseg);
  
+ static inline u64
+ end_offset(u64 start, u64 len)
+ {
+       u64 end;
+       end = start + len;
+       return end >= start ? end : NFS4_MAX_UINT64;
+ }
+ /* last octet in a range */
+ static inline u64
+ last_byte_offset(u64 start, u64 len)
+ {
+       u64 end;
+       BUG_ON(!len);
+       end = start + len;
+       return end > start ? end - 1 : NFS4_MAX_UINT64;
+ }
+ /*
+  * is l2 fully contained in l1?
+  *   start1                             end1
+  *   [----------------------------------)
+  *           start2           end2
+  *           [----------------)
+  */
+ static inline int
+ lo_seg_contained(struct pnfs_layout_range *l1,
+                struct pnfs_layout_range *l2)
+ {
+       u64 start1 = l1->offset;
+       u64 end1 = end_offset(start1, l1->length);
+       u64 start2 = l2->offset;
+       u64 end2 = end_offset(start2, l2->length);
+       return (start1 <= start2) && (end1 >= end2);
+ }
+ /*
+  * is l1 and l2 intersecting?
+  *   start1                             end1
+  *   [----------------------------------)
+  *                              start2           end2
+  *                              [----------------)
+  */
+ static inline int
+ lo_seg_intersecting(struct pnfs_layout_range *l1,
+                   struct pnfs_layout_range *l2)
+ {
+       u64 start1 = l1->offset;
+       u64 end1 = end_offset(start1, l1->length);
+       u64 start2 = l2->offset;
+       u64 end2 = end_offset(start2, l2->length);
+       return (end1 == NFS4_MAX_UINT64 || end1 > start2) &&
+              (end2 == NFS4_MAX_UINT64 || end2 > start1);
+ }
  static bool
- should_free_lseg(u32 lseg_iomode, u32 recall_iomode)
+ should_free_lseg(struct pnfs_layout_range *lseg_range,
+                struct pnfs_layout_range *recall_range)
  {
-       return (recall_iomode == IOMODE_ANY ||
-               lseg_iomode == recall_iomode);
+       return (recall_range->iomode == IOMODE_ANY ||
+               lseg_range->iomode == recall_range->iomode) &&
+              lo_seg_intersecting(lseg_range, recall_range);
  }
  
  /* Returns 1 if lseg is removed from list, 0 otherwise */
@@@ -296,7 -372,7 +372,7 @@@ static int mark_lseg_invalid(struct pnf
  int
  mark_matching_lsegs_invalid(struct pnfs_layout_hdr *lo,
                            struct list_head *tmp_list,
-                           u32 iomode)
+                           struct pnfs_layout_range *recall_range)
  {
        struct pnfs_layout_segment *lseg, *next;
        int invalid = 0, removed = 0;
                return 0;
        }
        list_for_each_entry_safe(lseg, next, &lo->plh_segs, pls_list)
-               if (should_free_lseg(lseg->pls_range.iomode, iomode)) {
+               if (!recall_range ||
+                   should_free_lseg(&lseg->pls_range, recall_range)) {
                        dprintk("%s: freeing lseg %p iomode %d "
                                "offset %llu length %llu\n", __func__,
                                lseg, lseg->pls_range.iomode, lseg->pls_range.offset,
@@@ -358,7 -435,7 +435,7 @@@ pnfs_destroy_layout(struct nfs_inode *n
        lo = nfsi->layout;
        if (lo) {
                lo->plh_block_lgets++; /* permanently block new LAYOUTGETs */
-               mark_matching_lsegs_invalid(lo, &tmp_list, IOMODE_ANY);
+               mark_matching_lsegs_invalid(lo, &tmp_list, NULL);
        }
        spin_unlock(&nfsi->vfs_inode.i_lock);
        pnfs_free_lseg_list(&tmp_list);
@@@ -467,7 -544,7 +544,7 @@@ pnfs_choose_layoutget_stateid(nfs4_stat
  static struct pnfs_layout_segment *
  send_layoutget(struct pnfs_layout_hdr *lo,
           struct nfs_open_context *ctx,
-          u32 iomode,
+          struct pnfs_layout_range *range,
           gfp_t gfp_flags)
  {
        struct inode *ino = lo->plh_inode;
                        goto out_err_free;
        }
  
-       lgp->args.minlength = NFS4_MAX_UINT64;
+       lgp->args.minlength = PAGE_CACHE_SIZE;
+       if (lgp->args.minlength > range->length)
+               lgp->args.minlength = range->length;
        lgp->args.maxcount = PNFS_LAYOUT_MAXSIZE;
-       lgp->args.range.iomode = iomode;
-       lgp->args.range.offset = 0;
-       lgp->args.range.length = NFS4_MAX_UINT64;
+       lgp->args.range = *range;
        lgp->args.type = server->pnfs_curr_ld->id;
        lgp->args.inode = ino;
        lgp->args.ctx = get_nfs_open_context(ctx);
        nfs4_proc_layoutget(lgp);
        if (!lseg) {
                /* remember that LAYOUTGET failed and suspend trying */
-               set_bit(lo_fail_bit(iomode), &lo->plh_flags);
+               set_bit(lo_fail_bit(range->iomode), &lo->plh_flags);
        }
  
        /* free xdr pages */
@@@ -542,6 -619,51 +619,51 @@@ out_err_free
        return NULL;
  }
  
+ /* Initiates a LAYOUTRETURN(FILE) */
+ int
+ _pnfs_return_layout(struct inode *ino)
+ {
+       struct pnfs_layout_hdr *lo = NULL;
+       struct nfs_inode *nfsi = NFS_I(ino);
+       LIST_HEAD(tmp_list);
+       struct nfs4_layoutreturn *lrp;
+       nfs4_stateid stateid;
+       int status = 0;
+       dprintk("--> %s\n", __func__);
+       spin_lock(&ino->i_lock);
+       lo = nfsi->layout;
+       if (!lo || !mark_matching_lsegs_invalid(lo, &tmp_list, NULL)) {
+               spin_unlock(&ino->i_lock);
+               dprintk("%s: no layout segments to return\n", __func__);
+               goto out;
+       }
+       stateid = nfsi->layout->plh_stateid;
+       /* Reference matched in nfs4_layoutreturn_release */
+       get_layout_hdr(lo);
+       spin_unlock(&ino->i_lock);
+       pnfs_free_lseg_list(&tmp_list);
+       WARN_ON(test_bit(NFS_INO_LAYOUTCOMMIT, &nfsi->flags));
+       lrp = kzalloc(sizeof(*lrp), GFP_KERNEL);
+       if (unlikely(lrp == NULL)) {
+               status = -ENOMEM;
+               goto out;
+       }
+       lrp->args.stateid = stateid;
+       lrp->args.layout_type = NFS_SERVER(ino)->pnfs_curr_ld->id;
+       lrp->args.inode = ino;
+       lrp->clp = NFS_SERVER(ino)->nfs_client;
+       status = nfs4_proc_layoutreturn(lrp);
+ out:
+       dprintk("<-- %s status: %d\n", __func__, status);
+       return status;
+ }
  bool pnfs_roc(struct inode *ino)
  {
        struct pnfs_layout_hdr *lo;
@@@ -625,10 -747,23 +747,23 @@@ bool pnfs_roc_drain(struct inode *ino, 
   * are seen first.
   */
  static s64
- cmp_layout(u32 iomode1, u32 iomode2)
+ cmp_layout(struct pnfs_layout_range *l1,
+          struct pnfs_layout_range *l2)
  {
+       s64 d;
+       /* high offset > low offset */
+       d = l1->offset - l2->offset;
+       if (d)
+               return d;
+       /* short length > long length */
+       d = l2->length - l1->length;
+       if (d)
+               return d;
        /* read > read/write */
-       return (int)(iomode2 == IOMODE_READ) - (int)(iomode1 == IOMODE_READ);
+       return (int)(l1->iomode == IOMODE_READ) - (int)(l2->iomode == IOMODE_READ);
  }
  
  static void
@@@ -636,13 -771,12 +771,12 @@@ pnfs_insert_layout(struct pnfs_layout_h
                   struct pnfs_layout_segment *lseg)
  {
        struct pnfs_layout_segment *lp;
-       int found = 0;
  
        dprintk("%s:Begin\n", __func__);
  
        assert_spin_locked(&lo->plh_inode->i_lock);
        list_for_each_entry(lp, &lo->plh_segs, pls_list) {
-               if (cmp_layout(lp->pls_range.iomode, lseg->pls_range.iomode) > 0)
+               if (cmp_layout(&lseg->pls_range, &lp->pls_range) > 0)
                        continue;
                list_add_tail(&lseg->pls_list, &lp->pls_list);
                dprintk("%s: inserted lseg %p "
                        lseg->pls_range.offset, lseg->pls_range.length,
                        lp, lp->pls_range.iomode, lp->pls_range.offset,
                        lp->pls_range.length);
-               found = 1;
-               break;
-       }
-       if (!found) {
-               list_add_tail(&lseg->pls_list, &lo->plh_segs);
-               dprintk("%s: inserted lseg %p "
-                       "iomode %d offset %llu length %llu at tail\n",
-                       __func__, lseg, lseg->pls_range.iomode,
-                       lseg->pls_range.offset, lseg->pls_range.length);
+               goto out;
        }
+       list_add_tail(&lseg->pls_list, &lo->plh_segs);
+       dprintk("%s: inserted lseg %p "
+               "iomode %d offset %llu length %llu at tail\n",
+               __func__, lseg, lseg->pls_range.iomode,
+               lseg->pls_range.offset, lseg->pls_range.length);
+ out:
        get_layout_hdr(lo);
  
        dprintk("%s:Return\n", __func__);
@@@ -672,7 -804,7 +804,7 @@@ alloc_init_layout_hdr(struct inode *ino
  {
        struct pnfs_layout_hdr *lo;
  
-       lo = kzalloc(sizeof(struct pnfs_layout_hdr), gfp_flags);
+       lo = pnfs_alloc_layout_hdr(ino, gfp_flags);
        if (!lo)
                return NULL;
        atomic_set(&lo->plh_refcount, 1);
@@@ -705,7 -837,7 +837,7 @@@ pnfs_find_alloc_layout(struct inode *in
        if (likely(nfsi->layout == NULL))       /* Won the race? */
                nfsi->layout = new;
        else
-               kfree(new);
+               pnfs_free_layout_hdr(new);
        return nfsi->layout;
  }
  
   * READ               RW      true
   */
  static int
- is_matching_lseg(struct pnfs_layout_segment *lseg, u32 iomode)
+ is_matching_lseg(struct pnfs_layout_range *ls_range,
+                struct pnfs_layout_range *range)
  {
-       return (iomode != IOMODE_RW || lseg->pls_range.iomode == IOMODE_RW);
+       struct pnfs_layout_range range1;
+       if ((range->iomode == IOMODE_RW &&
+            ls_range->iomode != IOMODE_RW) ||
+           !lo_seg_intersecting(ls_range, range))
+               return 0;
+       /* range1 covers only the first byte in the range */
+       range1 = *range;
+       range1.length = 1;
+       return lo_seg_contained(ls_range, &range1);
  }
  
  /*
   * lookup range in layout
   */
  static struct pnfs_layout_segment *
- pnfs_find_lseg(struct pnfs_layout_hdr *lo, u32 iomode)
+ pnfs_find_lseg(struct pnfs_layout_hdr *lo,
+               struct pnfs_layout_range *range)
  {
        struct pnfs_layout_segment *lseg, *ret = NULL;
  
        assert_spin_locked(&lo->plh_inode->i_lock);
        list_for_each_entry(lseg, &lo->plh_segs, pls_list) {
                if (test_bit(NFS_LSEG_VALID, &lseg->pls_flags) &&
-                   is_matching_lseg(lseg, iomode)) {
+                   is_matching_lseg(&lseg->pls_range, range)) {
                        ret = get_lseg(lseg);
                        break;
                }
-               if (cmp_layout(iomode, lseg->pls_range.iomode) > 0)
+               if (cmp_layout(range, &lseg->pls_range) > 0)
                        break;
        }
  
  struct pnfs_layout_segment *
  pnfs_update_layout(struct inode *ino,
                   struct nfs_open_context *ctx,
+                  loff_t pos,
+                  u64 count,
                   enum pnfs_iomode iomode,
                   gfp_t gfp_flags)
  {
+       struct pnfs_layout_range arg = {
+               .iomode = iomode,
+               .offset = pos,
+               .length = count,
+       };
+       unsigned pg_offset;
        struct nfs_inode *nfsi = NFS_I(ino);
        struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
        struct pnfs_layout_hdr *lo;
                goto out_unlock;
  
        /* Check to see if the layout for the given range already exists */
-       lseg = pnfs_find_lseg(lo, iomode);
+       lseg = pnfs_find_lseg(lo, &arg);
        if (lseg)
                goto out_unlock;
  
                spin_unlock(&clp->cl_lock);
        }
  
-       lseg = send_layoutget(lo, ctx, iomode, gfp_flags);
+       pg_offset = arg.offset & ~PAGE_CACHE_MASK;
+       if (pg_offset) {
+               arg.offset -= pg_offset;
+               arg.length += pg_offset;
+       }
+       arg.length = PAGE_CACHE_ALIGN(arg.length);
+       lseg = send_layoutget(lo, ctx, &arg, gfp_flags);
        if (!lseg && first) {
                spin_lock(&clp->cl_lock);
                list_del_init(&lo->plh_layouts);
@@@ -838,17 -997,6 +997,6 @@@ pnfs_layout_process(struct nfs4_layoutg
        struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
        int status = 0;
  
-       /* Verify we got what we asked for.
-        * Note that because the xdr parsing only accepts a single
-        * element array, this can fail even if the server is behaving
-        * correctly.
-        */
-       if (lgp->args.range.iomode > res->range.iomode ||
-           res->range.offset != 0 ||
-           res->range.length != NFS4_MAX_UINT64) {
-               status = -EINVAL;
-               goto out;
-       }
        /* Inject layout blob into I/O device driver */
        lseg = NFS_SERVER(ino)->pnfs_curr_ld->alloc_lseg(lo, res, lgp->gfp_flags);
        if (!lseg || IS_ERR(lseg)) {
@@@ -895,51 -1043,64 +1043,64 @@@ out_forget_reply
        goto out;
  }
  
- static int pnfs_read_pg_test(struct nfs_pageio_descriptor *pgio,
                           struct nfs_page *prev,
-                            struct nfs_page *req)
+ bool
pnfs_generic_pg_test(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
+                    struct nfs_page *req)
  {
+       enum pnfs_iomode access_type;
+       gfp_t gfp_flags;
+       /* We assume that pg_ioflags == 0 iff we're reading a page */
+       if (pgio->pg_ioflags == 0) {
+               access_type = IOMODE_READ;
+               gfp_flags = GFP_KERNEL;
+       } else {
+               access_type = IOMODE_RW;
+               gfp_flags = GFP_NOFS;
+       }
        if (pgio->pg_count == prev->wb_bytes) {
                /* This is first coelesce call for a series of nfs_pages */
                pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
                                                   prev->wb_context,
-                                                  IOMODE_READ,
-                                                  GFP_KERNEL);
+                                                  req_offset(req),
+                                                  pgio->pg_count,
+                                                  access_type,
+                                                  gfp_flags);
+               return true;
        }
-       return NFS_SERVER(pgio->pg_inode)->pnfs_curr_ld->pg_test(pgio, prev, req);
- }
  
- void
- pnfs_pageio_init_read(struct nfs_pageio_descriptor *pgio, struct inode *inode)
- {
-       struct pnfs_layoutdriver_type *ld;
+       if (pgio->pg_lseg &&
+           req_offset(req) > end_offset(pgio->pg_lseg->pls_range.offset,
+                                        pgio->pg_lseg->pls_range.length))
+               return false;
  
-       ld = NFS_SERVER(inode)->pnfs_curr_ld;
-       pgio->pg_test = (ld && ld->pg_test) ? pnfs_read_pg_test : NULL;
+       return true;
  }
+ EXPORT_SYMBOL_GPL(pnfs_generic_pg_test);
  
- static int pnfs_write_pg_test(struct nfs_pageio_descriptor *pgio,
-                             struct nfs_page *prev,
-                             struct nfs_page *req)
+ /*
+  * Called by non rpc-based layout drivers
+  */
+ int
+ pnfs_ld_write_done(struct nfs_write_data *data)
  {
-       if (pgio->pg_count == prev->wb_bytes) {
-               /* This is first coelesce call for a series of nfs_pages */
-               pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
-                                                  prev->wb_context,
-                                                  IOMODE_RW,
-                                                  GFP_NOFS);
-       }
-       return NFS_SERVER(pgio->pg_inode)->pnfs_curr_ld->pg_test(pgio, prev, req);
- }
+       int status;
  
- void
- pnfs_pageio_init_write(struct nfs_pageio_descriptor *pgio, struct inode *inode)
- {
-       struct pnfs_layoutdriver_type *ld;
+       if (!data->pnfs_error) {
+               pnfs_set_layoutcommit(data);
+               data->mds_ops->rpc_call_done(&data->task, data);
+               data->mds_ops->rpc_release(data);
+               return 0;
+       }
  
-       ld = NFS_SERVER(inode)->pnfs_curr_ld;
-       pgio->pg_test = (ld && ld->pg_test) ? pnfs_write_pg_test : NULL;
+       dprintk("%s: pnfs_error=%d, retry via MDS\n", __func__,
+               data->pnfs_error);
+       status = nfs_initiate_write(data, NFS_CLIENT(data->inode),
+                                   data->mds_ops, NFS_FILE_SYNC);
+       return status ? : -EAGAIN;
  }
+ EXPORT_SYMBOL_GPL(pnfs_ld_write_done);
  
  enum pnfs_try_status
  pnfs_try_to_write_data(struct nfs_write_data *wdata,
        return trypnfs;
  }
  
+ /*
+  * Called by non rpc-based layout drivers
+  */
+ int
+ pnfs_ld_read_done(struct nfs_read_data *data)
+ {
+       int status;
+       if (!data->pnfs_error) {
+               __nfs4_read_done_cb(data);
+               data->mds_ops->rpc_call_done(&data->task, data);
+               data->mds_ops->rpc_release(data);
+               return 0;
+       }
+       dprintk("%s: pnfs_error=%d, retry via MDS\n", __func__,
+               data->pnfs_error);
+       status = nfs_initiate_read(data, NFS_CLIENT(data->inode),
+                                  data->mds_ops);
+       return status ? : -EAGAIN;
+ }
+ EXPORT_SYMBOL_GPL(pnfs_ld_read_done);
  /*
   * Call the appropriate parallel I/O subsystem read function.
   */
@@@ -1009,7 -1193,7 +1193,7 @@@ voi
  pnfs_set_layoutcommit(struct nfs_write_data *wdata)
  {
        struct nfs_inode *nfsi = NFS_I(wdata->inode);
 -      loff_t end_pos = wdata->args.offset + wdata->res.count;
 +      loff_t end_pos = wdata->mds_offset + wdata->res.count;
        bool mark_as_dirty = false;
  
        spin_lock(&nfsi->vfs_inode.i_lock);
diff --combined include/linux/nfs4.h
@@@ -562,6 -562,7 +562,7 @@@ enum 
        NFSPROC4_CLNT_LAYOUTGET,
        NFSPROC4_CLNT_GETDEVICEINFO,
        NFSPROC4_CLNT_LAYOUTCOMMIT,
+       NFSPROC4_CLNT_LAYOUTRETURN,
  };
  
  /* nfs41 types */
@@@ -570,11 -571,9 +571,11 @@@ struct nfs4_sessionid 
  };
  
  /* Create Session Flags */
 -#define SESSION4_PERSIST       0x001
 -#define SESSION4_BACK_CHAN     0x002
 -#define SESSION4_RDMA          0x004
 +#define SESSION4_PERSIST      0x001
 +#define SESSION4_BACK_CHAN    0x002
 +#define SESSION4_RDMA         0x004
 +
 +#define SESSION4_FLAG_MASK_A  0x007
  
  enum state_protect_how4 {
        SP4_NONE        = 0,