Remove obsolete #include <linux/config.h>
[pandora-kernel.git] / fs / nfs / direct.c
1 /*
2  * linux/fs/nfs/direct.c
3  *
4  * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
5  *
6  * High-performance uncached I/O for the Linux NFS client
7  *
8  * There are important applications whose performance or correctness
9  * depends on uncached access to file data.  Database clusters
10  * (multiple copies of the same instance running on separate hosts)
11  * implement their own cache coherency protocol that subsumes file
12  * system cache protocols.  Applications that process datasets
13  * considerably larger than the client's memory do not always benefit
14  * from a local cache.  A streaming video server, for instance, has no
15  * need to cache the contents of a file.
16  *
17  * When an application requests uncached I/O, all read and write requests
18  * are made directly to the server; data stored or fetched via these
19  * requests is not cached in the Linux page cache.  The client does not
20  * correct unaligned requests from applications.  All requested bytes are
21  * held on permanent storage before a direct write system call returns to
22  * an application.
23  *
24  * Solaris implements an uncached I/O facility called directio() that
25  * is used for backups and sequential I/O to very large files.  Solaris
26  * also supports uncaching whole NFS partitions with "-o forcedirectio,"
27  * an undocumented mount option.
28  *
29  * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
30  * help from Andrew Morton.
31  *
32  * 18 Dec 2001  Initial implementation for 2.4  --cel
33  * 08 Jul 2002  Version for 2.4.19, with bug fixes --trondmy
34  * 08 Jun 2003  Port to 2.5 APIs  --cel
35  * 31 Mar 2004  Handle direct I/O without VFS support  --cel
36  * 15 Sep 2004  Parallel async reads  --cel
37  * 04 May 2005  support O_DIRECT with aio  --cel
38  *
39  */
40
41 #include <linux/errno.h>
42 #include <linux/sched.h>
43 #include <linux/kernel.h>
44 #include <linux/smp_lock.h>
45 #include <linux/file.h>
46 #include <linux/pagemap.h>
47 #include <linux/kref.h>
48
49 #include <linux/nfs_fs.h>
50 #include <linux/nfs_page.h>
51 #include <linux/sunrpc/clnt.h>
52
53 #include <asm/system.h>
54 #include <asm/uaccess.h>
55 #include <asm/atomic.h>
56
57 #include "iostat.h"
58
59 #define NFSDBG_FACILITY         NFSDBG_VFS
60
61 static kmem_cache_t *nfs_direct_cachep;
62
63 /*
64  * This represents a set of asynchronous requests that we're waiting on
65  */
66 struct nfs_direct_req {
67         struct kref             kref;           /* release manager */
68
69         /* I/O parameters */
70         struct list_head        list,           /* nfs_read/write_data structs */
71                                 rewrite_list;   /* saved nfs_write_data structs */
72         struct nfs_open_context *ctx;           /* file open context info */
73         struct kiocb *          iocb;           /* controlling i/o request */
74         struct inode *          inode;          /* target file of i/o */
75         unsigned long           user_addr;      /* location of user's buffer */
76         size_t                  user_count;     /* total bytes to move */
77         loff_t                  pos;            /* starting offset in file */
78         struct page **          pages;          /* pages in our buffer */
79         unsigned int            npages;         /* count of pages */
80
81         /* completion state */
82         spinlock_t              lock;           /* protect completion state */
83         int                     outstanding;    /* i/os we're waiting for */
84         ssize_t                 count,          /* bytes actually processed */
85                                 error;          /* any reported error */
86         struct completion       completion;     /* wait for i/o completion */
87
88         /* commit state */
89         struct nfs_write_data * commit_data;    /* special write_data for commits */
90         int                     flags;
91 #define NFS_ODIRECT_DO_COMMIT           (1)     /* an unstable reply was received */
92 #define NFS_ODIRECT_RESCHED_WRITES      (2)     /* write verification failed */
93         struct nfs_writeverf    verf;           /* unstable write verifier */
94 };
95
/*
 * Forward declarations: the commit and reschedule helpers call these
 * before their definitions appear.
 */
