NFS: create common nfs_pgio_header for both read and write
[pandora-kernel.git] / fs / nfs / read.c
1 /*
2  * linux/fs/nfs/read.c
3  *
4  * Block I/O for NFS
5  *
6  * Partial copy of Linus' read cache modifications to fs/nfs/file.c
7  * modified for async RPC by okir@monad.swb.de
8  */
9
10 #include <linux/time.h>
11 #include <linux/kernel.h>
12 #include <linux/errno.h>
13 #include <linux/fcntl.h>
14 #include <linux/stat.h>
15 #include <linux/mm.h>
16 #include <linux/slab.h>
17 #include <linux/pagemap.h>
18 #include <linux/sunrpc/clnt.h>
19 #include <linux/nfs_fs.h>
20 #include <linux/nfs_page.h>
21 #include <linux/module.h>
22
23 #include "pnfs.h"
24
25 #include "nfs4_fs.h"
26 #include "internal.h"
27 #include "iostat.h"
28 #include "fscache.h"
29
30 #define NFSDBG_FACILITY         NFSDBG_PAGECACHE
31
32 static const struct nfs_pageio_ops nfs_pageio_read_ops;
33 static const struct rpc_call_ops nfs_read_partial_ops;
34 static const struct rpc_call_ops nfs_read_full_ops;
35
36 static struct kmem_cache *nfs_rdata_cachep;
37
38 struct nfs_read_header *nfs_readhdr_alloc(unsigned int pagecount)
39 {
40         struct nfs_read_header *p;
41
42         p = kmem_cache_zalloc(nfs_rdata_cachep, GFP_KERNEL);
43         if (p) {
44                 struct nfs_pgio_header *hdr = &p->header;
45                 struct nfs_read_data *data = &p->rpc_data;
46
47                 INIT_LIST_HEAD(&hdr->pages);
48                 INIT_LIST_HEAD(&data->list);
49                 data->npages = pagecount;
50                 data->header = hdr;
51                 if (pagecount <= ARRAY_SIZE(data->page_array))
52                         data->pagevec = data->page_array;
53                 else {
54                         data->pagevec = kcalloc(pagecount, sizeof(struct page *), GFP_KERNEL);
55                         if (!data->pagevec) {
56                                 kmem_cache_free(nfs_rdata_cachep, p);
57                                 p = NULL;
58                         }
59                 }
60         }
61         return p;
62 }
63
64 void nfs_readhdr_free(struct nfs_pgio_header *hdr)
65 {
66         struct nfs_read_header *rhdr = container_of(hdr, struct nfs_read_header, header);
67
68         kmem_cache_free(nfs_rdata_cachep, rhdr);
69 }
70
71 void nfs_readdata_release(struct nfs_read_data *rdata)
72 {
73         put_nfs_open_context(rdata->args.context);
74         if (rdata->pagevec != rdata->page_array)
75                 kfree(rdata->pagevec);
76         nfs_readhdr_free(rdata->header);
77 }
78
79 static
80 int nfs_return_empty_page(struct page *page)
81 {
82         zero_user(page, 0, PAGE_CACHE_SIZE);
83         SetPageUptodate(page);
84         unlock_page(page);
85         return 0;
86 }
87
88 static void nfs_readpage_truncate_uninitialised_page(struct nfs_read_data *data)
89 {
90         unsigned int remainder = data->args.count - data->res.count;
91         unsigned int base = data->args.pgbase + data->res.count;
92         unsigned int pglen;
93         struct page **pages;
94
95         if (data->res.eof == 0 || remainder == 0)
96                 return;
97         /*
98          * Note: "remainder" can never be negative, since we check for
99          *      this in the XDR code.
100          */
101         pages = &data->args.pages[base >> PAGE_CACHE_SHIFT];
102         base &= ~PAGE_CACHE_MASK;
103         pglen = PAGE_CACHE_SIZE - base;
104         for (;;) {
105                 if (remainder <= pglen) {
106                         zero_user(*pages, base, remainder);
107                         break;
108                 }
109                 zero_user(*pages, base, pglen);
110                 pages++;
111                 remainder -= pglen;
112                 pglen = PAGE_CACHE_SIZE;
113                 base = 0;
114         }
115 }
116
117 void nfs_pageio_init_read_mds(struct nfs_pageio_descriptor *pgio,
118                 struct inode *inode)
119 {
120         nfs_pageio_init(pgio, inode, &nfs_pageio_read_ops,
121                         NFS_SERVER(inode)->rsize, 0);
122 }
123
124 void nfs_pageio_reset_read_mds(struct nfs_pageio_descriptor *pgio)
125 {
126         pgio->pg_ops = &nfs_pageio_read_ops;
127         pgio->pg_bsize = NFS_SERVER(pgio->pg_inode)->rsize;
128 }
129 EXPORT_SYMBOL_GPL(nfs_pageio_reset_read_mds);
130
131 static void nfs_pageio_init_read(struct nfs_pageio_descriptor *pgio,
132                 struct inode *inode)
133 {
134         if (!pnfs_pageio_init_read(pgio, inode))
135                 nfs_pageio_init_read_mds(pgio, inode);
136 }
137
138 int nfs_readpage_async(struct nfs_open_context *ctx, struct inode *inode,
139                        struct page *page)
140 {
141         struct nfs_page *new;
142         unsigned int len;
143         struct nfs_pageio_descriptor pgio;
144
145         len = nfs_page_length(page);
146         if (len == 0)
147                 return nfs_return_empty_page(page);
148         new = nfs_create_request(ctx, inode, page, 0, len);
149         if (IS_ERR(new)) {
150                 unlock_page(page);
151                 return PTR_ERR(new);
152         }
153         if (len < PAGE_CACHE_SIZE)
154                 zero_user_segment(page, len, PAGE_CACHE_SIZE);
155
156         nfs_pageio_init_read(&pgio, inode);
157         nfs_pageio_add_request(&pgio, new);
158         nfs_pageio_complete(&pgio);
159         return 0;
160 }
161
162 static void nfs_readpage_release(struct nfs_page *req)
163 {
164         struct inode *d_inode = req->wb_context->dentry->d_inode;
165
166         if (PageUptodate(req->wb_page))
167                 nfs_readpage_to_fscache(d_inode, req->wb_page, 0);
168
169         unlock_page(req->wb_page);
170
171         dprintk("NFS: read done (%s/%Ld %d@%Ld)\n",
172                         req->wb_context->dentry->d_inode->i_sb->s_id,
173                         (long long)NFS_FILEID(req->wb_context->dentry->d_inode),
174                         req->wb_bytes,
175                         (long long)req_offset(req));
176         nfs_release_request(req);
177 }
178
179 int nfs_initiate_read(struct rpc_clnt *clnt,
180                       struct nfs_read_data *data,
181                       const struct rpc_call_ops *call_ops)
182 {
183         struct inode *inode = data->header->inode;
184         int swap_flags = IS_SWAPFILE(inode) ? NFS_RPC_SWAPFLAGS : 0;
185         struct rpc_task *task;
186         struct rpc_message msg = {
187                 .rpc_argp = &data->args,
188                 .rpc_resp = &data->res,
189                 .rpc_cred = data->header->cred,
190         };
191         struct rpc_task_setup task_setup_data = {
192                 .task = &data->task,
193                 .rpc_client = clnt,
194                 .rpc_message = &msg,
195                 .callback_ops = call_ops,
196                 .callback_data = data,
197                 .workqueue = nfsiod_workqueue,
198                 .flags = RPC_TASK_ASYNC | swap_flags,
199         };
200
201         /* Set up the initial task struct. */
202         NFS_PROTO(inode)->read_setup(data, &msg);
203
204         dprintk("NFS: %5u initiated read call (req %s/%lld, %u bytes @ "
205                         "offset %llu)\n",
206                         data->task.tk_pid,
207                         inode->i_sb->s_id,
208                         (long long)NFS_FILEID(inode),
209                         data->args.count,
210                         (unsigned long long)data->args.offset);
211
212         task = rpc_run_task(&task_setup_data);
213         if (IS_ERR(task))
214                 return PTR_ERR(task);
215         rpc_put_task(task);
216         return 0;
217 }
218 EXPORT_SYMBOL_GPL(nfs_initiate_read);
219
220 /*
221  * Set up the NFS read request struct
222  */
223 static void nfs_read_rpcsetup(struct nfs_page *req, struct nfs_read_data *data,
224                 unsigned int count, unsigned int offset)
225 {
226         struct inode *inode = data->header->inode;
227
228         data->header->req         = req;
229         data->header->inode       = inode;
230         data->header->cred        = req->wb_context->cred;
231
232         data->args.fh     = NFS_FH(inode);
233         data->args.offset = req_offset(req) + offset;
234         data->args.pgbase = req->wb_pgbase + offset;
235         data->args.pages  = data->pagevec;
236         data->args.count  = count;
237         data->args.context = get_nfs_open_context(req->wb_context);
238         data->args.lock_context = req->wb_lock_context;
239
240         data->res.fattr   = &data->fattr;
241         data->res.count   = count;
242         data->res.eof     = 0;
243         nfs_fattr_init(&data->fattr);
244 }
245
246 static int nfs_do_read(struct nfs_read_data *data,
247                 const struct rpc_call_ops *call_ops)
248 {
249         struct inode *inode = data->header->inode;
250
251         return nfs_initiate_read(NFS_CLIENT(inode), data, call_ops);
252 }
253
254 static int
255 nfs_do_multiple_reads(struct list_head *head,
256                 const struct rpc_call_ops *call_ops)
257 {
258         struct nfs_read_data *data;
259         int ret = 0;
260
261         while (!list_empty(head)) {
262                 int ret2;
263
264                 data = list_entry(head->next, struct nfs_read_data, list);
265                 list_del_init(&data->list);
266
267                 ret2 = nfs_do_read(data, call_ops);
268                 if (ret == 0)
269                         ret = ret2;
270         }
271         return ret;
272 }
273
274 static void
275 nfs_async_read_error(struct list_head *head)
276 {
277         struct nfs_page *req;
278
279         while (!list_empty(head)) {
280                 req = nfs_list_entry(head->next);
281                 nfs_list_remove_request(req);
282                 nfs_readpage_release(req);
283         }
284 }
285
286 /*
287  * Generate multiple requests to fill a single page.
288  *
289  * We optimize to reduce the number of read operations on the wire.  If we
290  * detect that we're reading a page, or an area of a page, that is past the
291  * end of file, we do not generate NFS read operations but just clear the
292  * parts of the page that would have come back zero from the server anyway.
293  *
294  * We rely on the cached value of i_size to make this determination; another
295  * client can fill pages on the server past our cached end-of-file, but we
296  * won't see the new data until our attribute cache is updated.  This is more
297  * or less conventional NFS client behavior.
298  */
299 static int nfs_pagein_multi(struct nfs_pageio_descriptor *desc, struct list_head *res)
300 {
301         struct nfs_page *req = nfs_list_entry(desc->pg_list.next);
302         struct page *page = req->wb_page;
303         struct nfs_read_header *rhdr;
304         struct nfs_read_data *data;
305         size_t rsize = desc->pg_bsize, nbytes;
306         unsigned int offset;
307         int requests = 0;
308         int ret = 0;
309
310         nfs_list_remove_request(req);
311
312         offset = 0;
313         nbytes = desc->pg_count;
314         do {
315                 size_t len = min(nbytes,rsize);
316
317                 rhdr = nfs_readhdr_alloc(1);
318                 if (!rhdr)
319                         goto out_bad;
320                 data = &rhdr->rpc_data;
321                 data->pagevec[0] = page;
322                 nfs_read_rpcsetup(req, data, len, offset);
323                 list_add(&data->list, res);
324                 requests++;
325                 nbytes -= len;
326                 offset += len;
327         } while(nbytes != 0);
328         atomic_set(&req->wb_complete, requests);
329         desc->pg_rpc_callops = &nfs_read_partial_ops;
330         return ret;
331 out_bad:
332         while (!list_empty(res)) {
333                 data = list_entry(res->next, struct nfs_read_data, list);
334                 list_del(&data->list);
335                 nfs_readdata_release(data);
336         }
337         nfs_readpage_release(req);
338         return -ENOMEM;
339 }
340
341 static int nfs_pagein_one(struct nfs_pageio_descriptor *desc, struct list_head *res)
342 {
343         struct nfs_page         *req;
344         struct page             **pages;
345         struct nfs_read_header  *rhdr;
346         struct nfs_read_data    *data;
347         struct list_head *head = &desc->pg_list;
348         int ret = 0;
349
350         rhdr = nfs_readhdr_alloc(nfs_page_array_len(desc->pg_base,
351                                                     desc->pg_count));
352         if (!rhdr) {
353                 nfs_async_read_error(head);
354                 ret = -ENOMEM;
355                 goto out;
356         }
357
358         data = &rhdr->rpc_data;
359         pages = data->pagevec;
360         while (!list_empty(head)) {
361                 req = nfs_list_entry(head->next);
362                 nfs_list_remove_request(req);
363                 nfs_list_add_request(req, &rhdr->header.pages);
364                 *pages++ = req->wb_page;
365         }
366         req = nfs_list_entry(rhdr->header.pages.next);
367
368         nfs_read_rpcsetup(req, data, desc->pg_count, 0);
369         list_add(&data->list, res);
370         desc->pg_rpc_callops = &nfs_read_full_ops;
371 out:
372         return ret;
373 }
374
375 int nfs_generic_pagein(struct nfs_pageio_descriptor *desc, struct list_head *head)
376 {
377         if (desc->pg_bsize < PAGE_CACHE_SIZE)
378                 return nfs_pagein_multi(desc, head);
379         return nfs_pagein_one(desc, head);
380 }
381
382 static int nfs_generic_pg_readpages(struct nfs_pageio_descriptor *desc)
383 {
384         LIST_HEAD(head);
385         int ret;
386
387         ret = nfs_generic_pagein(desc, &head);
388         if (ret == 0)
389                 ret = nfs_do_multiple_reads(&head, desc->pg_rpc_callops);
390         return ret;
391 }
392
393 static const struct nfs_pageio_ops nfs_pageio_read_ops = {
394         .pg_test = nfs_generic_pg_test,
395         .pg_doio = nfs_generic_pg_readpages,
396 };
397
398 /*
399  * This is the callback from RPC telling us whether a reply was
400  * received or some error occurred (timeout or socket shutdown).
401  */
402 int nfs_readpage_result(struct rpc_task *task, struct nfs_read_data *data)
403 {
404         struct inode *inode = data->header->inode;
405         int status;
406
407         dprintk("NFS: %s: %5u, (status %d)\n", __func__, task->tk_pid,
408                         task->tk_status);
409
410         status = NFS_PROTO(inode)->read_done(task, data);
411         if (status != 0)
412                 return status;
413
414         nfs_add_stats(inode, NFSIOS_SERVERREADBYTES, data->res.count);
415
416         if (task->tk_status == -ESTALE) {
417                 set_bit(NFS_INO_STALE, &NFS_I(inode)->flags);
418                 nfs_mark_for_revalidate(inode);
419         }
420         return 0;
421 }
422
423 static void nfs_readpage_retry(struct rpc_task *task, struct nfs_read_data *data)
424 {
425         struct nfs_readargs *argp = &data->args;
426         struct nfs_readres *resp = &data->res;
427
428         if (resp->eof || resp->count == argp->count)
429                 return;
430
431         /* This is a short read! */
432         nfs_inc_stats(data->header->inode, NFSIOS_SHORTREAD);
433         /* Has the server at least made some progress? */
434         if (resp->count == 0)
435                 return;
436
437         /* Yes, so retry the read at the end of the data */
438         data->mds_offset += resp->count;
439         argp->offset += resp->count;
440         argp->pgbase += resp->count;
441         argp->count -= resp->count;
442         rpc_restart_call_prepare(task);
443 }
444
445 /*
446  * Handle a read reply that fills part of a page.
447  */
448 static void nfs_readpage_result_partial(struct rpc_task *task, void *calldata)
449 {
450         struct nfs_read_data *data = calldata;
451  
452         if (nfs_readpage_result(task, data) != 0)
453                 return;
454         if (task->tk_status < 0)
455                 return;
456
457         nfs_readpage_truncate_uninitialised_page(data);
458         nfs_readpage_retry(task, data);
459 }
460
461 static void nfs_readpage_release_partial(void *calldata)
462 {
463         struct nfs_read_data *data = calldata;
464         struct nfs_page *req = data->header->req;
465         struct page *page = req->wb_page;
466         int status = data->task.tk_status;
467
468         if (status < 0)
469                 set_bit(PG_PARTIAL_READ_FAILED, &req->wb_flags);
470
471         if (atomic_dec_and_test(&req->wb_complete)) {
472                 if (!test_bit(PG_PARTIAL_READ_FAILED, &req->wb_flags))
473                         SetPageUptodate(page);
474                 nfs_readpage_release(req);
475         }
476         nfs_readdata_release(data);
477 }
478
479 void nfs_read_prepare(struct rpc_task *task, void *calldata)
480 {
481         struct nfs_read_data *data = calldata;
482         NFS_PROTO(data->header->inode)->read_rpc_prepare(task, data);
483 }
484
485 static const struct rpc_call_ops nfs_read_partial_ops = {
486         .rpc_call_prepare = nfs_read_prepare,
487         .rpc_call_done = nfs_readpage_result_partial,
488         .rpc_release = nfs_readpage_release_partial,
489 };
490
491 static void nfs_readpage_set_pages_uptodate(struct nfs_read_data *data)
492 {
493         unsigned int count = data->res.count;
494         unsigned int base = data->args.pgbase;
495         struct page **pages;
496
497         if (data->res.eof)
498                 count = data->args.count;
499         if (unlikely(count == 0))
500                 return;
501         pages = &data->args.pages[base >> PAGE_CACHE_SHIFT];
502         base &= ~PAGE_CACHE_MASK;
503         count += base;
504         for (;count >= PAGE_CACHE_SIZE; count -= PAGE_CACHE_SIZE, pages++)
505                 SetPageUptodate(*pages);
506         if (count == 0)
507                 return;
508         /* Was this a short read? */
509         if (data->res.eof || data->res.count == data->args.count)
510                 SetPageUptodate(*pages);
511 }
512
513 /*
514  * This is the callback from RPC telling us whether a reply was
515  * received or some error occurred (timeout or socket shutdown).
516  */
517 static void nfs_readpage_result_full(struct rpc_task *task, void *calldata)
518 {
519         struct nfs_read_data *data = calldata;
520
521         if (nfs_readpage_result(task, data) != 0)
522                 return;
523         if (task->tk_status < 0)
524                 return;
525         /*
526          * Note: nfs_readpage_retry may change the values of
527          * data->args. In the multi-page case, we therefore need
528          * to ensure that we call nfs_readpage_set_pages_uptodate()
529          * first.
530          */
531         nfs_readpage_truncate_uninitialised_page(data);
532         nfs_readpage_set_pages_uptodate(data);
533         nfs_readpage_retry(task, data);
534 }
535
536 static void nfs_readpage_release_full(void *calldata)
537 {
538         struct nfs_read_data *data = calldata;
539         struct nfs_pgio_header *hdr = data->header;
540
541         while (!list_empty(&hdr->pages)) {
542                 struct nfs_page *req = nfs_list_entry(hdr->pages.next);
543
544                 nfs_list_remove_request(req);
545                 nfs_readpage_release(req);
546         }
547         nfs_readdata_release(calldata);
548 }
549
550 static const struct rpc_call_ops nfs_read_full_ops = {
551         .rpc_call_prepare = nfs_read_prepare,
552         .rpc_call_done = nfs_readpage_result_full,
553         .rpc_release = nfs_readpage_release_full,
554 };
555
556 /*
557  * Read a page over NFS.
558  * We read the page synchronously in the following case:
559  *  -   The error flag is set for this page. This happens only when a
560  *      previous async read operation failed.
561  */
562 int nfs_readpage(struct file *file, struct page *page)
563 {
564         struct nfs_open_context *ctx;
565         struct inode *inode = page->mapping->host;
566         int             error;
567
568         dprintk("NFS: nfs_readpage (%p %ld@%lu)\n",
569                 page, PAGE_CACHE_SIZE, page->index);
570         nfs_inc_stats(inode, NFSIOS_VFSREADPAGE);
571         nfs_add_stats(inode, NFSIOS_READPAGES, 1);
572
573         /*
574          * Try to flush any pending writes to the file..
575          *
576          * NOTE! Because we own the page lock, there cannot
577          * be any new pending writes generated at this point
578          * for this page (other pages can be written to).
579          */
580         error = nfs_wb_page(inode, page);
581         if (error)
582                 goto out_unlock;
583         if (PageUptodate(page))
584                 goto out_unlock;
585
586         error = -ESTALE;
587         if (NFS_STALE(inode))
588                 goto out_unlock;
589
590         if (file == NULL) {
591                 error = -EBADF;
592                 ctx = nfs_find_open_context(inode, NULL, FMODE_READ);
593                 if (ctx == NULL)
594                         goto out_unlock;
595         } else
596                 ctx = get_nfs_open_context(nfs_file_open_context(file));
597
598         if (!IS_SYNC(inode)) {
599                 error = nfs_readpage_from_fscache(ctx, inode, page);
600                 if (error == 0)
601                         goto out;
602         }
603
604         error = nfs_readpage_async(ctx, inode, page);
605
606 out:
607         put_nfs_open_context(ctx);
608         return error;
609 out_unlock:
610         unlock_page(page);
611         return error;
612 }
613
614 struct nfs_readdesc {
615         struct nfs_pageio_descriptor *pgio;
616         struct nfs_open_context *ctx;
617 };
618
619 static int
620 readpage_async_filler(void *data, struct page *page)
621 {
622         struct nfs_readdesc *desc = (struct nfs_readdesc *)data;
623         struct inode *inode = page->mapping->host;
624         struct nfs_page *new;
625         unsigned int len;
626         int error;
627
628         len = nfs_page_length(page);
629         if (len == 0)
630                 return nfs_return_empty_page(page);
631
632         new = nfs_create_request(desc->ctx, inode, page, 0, len);
633         if (IS_ERR(new))
634                 goto out_error;
635
636         if (len < PAGE_CACHE_SIZE)
637                 zero_user_segment(page, len, PAGE_CACHE_SIZE);
638         if (!nfs_pageio_add_request(desc->pgio, new)) {
639                 error = desc->pgio->pg_error;
640                 goto out_unlock;
641         }
642         return 0;
643 out_error:
644         error = PTR_ERR(new);
645 out_unlock:
646         unlock_page(page);
647         return error;
648 }
649
650 int nfs_readpages(struct file *filp, struct address_space *mapping,
651                 struct list_head *pages, unsigned nr_pages)
652 {
653         struct nfs_pageio_descriptor pgio;
654         struct nfs_readdesc desc = {
655                 .pgio = &pgio,
656         };
657         struct inode *inode = mapping->host;
658         unsigned long npages;
659         int ret = -ESTALE;
660
661         dprintk("NFS: nfs_readpages (%s/%Ld %d)\n",
662                         inode->i_sb->s_id,
663                         (long long)NFS_FILEID(inode),
664                         nr_pages);
665         nfs_inc_stats(inode, NFSIOS_VFSREADPAGES);
666
667         if (NFS_STALE(inode))
668                 goto out;
669
670         if (filp == NULL) {
671                 desc.ctx = nfs_find_open_context(inode, NULL, FMODE_READ);
672                 if (desc.ctx == NULL)
673                         return -EBADF;
674         } else
675                 desc.ctx = get_nfs_open_context(nfs_file_open_context(filp));
676
677         /* attempt to read as many of the pages as possible from the cache
678          * - this returns -ENOBUFS immediately if the cookie is negative
679          */
680         ret = nfs_readpages_from_fscache(desc.ctx, inode, mapping,
681                                          pages, &nr_pages);
682         if (ret == 0)
683                 goto read_complete; /* all pages were read */
684
685         nfs_pageio_init_read(&pgio, inode);
686
687         ret = read_cache_pages(mapping, pages, readpage_async_filler, &desc);
688
689         nfs_pageio_complete(&pgio);
690         npages = (pgio.pg_bytes_written + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
691         nfs_add_stats(inode, NFSIOS_READPAGES, npages);
692 read_complete:
693         put_nfs_open_context(desc.ctx);
694 out:
695         return ret;
696 }
697
698 int __init nfs_init_readpagecache(void)
699 {
700         nfs_rdata_cachep = kmem_cache_create("nfs_read_data",
701                                              sizeof(struct nfs_read_header),
702                                              0, SLAB_HWCACHE_ALIGN,
703                                              NULL);
704         if (nfs_rdata_cachep == NULL)
705                 return -ENOMEM;
706
707         return 0;
708 }
709
710 void nfs_destroy_readpagecache(void)
711 {
712         kmem_cache_destroy(nfs_rdata_cachep);
713 }