static void nfs_direct_write_schedule(struct nfs_direct_req *dreq,
				      int sync);
static void nfs_direct_write_complete(struct nfs_direct_req *dreq,
				      struct inode *inode);
98
99 /**
100  * nfs_direct_IO - NFS address space operation for direct I/O
101  * @rw: direction (read or write)
102  * @iocb: target I/O control block
103  * @iov: array of vectors that define I/O buffer
104  * @pos: offset in file to begin the operation
105  * @nr_segs: size of iovec array
106  *
107  * The presence of this routine in the address space ops vector means
108  * the NFS client supports direct I/O.  However, we shunt off direct
109  * read and write requests before the VFS gets them, so this method
110  * should never be called.
111  */
112 ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
113 {
114         dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
115                         iocb->ki_filp->f_dentry->d_name.name,
116                         (long long) pos, nr_segs);
117
118         return -EINVAL;
119 }
120
/*
 * Drop the reference held on each of the first @npages entries of @pages
 * and free the array itself.  When @do_dirty is non-zero, every page that
 * is not a compound page is marked dirty before its reference is dropped.
 */
static void nfs_free_user_pages(struct page **pages, int npages, int do_dirty)
{
	int idx = 0;

	while (idx < npages) {
		struct page *pg = pages[idx++];

		if (do_dirty) {
			if (!PageCompound(pg))
				set_page_dirty_lock(pg);
		}
		page_cache_release(pg);
	}

	kfree(pages);
}
132
133 static inline int nfs_get_user_pages(int rw, unsigned long user_addr, size_t size, struct page ***pages)
134 {
135         int result = -ENOMEM;
136         unsigned long page_count;
137         size_t array_size;
138
139         page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
140         page_count -= user_addr >> PAGE_SHIFT;
141
142         array_size = (page_count * sizeof(struct page *));
143         *pages = kmalloc(array_size, GFP_KERNEL);
144         if (*pages) {
145                 down_read(&current->mm->mmap_sem);
146                 result = get_user_pages(current, current->mm, user_addr,
147                                         page_count, (rw == READ), 0,
148                                         *pages, NULL);
149                 up_read(&current->mm->mmap_sem);
150                 if (result != page_count) {
151                         /*
152                          * If we got fewer pages than expected from
153                          * get_user_pages(), the user buffer runs off the
154                          * end of a mapping; return EFAULT.
155                          */
156                         if (result >= 0) {
157                                 nfs_free_user_pages(*pages, result, 0);
158                                 result = -EFAULT;
159                         } else
160                                 kfree(*pages);
161                         *pages = NULL;
162                 }
163         }
164         return result;
165 }
166
167 static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
168 {
169         struct nfs_direct_req *dreq;
170
171         dreq = kmem_cache_alloc(nfs_direct_cachep, SLAB_KERNEL);
172         if (!dreq)
173                 return NULL;
174
175         kref_init(&dreq->kref);
176         init_completion(&dreq->completion);
177         INIT_LIST_HEAD(&dreq->list);
178         INIT_LIST_HEAD(&dreq->rewrite_list);
179         dreq->iocb = NULL;
180         dreq->ctx = NULL;
181         spin_lock_init(&dreq->lock);
182         dreq->outstanding = 0;
183         dreq->count = 0;
184         dreq->error = 0;
185         dreq->flags = 0;
186
187         return dreq;
188 }
189
190 static void nfs_direct_req_release(struct kref *kref)
191 {
192         struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
193
194         if (dreq->ctx != NULL)
195                 put_nfs_open_context(dreq->ctx);
196         kmem_cache_free(nfs_direct_cachep, dreq);
197 }
198
199 /*
200  * Collects and returns the final error value/byte-count.
201  */
202 static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
203 {
204         ssize_t result = -EIOCBQUEUED;
205
206         /* Async requests don't wait here */
207         if (dreq->iocb)
208                 goto out;
209
210         result = wait_for_completion_interruptible(&dreq->completion);
211
212         if (!result)
213                 result = dreq->error;
214         if (!result)
215                 result = dreq->count;
216
217 out:
218         kref_put(&dreq->kref, nfs_direct_req_release);
219         return (ssize_t) result;
220 }
221
222 /*
223  * We must hold a reference to all the pages in this direct read request
224  * until the RPCs complete.  This could be long *after* we are woken up in
225  * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
226  *
227  * In addition, synchronous I/O uses a stack-allocated iocb.  Thus we
228  * can't trust the iocb is still valid here if this is a synchronous
229  * request.  If the waiter is woken prematurely, the iocb is long gone.
230  */
231 static void nfs_direct_complete(struct nfs_direct_req *dreq)
232 {
233         nfs_free_user_pages(dreq->pages, dreq->npages, 1);
234
235         if (dreq->iocb) {
236                 long res = (long) dreq->error;
237                 if (!res)
238                         res = (long) dreq->count;
239                 aio_complete(dreq->iocb, res, 0);
240         }
241         complete_all(&dreq->completion);
242
243         kref_put(&dreq->kref, nfs_direct_req_release);
244 }
245
246 /*
247  * Note we also set the number of requests we have in the dreq when we are
248  * done.  This prevents races with I/O completion so we will always wait
249  * until all requests have been dispatched and completed.
250  */
251 static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, size_t rsize)
252 {
253         struct list_head *list;
254         struct nfs_direct_req *dreq;
255         unsigned int rpages = (rsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
256
257         dreq = nfs_direct_req_alloc();
258         if (!dreq)
259                 return NULL;
260
261         list = &dreq->list;
262         for(;;) {
263                 struct nfs_read_data *data = nfs_readdata_alloc(rpages);
264
265                 if (unlikely(!data)) {
266                         while (!list_empty(list)) {
267                                 data = list_entry(list->next,
268                                                   struct nfs_read_data, pages);
269                                 list_del(&data->pages);
270                                 nfs_readdata_free(data);
271                         }
272                         kref_put(&dreq->kref, nfs_direct_req_release);
273                         return NULL;
274                 }
275
276                 INIT_LIST_HEAD(&data->pages);
277                 list_add(&data->pages, list);
278
279                 data->req = (struct nfs_page *) dreq;
280                 dreq->outstanding++;
281                 if (nbytes <= rsize)
282                         break;
283                 nbytes -= rsize;
284         }
285         kref_get(&dreq->kref);
286         return dreq;
287 }
288
289 static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
290 {
291         struct nfs_read_data *data = calldata;
292         struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
293
294         if (nfs_readpage_result(task, data) != 0)
295                 return;
296
297         spin_lock(&dreq->lock);
298
299         if (likely(task->tk_status >= 0))
300                 dreq->count += data->res.count;
301         else
302                 dreq->error = task->tk_status;
303
304         if (--dreq->outstanding) {
305                 spin_unlock(&dreq->lock);
306                 return;
307         }
308
309         spin_unlock(&dreq->lock);
310         nfs_direct_complete(dreq);
311 }
312
313 static const struct rpc_call_ops nfs_read_direct_ops = {
314         .rpc_call_done = nfs_direct_read_result,
315         .rpc_release = nfs_readdata_release,
316 };
317
318 /*
319  * For each nfs_read_data struct that was allocated on the list, dispatch
320  * an NFS READ operation
321  */
322 static void nfs_direct_read_schedule(struct nfs_direct_req *dreq)
323 {
324         struct nfs_open_context *ctx = dreq->ctx;
325         struct inode *inode = ctx->dentry->d_inode;
326         struct list_head *list = &dreq->list;
327         struct page **pages = dreq->pages;
328         size_t count = dreq->user_count;
329         loff_t pos = dreq->pos;
330         size_t rsize = NFS_SERVER(inode)->rsize;
331         unsigned int curpage, pgbase;
332
333         curpage = 0;
334         pgbase = dreq->user_addr & ~PAGE_MASK;
335         do {
336                 struct nfs_read_data *data;
337                 size_t bytes;
338
339                 bytes = rsize;
340                 if (count < rsize)
341                         bytes = count;
342
343                 BUG_ON(list_empty(list));
344                 data = list_entry(list->next, struct nfs_read_data, pages);
345                 list_del_init(&data->pages);
346
347                 data->inode = inode;
348                 data->cred = ctx->cred;
349                 data->args.fh = NFS_FH(inode);
350                 data->args.context = ctx;
351                 data->args.offset = pos;
352                 data->args.pgbase = pgbase;
353                 data->args.pages = &pages[curpage];
354                 data->args.count = bytes;
355                 data->res.fattr = &data->fattr;
356                 data->res.eof = 0;
357                 data->res.count = bytes;
358
359                 rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
360                                 &nfs_read_direct_ops, data);
361                 NFS_PROTO(inode)->read_setup(data);
362
363                 data->task.tk_cookie = (unsigned long) inode;
364
365                 lock_kernel();
366                 rpc_execute(&data->task);
367                 unlock_kernel();
368
369                 dfprintk(VFS, "NFS: %5u initiated direct read call (req %s/%Ld, %zu bytes @ offset %Lu)\n",
370                                 data->task.tk_pid,
371                                 inode->i_sb->s_id,
372                                 (long long)NFS_FILEID(inode),
373                                 bytes,
374                                 (unsigned long long)data->args.offset);
375
376                 pos += bytes;
377                 pgbase += bytes;
378                 curpage += pgbase >> PAGE_SHIFT;
379                 pgbase &= ~PAGE_MASK;
380
381                 count -= bytes;
382         } while (count != 0);
383         BUG_ON(!list_empty(list));
384 }
385
386 static ssize_t nfs_direct_read(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos, struct page **pages, unsigned int nr_pages)
387 {
388         ssize_t result;
389         sigset_t oldset;
390         struct inode *inode = iocb->ki_filp->f_mapping->host;
391         struct rpc_clnt *clnt = NFS_CLIENT(inode);
392         struct nfs_direct_req *dreq;
393
394         dreq = nfs_direct_read_alloc(count, NFS_SERVER(inode)->rsize);
395         if (!dreq)
396                 return -ENOMEM;
397
398         dreq->user_addr = user_addr;
399         dreq->user_count = count;
400         dreq->pos = pos;
401         dreq->pages = pages;
402         dreq->npages = nr_pages;
403         dreq->inode = inode;
404         dreq->ctx = get_nfs_open_context((struct nfs_open_context *)iocb->ki_filp->private_data);
405         if (!is_sync_kiocb(iocb))
406                 dreq->iocb = iocb;
407
408         nfs_add_stats(inode, NFSIOS_DIRECTREADBYTES, count);
409         rpc_clnt_sigmask(clnt, &oldset);
410         nfs_direct_read_schedule(dreq);
411         result = nfs_direct_wait(dreq);
412         rpc_clnt_sigunmask(clnt, &oldset);
413
414         return result;
415 }
416
417 static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
418 {
419         list_splice_init(&dreq->rewrite_list, &dreq->list);
420         while (!list_empty(&dreq->list)) {
421                 struct nfs_write_data *data = list_entry(dreq->list.next, struct nfs_write_data, pages);
422                 list_del(&data->pages);
423                 nfs_writedata_release(data);
424         }
425 }
426
427 #if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
428 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
429 {
430         struct list_head *pos;
431
432         list_splice_init(&dreq->rewrite_list, &dreq->list);
433         list_for_each(pos, &dreq->list)
434                 dreq->outstanding++;
435         dreq->count = 0;
436
437         nfs_direct_write_schedule(dreq, FLUSH_STABLE);
438 }
439
440 static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
441 {
442         struct nfs_write_data *data = calldata;
443         struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
444
445         /* Call the NFS version-specific code */
446         if (NFS_PROTO(data->inode)->commit_done(task, data) != 0)
447                 return;
448         if (unlikely(task->tk_status < 0)) {
449                 dreq->error = task->tk_status;
450                 dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
451         }
452         if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
453                 dprintk("NFS: %5u commit verify failed\n", task->tk_pid);
454                 dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
455         }
456
457         dprintk("NFS: %5u commit returned %d\n", task->tk_pid, task->tk_status);
458         nfs_direct_write_complete(dreq, data->inode);
459 }
460
461 static const struct rpc_call_ops nfs_commit_direct_ops = {
462         .rpc_call_done = nfs_direct_commit_result,
463         .rpc_release = nfs_commit_release,
464 };
465
466 static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
467 {
468         struct nfs_write_data *data = dreq->commit_data;
469
470         data->inode = dreq->inode;
471         data->cred = dreq->ctx->cred;
472
473         data->args.fh = NFS_FH(data->inode);
474         data->args.offset = dreq->pos;
475         data->args.count = dreq->user_count;
476         data->res.count = 0;
477         data->res.fattr = &data->fattr;
478         data->res.verf = &data->verf;
479
480         rpc_init_task(&data->task, NFS_CLIENT(dreq->inode), RPC_TASK_ASYNC,
481                                 &nfs_commit_direct_ops, data);
482         NFS_PROTO(data->inode)->commit_setup(data, 0);
483
484         data->task.tk_priority = RPC_PRIORITY_NORMAL;
485         data->task.tk_cookie = (unsigned long)data->inode;
486         /* Note: task.tk_ops->rpc_release will free dreq->commit_data */
487         dreq->commit_data = NULL;
488
489         dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);
490
491         lock_kernel();
492         rpc_execute(&data->task);
493         unlock_kernel();
494 }
495
496 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
497 {
498         int flags = dreq->flags;
499
500         dreq->flags = 0;
501         switch (flags) {
502                 case NFS_ODIRECT_DO_COMMIT:
503                         nfs_direct_commit_schedule(dreq);
504                         break;
505                 case NFS_ODIRECT_RESCHED_WRITES:
506                         nfs_direct_write_reschedule(dreq);
507                         break;
508                 default:
509                         nfs_end_data_update(inode);
510                         if (dreq->commit_data != NULL)
511                                 nfs_commit_free(dreq->commit_data);
512                         nfs_direct_free_writedata(dreq);
513                         nfs_direct_complete(dreq);
514         }
515 }
516
517 static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
518 {
519         dreq->commit_data = nfs_commit_alloc(0);
520         if (dreq->commit_data != NULL)
521                 dreq->commit_data->req = (struct nfs_page *) dreq;
522 }
523 #else
524 static inline void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
525 {
526         dreq->commit_data = NULL;
527 }
528
/*
 * Variant built when neither CONFIG_NFS_V3 nor CONFIG_NFS_V4 is defined:
 * there is no commit or resend step, so the request is simply torn down
 * and completed.
 */
static void nfs_direct_write_complete(struct nfs_direct_req *dreq,
				      struct inode *inode)
{
	nfs_end_data_update(inode);

	nfs_direct_free_writedata(dreq);
	nfs_direct_complete(dreq);
}
535 #endif
536
537 static struct nfs_direct_req *nfs_direct_write_alloc(size_t nbytes, size_t wsize)
538 {
539         struct list_head *list;
540         struct nfs_direct_req *dreq;
541         unsigned int wpages = (wsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
542
543         dreq = nfs_direct_req_alloc();
544         if (!dreq)
545                 return NULL;
546
547         list = &dreq->list;
548         for(;;) {
549                 struct nfs_write_data *data = nfs_writedata_alloc(wpages);
550
551                 if (unlikely(!data)) {
552                         while (!list_empty(list)) {
553                                 data = list_entry(list->next,
554                                                   struct nfs_write_data, pages);
555                                 list_del(&data->pages);
556                                 nfs_writedata_free(data);
557                         }
558                         kref_put(&dreq->kref, nfs_direct_req_release);
559                         return NULL;
560                 }
561
562                 INIT_LIST_HEAD(&data->pages);
563                 list_add(&data->pages, list);
564
565                 data->req = (struct nfs_page *) dreq;
566                 dreq->outstanding++;
567                 if (nbytes <= wsize)
568                         break;
569                 nbytes -= wsize;
570         }
571
572         nfs_alloc_commit_data(dreq);
573
574         kref_get(&dreq->kref);
575         return dreq;
576 }
577
578 static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
579 {
580         struct nfs_write_data *data = calldata;
581         struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
582         int status = task->tk_status;
583
584         if (nfs_writeback_done(task, data) != 0)
585                 return;
586
587         spin_lock(&dreq->lock);
588
589         if (likely(status >= 0))
590                 dreq->count += data->res.count;
591         else
592                 dreq->error = task->tk_status;
593
594         if (data->res.verf->committed != NFS_FILE_SYNC) {
595                 switch (dreq->flags) {
596                         case 0:
597                                 memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
598                                 dreq->flags = NFS_ODIRECT_DO_COMMIT;
599                                 break;
600                         case NFS_ODIRECT_DO_COMMIT:
601                                 if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
602                                         dprintk("NFS: %5u write verify failed\n", task->tk_pid);
603                                         dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
604                                 }
605                 }
606         }
607         /* In case we have to resend */
608         data->args.stable = NFS_FILE_SYNC;
609
610         spin_unlock(&dreq->lock);
611 }
612
613 /*
614  * NB: Return the value of the first error return code.  Subsequent
615  *     errors after the first one are ignored.
616  */
617 static void nfs_direct_write_release(void *calldata)
618 {
619         struct nfs_write_data *data = calldata;
620         struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
621
622         spin_lock(&dreq->lock);
623         if (--dreq->outstanding) {
624                 spin_unlock(&dreq->lock);
625                 return;
626         }
627         spin_unlock(&dreq->lock);
628
629         nfs_direct_write_complete(dreq, data->inode);
630 }
631
632 static const struct rpc_call_ops nfs_write_direct_ops = {
633         .rpc_call_done = nfs_direct_write_result,
634         .rpc_release = nfs_direct_write_release,
635 };
636
637 /*
638  * For each nfs_write_data struct that was allocated on the list, dispatch
639  * an NFS WRITE operation
640  */
641 static void nfs_direct_write_schedule(struct nfs_direct_req *dreq, int sync)
642 {
643         struct nfs_open_context *ctx = dreq->ctx;
644         struct inode *inode = ctx->dentry->d_inode;
645         struct list_head *list = &dreq->list;
646         struct page **pages = dreq->pages;
647         size_t count = dreq->user_count;
648         loff_t pos = dreq->pos;
649         size_t wsize = NFS_SERVER(inode)->wsize;
650         unsigned int curpage, pgbase;
651
652         curpage = 0;
653         pgbase = dreq->user_addr & ~PAGE_MASK;
654         do {
655                 struct nfs_write_data *data;
656                 size_t bytes;
657
658                 bytes = wsize;
659                 if (count < wsize)
660                         bytes = count;
661
662                 BUG_ON(list_empty(list));
663                 data = list_entry(list->next, struct nfs_write_data, pages);
664                 list_move_tail(&data->pages, &dreq->rewrite_list);
665
666                 data->inode = inode;
667                 data->cred = ctx->cred;
668                 data->args.fh = NFS_FH(inode);
669                 data->args.context = ctx;
670                 data->args.offset = pos;
671                 data->args.pgbase = pgbase;
672                 data->args.pages = &pages[curpage];
673                 data->args.count = bytes;
674                 data->res.fattr = &data->fattr;
675                 data->res.count = bytes;
676                 data->res.verf = &data->verf;
677
678                 rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
679                                 &nfs_write_direct_ops, data);
680                 NFS_PROTO(inode)->write_setup(data, sync);
681
682                 data->task.tk_priority = RPC_PRIORITY_NORMAL;
683                 data->task.tk_cookie = (unsigned long) inode;
684
685                 lock_kernel();
686                 rpc_execute(&data->task);
687                 unlock_kernel();
688
689                 dfprintk(VFS, "NFS: %5u initiated direct write call (req %s/%Ld, %zu bytes @ offset %Lu)\n",
690                                 data->task.tk_pid,
691                                 inode->i_sb->s_id,
692                                 (long long)NFS_FILEID(inode),
693                                 bytes,
694                                 (unsigned long long)data->args.offset);
695
696                 pos += bytes;
697                 pgbase += bytes;
698                 curpage += pgbase >> PAGE_SHIFT;
699                 pgbase &= ~PAGE_MASK;
700
701                 count -= bytes;
702         } while (count != 0);
703         BUG_ON(!list_empty(list));
704 }
705
706 static ssize_t nfs_direct_write(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos, struct page **pages, int nr_pages)
707 {
708         ssize_t result;
709         sigset_t oldset;
710         struct inode *inode = iocb->ki_filp->f_mapping->host;
711         struct rpc_clnt *clnt = NFS_CLIENT(inode);
712         struct nfs_direct_req *dreq;
713         size_t wsize = NFS_SERVER(inode)->wsize;
714         int sync = 0;
715
716         dreq = nfs_direct_write_alloc(count, wsize);
717         if (!dreq)
718                 return -ENOMEM;
719         if (dreq->commit_data == NULL || count < wsize)
720                 sync = FLUSH_STABLE;
721
722         dreq->user_addr = user_addr;
723         dreq->user_count = count;
724         dreq->pos = pos;
725         dreq->pages = pages;
726         dreq->npages = nr_pages;
727         dreq->inode = inode;
728         dreq->ctx = get_nfs_open_context((struct nfs_open_context *)iocb->ki_filp->private_data);
729         if (!is_sync_kiocb(iocb))
730                 dreq->iocb = iocb;
731
732         nfs_add_stats(inode, NFSIOS_DIRECTWRITTENBYTES, count);
733
734         nfs_begin_data_update(inode);
735
736         rpc_clnt_sigmask(clnt, &oldset);
737         nfs_direct_write_schedule(dreq, sync);
738         result = nfs_direct_wait(dreq);
739         rpc_clnt_sigunmask(clnt, &oldset);
740
741         return result;
742 }
743
744 /**
745  * nfs_file_direct_read - file direct read operation for NFS files
746  * @iocb: target I/O control block
747  * @buf: user's buffer into which to read data
748  * @count: number of bytes to read
749  * @pos: byte offset in file where reading starts
750  *
751  * We use this function for direct reads instead of calling
752  * generic_file_aio_read() in order to avoid gfar's check to see if
753  * the request starts before the end of the file.  For that check
754  * to work, we must generate a GETATTR before each direct read, and
755  * even then there is a window between the GETATTR and the subsequent
756  * READ where the file size could change.  Our preference is simply
757  * to do all reads the application wants, and the server will take
758  * care of managing the end of file boundary.
759  *
760  * This function also eliminates unnecessarily updating the file's
761  * atime locally, as the NFS server sets the file's atime, and this
762  * client must read the updated atime from the server back into its
763  * cache.
764  */
765 ssize_t nfs_file_direct_read(struct kiocb *iocb, char __user *buf, size_t count, loff_t pos)
766 {
767         ssize_t retval = -EINVAL;
768         int page_count;
769         struct page **pages;
770         struct file *file = iocb->ki_filp;
771         struct address_space *mapping = file->f_mapping;
772
773         dprintk("nfs: direct read(%s/%s, %lu@%Ld)\n",
774                 file->f_dentry->d_parent->d_name.name,
775                 file->f_dentry->d_name.name,
776                 (unsigned long) count, (long long) pos);
777
778         if (count < 0)
779                 goto out;
780         retval = -EFAULT;
781         if (!access_ok(VERIFY_WRITE, buf, count))
782                 goto out;
783         retval = 0;
784         if (!count)
785                 goto out;
786
787         retval = nfs_sync_mapping(mapping);
788         if (retval)
789                 goto out;
790
791         retval = nfs_get_user_pages(READ, (unsigned long) buf,
792                                                 count, &pages);
793         if (retval < 0)
794                 goto out;
795         page_count = retval;
796
797         retval = nfs_direct_read(iocb, (unsigned long) buf, count, pos,
798                                                 pages, page_count);
799         if (retval > 0)
800                 iocb->ki_pos = pos + retval;
801
802 out:
803         return retval;
804 }
805
806 /**
807  * nfs_file_direct_write - file direct write operation for NFS files
808  * @iocb: target I/O control block
809  * @buf: user's buffer from which to write data
810  * @count: number of bytes to write
811  * @pos: byte offset in file where writing starts
812  *
813  * We use this function for direct writes instead of calling
814  * generic_file_aio_write() in order to avoid taking the inode
815  * semaphore and updating the i_size.  The NFS server will set
816  * the new i_size and this client must read the updated size
817  * back into its cache.  We let the server do generic write
818  * parameter checking and report problems.
819  *
820  * We also avoid an unnecessary invocation of generic_osync_inode(),
821  * as it is fairly meaningless to sync the metadata of an NFS file.
822  *
823  * We eliminate local atime updates, see direct read above.
824  *
825  * We avoid unnecessary page cache invalidations for normal cached
826  * readers of this file.
827  *
828  * Note that O_APPEND is not supported for NFS direct writes, as there
829  * is no atomic O_APPEND write facility in the NFS protocol.
830  */
831 ssize_t nfs_file_direct_write(struct kiocb *iocb, const char __user *buf, size_t count, loff_t pos)
832 {
833         ssize_t retval;
834         int page_count;
835         struct page **pages;
836         struct file *file = iocb->ki_filp;
837         struct address_space *mapping = file->f_mapping;
838
839         dfprintk(VFS, "nfs: direct write(%s/%s, %lu@%Ld)\n",
840                 file->f_dentry->d_parent->d_name.name,
841                 file->f_dentry->d_name.name,
842                 (unsigned long) count, (long long) pos);
843
844         retval = generic_write_checks(file, &pos, &count, 0);
845         if (retval)
846                 goto out;
847
848         retval = -EINVAL;
849         if ((ssize_t) count < 0)
850                 goto out;
851         retval = 0;
852         if (!count)
853                 goto out;
854
855         retval = -EFAULT;
856         if (!access_ok(VERIFY_READ, buf, count))
857                 goto out;
858
859         retval = nfs_sync_mapping(mapping);
860         if (retval)
861                 goto out;
862
863         retval = nfs_get_user_pages(WRITE, (unsigned long) buf,
864                                                 count, &pages);
865         if (retval < 0)
866                 goto out;
867         page_count = retval;
868
869         retval = nfs_direct_write(iocb, (unsigned long) buf, count,
870                                         pos, pages, page_count);
871
872         /*
873          * XXX: nfs_end_data_update() already ensures this file's
874          *      cached data is subsequently invalidated.  Do we really
875          *      need to call invalidate_inode_pages2() again here?
876          *
877          *      For aio writes, this invalidation will almost certainly
878          *      occur before the writes complete.  Kind of racey.
879          */
880         if (mapping->nrpages)
881                 invalidate_inode_pages2(mapping);
882
883         if (retval > 0)
884                 iocb->ki_pos = pos + retval;
885
886 out:
887         return retval;
888 }
889
890 /**
891  * nfs_init_directcache - create a slab cache for nfs_direct_req structures
892  *
893  */
894 int __init nfs_init_directcache(void)
895 {
896         nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
897                                                 sizeof(struct nfs_direct_req),
898                                                 0, (SLAB_RECLAIM_ACCOUNT|
899                                                         SLAB_MEM_SPREAD),
900                                                 NULL, NULL);
901         if (nfs_direct_cachep == NULL)
902                 return -ENOMEM;
903
904         return 0;
905 }
906
907 /**
908  * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
909  *
910  */
911 void nfs_destroy_directcache(void)
912 {
913         if (kmem_cache_destroy(nfs_direct_cachep))
914                 printk(KERN_INFO "nfs_direct_cache: not all structures were freed\n");
915 }