fs/splice.c
1 /*
2  * "splice": joining two ropes together by interweaving their strands.
3  *
4  * This is the "extended pipe" functionality, where a pipe is used as
5  * an arbitrary in-memory buffer. Think of a pipe as a small kernel
6  * buffer that you can use to transfer data from one end to the other.
7  *
8  * The traditional unix read/write is extended with a "splice()" operation
9  * that transfers data buffers to or from a pipe buffer.
10  *
11  * Named by Larry McVoy, original implementation from Linus, extended by
12  * Jens to support splicing to files, network, direct splicing, etc., and
13  * fixing lots of bugs.
14  *
15  * Copyright (C) 2005-2006 Jens Axboe <axboe@suse.de>
16  * Copyright (C) 2005-2006 Linus Torvalds <torvalds@osdl.org>
17  * Copyright (C) 2006 Ingo Molnar <mingo@elte.hu>
18  *
19  */
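/*
 * Example (userspace sketch, not part of this file): driving the splice(2)
 * syscall implemented below to push a regular file out over a socket, with
 * a pipe acting as the in-kernel buffer. The descriptors file_fd/sock_fd
 * and the flag choices are illustrative assumptions; splice() here is the
 * wrapper glibc declares in <fcntl.h> under _GNU_SOURCE.
 *
 *	#define _GNU_SOURCE
 *	#include <fcntl.h>
 *	#include <sys/types.h>
 *	#include <unistd.h>
 *
 *	static int file_to_socket(int file_fd, int sock_fd, size_t len)
 *	{
 *		int pfd[2];
 *		loff_t off = 0;
 *		int ret = 0;
 *
 *		if (pipe(pfd) < 0)
 *			return -1;
 *
 *		while (len && !ret) {
 *			// file -> pipe: queue page-cache pages on the pipe
 *			ssize_t n = splice(file_fd, &off, pfd[1], NULL, len,
 *					   SPLICE_F_MORE);
 *			if (n <= 0) {
 *				ret = n ? -1 : 0;
 *				break;
 *			}
 *			// pipe -> socket: drain what was queued above
 *			while (n > 0) {
 *				ssize_t m = splice(pfd[0], NULL, sock_fd,
 *						   NULL, n, SPLICE_F_MORE);
 *				if (m <= 0) {
 *					ret = -1;
 *					break;
 *				}
 *				n -= m;
 *				len -= m;
 *			}
 *		}
 *		close(pfd[0]);
 *		close(pfd[1]);
 *		return ret;
 *	}
 */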
20 #include <linux/fs.h>
21 #include <linux/file.h>
22 #include <linux/pagemap.h>
23 #include <linux/pipe_fs_i.h>
24 #include <linux/mm_inline.h>
25 #include <linux/swap.h>
26 #include <linux/writeback.h>
27 #include <linux/buffer_head.h>
28 #include <linux/module.h>
29 #include <linux/syscalls.h>
30
31 /*
32  * Passed to the actors
33  */
34 struct splice_desc {
35         unsigned int len, total_len;    /* current and remaining length */
36         unsigned int flags;             /* splice flags */
37         struct file *file;              /* file to read/write */
38         loff_t pos;                     /* file position */
39 };
40
41 /*
42  * Attempt to steal a page from a pipe buffer. This should perhaps go into
43  * a vm helper function, it's already simplified quite a bit by the
44  * addition of remove_mapping(). If success is returned, the caller may
45  * attempt to reuse this page for another destination.
46  */
47 static int page_cache_pipe_buf_steal(struct pipe_inode_info *info,
48                                      struct pipe_buffer *buf)
49 {
50         struct page *page = buf->page;
51         struct address_space *mapping = page_mapping(page);
52
53         lock_page(page);
54
55         WARN_ON(!PageUptodate(page));
56
57         /*
58          * At least for ext2 with the nobh option, we need to wait for
59          * writeback to complete on this page, since we'll remove it from
60          * the page cache. Otherwise truncate won't wait on the page,
61          * allowing the disk blocks to be reused by someone else before we
62          * actually wrote our data to them. Filesystem corruption ensues.
63          */
64         wait_on_page_writeback(page);
65
66         if (PagePrivate(page))
67                 try_to_release_page(page, mapping_gfp_mask(mapping));
68
69         if (!remove_mapping(mapping, page)) {
70                 unlock_page(page);
71                 return 1;
72         }
73
74         buf->flags |= PIPE_BUF_FLAG_STOLEN | PIPE_BUF_FLAG_LRU;
75         return 0;
76 }
77
78 static void page_cache_pipe_buf_release(struct pipe_inode_info *info,
79                                         struct pipe_buffer *buf)
80 {
81         page_cache_release(buf->page);
82         buf->page = NULL;
83         buf->flags &= ~(PIPE_BUF_FLAG_STOLEN | PIPE_BUF_FLAG_LRU);
84 }
85
86 static void *page_cache_pipe_buf_map(struct file *file,
87                                      struct pipe_inode_info *info,
88                                      struct pipe_buffer *buf)
89 {
90         struct page *page = buf->page;
91         int err;
92
93         if (!PageUptodate(page)) {
94                 lock_page(page);
95
96                 /*
97                  * Page got truncated/unhashed. This will cause a 0-byte
98                  * splice if this is the first page.
99                  */
100                 if (!page->mapping) {
101                         err = -ENODATA;
102                         goto error;
103                 }
104
105                 /*
106                  * Uh oh, read-error from disk.
107                  */
108                 if (!PageUptodate(page)) {
109                         err = -EIO;
110                         goto error;
111                 }
112
113                 /*
114                  * Page is OK after all, fall through to mapping.
115                  */
116                 unlock_page(page);
117         }
118
119         return kmap(page);
120 error:
121         unlock_page(page);
122         return ERR_PTR(err);
123 }
124
125 static void page_cache_pipe_buf_unmap(struct pipe_inode_info *info,
126                                       struct pipe_buffer *buf)
127 {
128         kunmap(buf->page);
129 }
130
131 static void page_cache_pipe_buf_get(struct pipe_inode_info *info,
132                                     struct pipe_buffer *buf)
133 {
134         page_cache_get(buf->page);
135 }
136
137 static struct pipe_buf_operations page_cache_pipe_buf_ops = {
138         .can_merge = 0,
139         .map = page_cache_pipe_buf_map,
140         .unmap = page_cache_pipe_buf_unmap,
141         .release = page_cache_pipe_buf_release,
142         .steal = page_cache_pipe_buf_steal,
143         .get = page_cache_pipe_buf_get,
144 };
145
146 /*
147  * Pipe output worker. This sets up our pipe format with the page cache
148  * pipe buffer operations. Otherwise very similar to the regular pipe_writev().
149  */
150 static ssize_t move_to_pipe(struct pipe_inode_info *pipe, struct page **pages,
151                             int nr_pages, unsigned long len,
152                             unsigned int offset, unsigned int flags)
153 {
154         int ret, do_wakeup, i;
155
156         ret = 0;
157         do_wakeup = 0;
158         i = 0;
159
160         if (pipe->inode)
161                 mutex_lock(&pipe->inode->i_mutex);
162
163         for (;;) {
164                 if (!pipe->readers) {
165                         send_sig(SIGPIPE, current, 0);
166                         if (!ret)
167                                 ret = -EPIPE;
168                         break;
169                 }
170
171                 if (pipe->nrbufs < PIPE_BUFFERS) {
172                         int newbuf = (pipe->curbuf + pipe->nrbufs) & (PIPE_BUFFERS - 1);
173                         struct pipe_buffer *buf = pipe->bufs + newbuf;
174                         struct page *page = pages[i++];
175                         unsigned long this_len;
176
177                         this_len = PAGE_CACHE_SIZE - offset;
178                         if (this_len > len)
179                                 this_len = len;
180
181                         buf->page = page;
182                         buf->offset = offset;
183                         buf->len = this_len;
184                         buf->ops = &page_cache_pipe_buf_ops;
185                         pipe->nrbufs++;
186                         if (pipe->inode)
187                                 do_wakeup = 1;
188
189                         ret += this_len;
190                         len -= this_len;
191                         offset = 0;
192                         if (!--nr_pages)
193                                 break;
194                         if (!len)
195                                 break;
196                         if (pipe->nrbufs < PIPE_BUFFERS)
197                                 continue;
198
199                         break;
200                 }
201
202                 if (flags & SPLICE_F_NONBLOCK) {
203                         if (!ret)
204                                 ret = -EAGAIN;
205                         break;
206                 }
207
208                 if (signal_pending(current)) {
209                         if (!ret)
210                                 ret = -ERESTARTSYS;
211                         break;
212                 }
213
214                 if (do_wakeup) {
215                         smp_mb();
216                         if (waitqueue_active(&pipe->wait))
217                                 wake_up_interruptible_sync(&pipe->wait);
218                         kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
219                         do_wakeup = 0;
220                 }
221
222                 pipe->waiting_writers++;
223                 pipe_wait(pipe);
224                 pipe->waiting_writers--;
225         }
226
227         if (pipe->inode)
228                 mutex_unlock(&pipe->inode->i_mutex);
229
230         if (do_wakeup) {
231                 smp_mb();
232                 if (waitqueue_active(&pipe->wait))
233                         wake_up_interruptible(&pipe->wait);
234                 kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
235         }
236
237         while (i < nr_pages)
238                 page_cache_release(pages[i++]);
239
240         return ret;
241 }
242
243 static int
244 __generic_file_splice_read(struct file *in, loff_t *ppos,
245                            struct pipe_inode_info *pipe, size_t len,
246                            unsigned int flags)
247 {
248         struct address_space *mapping = in->f_mapping;
249         unsigned int loff, offset, nr_pages;
250         struct page *pages[PIPE_BUFFERS];
251         struct page *page;
252         pgoff_t index, end_index;
253         loff_t isize;
254         size_t bytes;
255         int i, error;
256
257         index = *ppos >> PAGE_CACHE_SHIFT;
258         loff = offset = *ppos & ~PAGE_CACHE_MASK;
259         nr_pages = (len + offset + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
260
261         if (nr_pages > PIPE_BUFFERS)
262                 nr_pages = PIPE_BUFFERS;
263
264         /*
265          * Initiate read-ahead on this page range. However, don't call into
266          * read-ahead for a single page at a non-zero offset (we are likely
267          * doing a small chunk splice and the page is already there).
268          */
269         if (!offset || nr_pages > 1)
270                 do_page_cache_readahead(mapping, in, index, nr_pages);
271
272         /*
273          * Now fill in the holes:
274          */
275         error = 0;
276         bytes = 0;
277         for (i = 0; i < nr_pages; i++, index++) {
278 find_page:
279                 /*
280                  * lookup the page for this index
281                  */
282                 page = find_get_page(mapping, index);
283                 if (!page) {
284                         /*
285                          * page didn't exist, allocate one
286                          */
287                         page = page_cache_alloc_cold(mapping);
288                         if (!page)
289                                 break;
290
291                         error = add_to_page_cache_lru(page, mapping, index,
292                                                 mapping_gfp_mask(mapping));
293                         if (unlikely(error)) {
294                                 page_cache_release(page);
295                                 break;
296                         }
297
298                         goto readpage;
299                 }
300
301                 /*
302                  * If the page isn't uptodate, we may need to start io on it
303                  */
304                 if (!PageUptodate(page)) {
305                         /*
306                          * If in nonblock mode then don't block waiting
307                          * for an in-flight I/O page
308                          */
309                         if (flags & SPLICE_F_NONBLOCK)
310                                 break;
311
312                         lock_page(page);
313
314                         /*
315                          * Page was truncated, stop here. If this isn't the
316                          * first page, we'll just complete what we already
317                          * added.
318                          */
319                         if (!page->mapping) {
320                                 unlock_page(page);
321                                 page_cache_release(page);
322                                 break;
323                         }
324                         /*
325                          * Page was already under I/O and is now done, great.
326                          */
327                         if (PageUptodate(page)) {
328                                 unlock_page(page);
329                                 goto fill_it;
330                         }
331
332 readpage:
333                         /*
334                          * Need to read in the page.
335                          */
336                         error = mapping->a_ops->readpage(in, page);
337
338                         if (unlikely(error)) {
339                                 page_cache_release(page);
340                                 if (error == AOP_TRUNCATED_PAGE)
341                                         goto find_page;
342                                 break;
343                         }
344
345                         /*
346                          * i_size must be checked after ->readpage().
347                          */
348                         isize = i_size_read(mapping->host);
349                         end_index = (isize - 1) >> PAGE_CACHE_SHIFT;
350                         if (unlikely(!isize || index > end_index)) {
351                                 page_cache_release(page);
352                                 break;
353                         }
354
355                         /*
356                          * If this is the last page, see if we need to shrink
357                          * the length and stop.
358                          */
359                         if (end_index == index) {
360                                 loff = PAGE_CACHE_SIZE - (isize & ~PAGE_CACHE_MASK);
361                                 if (bytes + loff > isize) {
362                                         page_cache_release(page);
363                                         break;
364                                 }
365                                 /*
366                                  * force quit after adding this page
367                                  */
368                                 nr_pages = i;
369                         }
370                 }
371 fill_it:
372                 pages[i] = page;
373                 bytes += PAGE_CACHE_SIZE - loff;
374                 loff = 0;
375         }
376
377         if (i)
378                 return move_to_pipe(pipe, pages, i, bytes, offset, flags);
379
380         return error;
381 }
382
383 /**
384  * generic_file_splice_read - splice data from file to a pipe
385  * @in:         file to splice from
  * @ppos:       position in @in to start reading from (updated on return)
386  * @pipe:       pipe to splice to
387  * @len:        number of bytes to splice
388  * @flags:      splice modifier flags
389  *
390  * Will read pages from given file and fill them into a pipe.
391  */
392 ssize_t generic_file_splice_read(struct file *in, loff_t *ppos,
393                                  struct pipe_inode_info *pipe, size_t len,
394                                  unsigned int flags)
395 {
396         ssize_t spliced;
397         int ret;
398
399         ret = 0;
400         spliced = 0;
401
402         while (len) {
403                 ret = __generic_file_splice_read(in, ppos, pipe, len, flags);
404
405                 if (ret < 0)
406                         break;
407                 else if (!ret) {
408                         if (spliced)
409                                 break;
410                         if (flags & SPLICE_F_NONBLOCK) {
411                                 ret = -EAGAIN;
412                                 break;
413                         }
414                 }
415
416                 *ppos += ret;
417                 len -= ret;
418                 spliced += ret;
419         }
420
421         if (spliced)
422                 return spliced;
423
424         return ret;
425 }
426
427 EXPORT_SYMBOL(generic_file_splice_read);
428
429 /*
430  * Send 'sd->len' bytes to socket from 'sd->file' at position 'sd->pos'
431  * using sendpage().
432  */
433 static int pipe_to_sendpage(struct pipe_inode_info *info,
434                             struct pipe_buffer *buf, struct splice_desc *sd)
435 {
436         struct file *file = sd->file;
437         loff_t pos = sd->pos;
438         unsigned int offset;
439         ssize_t ret;
440         void *ptr;
441         int more;
442
443         /*
444          * Sub-optimal, but we are limited by the pipe ->map. We don't
445          * need a kmap'ed buffer here, we just want to make sure we
446          * have the page pinned if the pipe page originates from the
447          * page cache.
448          */
449         ptr = buf->ops->map(file, info, buf);
450         if (IS_ERR(ptr))
451                 return PTR_ERR(ptr);
452
453         offset = pos & ~PAGE_CACHE_MASK;
454         more = (sd->flags & SPLICE_F_MORE) || sd->len < sd->total_len;
455
456         ret = file->f_op->sendpage(file, buf->page, offset, sd->len, &pos, more);
457
458         buf->ops->unmap(info, buf);
459         if (ret == sd->len)
460                 return 0;
461
462         return -EIO;
463 }
464
465 /*
466  * This is a little more tricky than the file -> pipe splicing. There are
467  * basically three cases:
468  *
469  *      - Destination page already exists in the address space and there
470  *        are users of it. For that case we have no other option than
471  *        copying the data. Tough luck.
472  *      - Destination page already exists in the address space, but there
473  *        are no users of it. Make sure it's uptodate, then drop it. Fall
474  *        through to last case.
475  *      - Destination page does not exist, we can add the pipe page to
476  *        the page cache and avoid the copy.
477  *
478  * If asked to move pages to the output file (SPLICE_F_MOVE is set in
479  * sd->flags), we attempt to migrate pages from the pipe to the output
480  * file address space page cache. This is possible if no one else has
481  * the pipe page referenced outside of the pipe and page cache. If
482  * SPLICE_F_MOVE isn't set, or we cannot move the page, we simply create
483  * a new page in the output file page cache and fill/dirty that.
484  */
485 static int pipe_to_file(struct pipe_inode_info *info, struct pipe_buffer *buf,
486                         struct splice_desc *sd)
487 {
488         struct file *file = sd->file;
489         struct address_space *mapping = file->f_mapping;
490         gfp_t gfp_mask = mapping_gfp_mask(mapping);
491         unsigned int offset;
492         struct page *page;
493         pgoff_t index;
494         char *src;
495         int ret;
496
497         /*
498          * make sure the data in this buffer is uptodate
499          */
500         src = buf->ops->map(file, info, buf);
501         if (IS_ERR(src))
502                 return PTR_ERR(src);
503
504         index = sd->pos >> PAGE_CACHE_SHIFT;
505         offset = sd->pos & ~PAGE_CACHE_MASK;
506
507         /*
508          * Reuse buf page, if SPLICE_F_MOVE is set.
509          */
510         if (sd->flags & SPLICE_F_MOVE) {
511                 /*
512                  * If steal succeeds, buf->page is now pruned from the vm
513                  * side (LRU and page cache) and we can reuse it. The page
514                  * will also be locked on successful return.
515                  */
516                 if (buf->ops->steal(info, buf))
517                         goto find_page;
518
519                 page = buf->page;
520                 if (add_to_page_cache(page, mapping, index, gfp_mask))
521                         goto find_page;
522
523                 if (!(buf->flags & PIPE_BUF_FLAG_LRU))
524                         lru_cache_add(page);
525         } else {
526 find_page:
527                 page = find_lock_page(mapping, index);
528                 if (!page) {
529                         ret = -ENOMEM;
530                         page = page_cache_alloc_cold(mapping);
531                         if (unlikely(!page))
532                                 goto out_nomem;
533
534                         /*
535                          * This will also lock the page
536                          */
537                         ret = add_to_page_cache_lru(page, mapping, index,
538                                                     gfp_mask);
539                         if (unlikely(ret))
540                                 goto out;
541                 }
542
543                 /*
544                  * We get here with the page locked. If the page is also
545                  * uptodate, we don't need to do more. If it isn't, we
546                  * may need to bring it in if we are not going to overwrite
547                  * the full page.
548                  */
549                 if (!PageUptodate(page)) {
550                         if (sd->len < PAGE_CACHE_SIZE) {
551                                 ret = mapping->a_ops->readpage(file, page);
552                                 if (unlikely(ret))
553                                         goto out;
554
555                                 lock_page(page);
556
557                                 if (!PageUptodate(page)) {
558                                         /*
559                                          * Page got invalidated, repeat.
560                                          */
561                                         if (!page->mapping) {
562                                                 unlock_page(page);
563                                                 page_cache_release(page);
564                                                 goto find_page;
565                                         }
566                                         ret = -EIO;
567                                         goto out;
568                                 }
569                         } else
570                                 SetPageUptodate(page);
571                 }
572         }
573
574         ret = mapping->a_ops->prepare_write(file, page, 0, sd->len);
575         if (ret == AOP_TRUNCATED_PAGE) {
576                 page_cache_release(page);
577                 goto find_page;
578         } else if (ret)
579                 goto out;
580
581         if (!(buf->flags & PIPE_BUF_FLAG_STOLEN)) {
582                 char *dst = kmap_atomic(page, KM_USER0);
583
584                 memcpy(dst + offset, src + buf->offset, sd->len);
585                 flush_dcache_page(page);
586                 kunmap_atomic(dst, KM_USER0);
587         }
588
589         ret = mapping->a_ops->commit_write(file, page, 0, sd->len);
590         if (ret == AOP_TRUNCATED_PAGE) {
591                 page_cache_release(page);
592                 goto find_page;
593         } else if (ret)
594                 goto out;
595
596         mark_page_accessed(page);
597         balance_dirty_pages_ratelimited(mapping);
598 out:
599         if (!(buf->flags & PIPE_BUF_FLAG_STOLEN))
600                 page_cache_release(page);
601
602         unlock_page(page);
603 out_nomem:
604         buf->ops->unmap(info, buf);
605         return ret;
606 }
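/*
 * Illustration (userspace sketch, descriptors and sizes assumed): the
 * move-vs-copy decision documented above is requested from user space
 * with SPLICE_F_MOVE on the splice(2) call that writes pipe contents to
 * a file. The flag is only a hint; when the page cannot be stolen,
 * pipe_to_file() above falls back to copying into the page cache.
 *
 *	#define _GNU_SOURCE
 *	#include <fcntl.h>
 *	#include <sys/types.h>
 *
 *	// Drain up to "len" bytes from a pipe read end into file_fd at
 *	// *off, asking the kernel to move pages rather than copy them.
 *	static ssize_t pipe_to_file_move(int pipe_rd, int file_fd,
 *					 loff_t *off, size_t len)
 *	{
 *		return splice(pipe_rd, NULL, file_fd, off, len, SPLICE_F_MOVE);
 *	}
 */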
607
608 typedef int (splice_actor)(struct pipe_inode_info *, struct pipe_buffer *,
609                            struct splice_desc *);
610
611 /*
612  * Pipe input worker. Most of this logic works like a regular pipe, the
613  * key here is the 'actor' worker passed in that actually moves the data
614  * to the wanted destination. See pipe_to_file/pipe_to_sendpage above.
615  */
616 static ssize_t move_from_pipe(struct pipe_inode_info *pipe, struct file *out,
617                               loff_t *ppos, size_t len, unsigned int flags,
618                               splice_actor *actor)
619 {
620         int ret, do_wakeup, err;
621         struct splice_desc sd;
622
623         ret = 0;
624         do_wakeup = 0;
625
626         sd.total_len = len;
627         sd.flags = flags;
628         sd.file = out;
629         sd.pos = *ppos;
630
631         if (pipe->inode)
632                 mutex_lock(&pipe->inode->i_mutex);
633
634         for (;;) {
635                 if (pipe->nrbufs) {
636                         struct pipe_buffer *buf = pipe->bufs + pipe->curbuf;
637                         struct pipe_buf_operations *ops = buf->ops;
638
639                         sd.len = buf->len;
640                         if (sd.len > sd.total_len)
641                                 sd.len = sd.total_len;
642
643                         err = actor(pipe, buf, &sd);
644                         if (err) {
645                                 if (!ret && err != -ENODATA)
646                                         ret = err;
647
648                                 break;
649                         }
650
651                         ret += sd.len;
652                         buf->offset += sd.len;
653                         buf->len -= sd.len;
654
655                         if (!buf->len) {
656                                 buf->ops = NULL;
657                                 ops->release(pipe, buf);
658                                 pipe->curbuf = (pipe->curbuf + 1) & (PIPE_BUFFERS - 1);
659                                 pipe->nrbufs--;
660                                 if (pipe->inode)
661                                         do_wakeup = 1;
662                         }
663
664                         sd.pos += sd.len;
665                         sd.total_len -= sd.len;
666                         if (!sd.total_len)
667                                 break;
668                 }
669
670                 if (pipe->nrbufs)
671                         continue;
672                 if (!pipe->writers)
673                         break;
674                 if (!pipe->waiting_writers) {
675                         if (ret)
676                                 break;
677                 }
678
679                 if (flags & SPLICE_F_NONBLOCK) {
680                         if (!ret)
681                                 ret = -EAGAIN;
682                         break;
683                 }
684
685                 if (signal_pending(current)) {
686                         if (!ret)
687                                 ret = -ERESTARTSYS;
688                         break;
689                 }
690
691                 if (do_wakeup) {
692                         smp_mb();
693                         if (waitqueue_active(&pipe->wait))
694                                 wake_up_interruptible_sync(&pipe->wait);
695                         kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
696                         do_wakeup = 0;
697                 }
698
699                 pipe_wait(pipe);
700         }
701
702         if (pipe->inode)
703                 mutex_unlock(&pipe->inode->i_mutex);
704
705         if (do_wakeup) {
706                 smp_mb();
707                 if (waitqueue_active(&pipe->wait))
708                         wake_up_interruptible(&pipe->wait);
709                 kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
710         }
711
712         return ret;
713 }
714
715 /**
716  * generic_file_splice_write - splice data from a pipe to a file
717  * @pipe:       pipe to splice from
718  * @out:        file to write to
  * @ppos:       position in @out to start writing at (updated on return)
719  * @len:        number of bytes to splice
720  * @flags:      splice modifier flags
721  *
722  * Will either move or copy pages (determined by @flags options) from
723  * the given pipe inode to the given file.
724  *
725  */
726 ssize_t
727 generic_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
728                           loff_t *ppos, size_t len, unsigned int flags)
729 {
730         struct address_space *mapping = out->f_mapping;
731         ssize_t ret;
732
733         ret = move_from_pipe(pipe, out, ppos, len, flags, pipe_to_file);
734         if (ret > 0) {
735                 struct inode *inode = mapping->host;
736
737                 *ppos += ret;
738
739                 /*
740                  * If file or inode is SYNC and we actually wrote some data,
741                  * sync it.
742                  */
743                 if (unlikely((out->f_flags & O_SYNC) || IS_SYNC(inode))) {
744                         int err;
745
746                         mutex_lock(&inode->i_mutex);
747                         err = generic_osync_inode(inode, mapping,
748                                                   OSYNC_METADATA|OSYNC_DATA);
749                         mutex_unlock(&inode->i_mutex);
750
751                         if (err)
752                                 ret = err;
753                 }
754         }
755
756         return ret;
757 }
758
759 EXPORT_SYMBOL(generic_file_splice_write);
760
761 /**
762  * generic_splice_sendpage - splice data from a pipe to a socket
763  * @pipe:       pipe to splice from
764  * @out:        socket to write to
765  * @len:        number of bytes to splice
766  * @flags:      splice modifier flags
767  *
768  * Will send @len bytes from the pipe to a network socket. No data copying
769  * is involved.
770  *
771  */
772 ssize_t generic_splice_sendpage(struct pipe_inode_info *pipe, struct file *out,
773                                 loff_t *ppos, size_t len, unsigned int flags)
774 {
775         return move_from_pipe(pipe, out, ppos, len, flags, pipe_to_sendpage);
776 }
777
778 EXPORT_SYMBOL(generic_splice_sendpage);
779
780 /*
781  * Attempt to initiate a splice from pipe to file.
782  */
783 static long do_splice_from(struct pipe_inode_info *pipe, struct file *out,
784                            loff_t *ppos, size_t len, unsigned int flags)
785 {
786         int ret;
787
788         if (unlikely(!out->f_op || !out->f_op->splice_write))
789                 return -EINVAL;
790
791         if (unlikely(!(out->f_mode & FMODE_WRITE)))
792                 return -EBADF;
793
794         ret = rw_verify_area(WRITE, out, ppos, len);
795         if (unlikely(ret < 0))
796                 return ret;
797
798         return out->f_op->splice_write(pipe, out, ppos, len, flags);
799 }
800
801 /*
802  * Attempt to initiate a splice from a file to a pipe.
803  */
804 static long do_splice_to(struct file *in, loff_t *ppos,
805                          struct pipe_inode_info *pipe, size_t len,
806                          unsigned int flags)
807 {
808         loff_t isize, left;
809         int ret;
810
811         if (unlikely(!in->f_op || !in->f_op->splice_read))
812                 return -EINVAL;
813
814         if (unlikely(!(in->f_mode & FMODE_READ)))
815                 return -EBADF;
816
817         ret = rw_verify_area(READ, in, ppos, len);
818         if (unlikely(ret < 0))
819                 return ret;
820
821         isize = i_size_read(in->f_mapping->host);
822         if (unlikely(*ppos >= isize))
823                 return 0;
824
825         left = isize - *ppos;
826         if (unlikely(left < len))
827                 len = left;
828
829         return in->f_op->splice_read(in, ppos, pipe, len, flags);
830 }
831
832 long do_splice_direct(struct file *in, loff_t *ppos, struct file *out,
833                       size_t len, unsigned int flags)
834 {
835         struct pipe_inode_info *pipe;
836         long ret, bytes;
837         loff_t out_off;
838         umode_t i_mode;
839         int i;
840
841         /*
842          * We require the input to be a regular file, as we don't want to
843          * randomly drop data for e.g. socket -> socket splicing. Use the
844          * piped splicing for that!
845          */
846         i_mode = in->f_dentry->d_inode->i_mode;
847         if (unlikely(!S_ISREG(i_mode) && !S_ISBLK(i_mode)))
848                 return -EINVAL;
849
850         /*
851          * Neither in nor out is a pipe; set up an internal pipe attached to
852          * 'out' and transfer the wanted data from 'in' to 'out' through that.
853          */
854         pipe = current->splice_pipe;
855         if (unlikely(!pipe)) {
856                 pipe = alloc_pipe_info(NULL);
857                 if (!pipe)
858                         return -ENOMEM;
859
860                 /*
861                  * We don't have an immediate reader, but we'll read the stuff
862                  * out of the pipe right after the move_to_pipe(). So set
863                  * pipe->readers appropriately.
864                  */
865                 pipe->readers = 1;
866
867                 current->splice_pipe = pipe;
868         }
869
870         /*
871          * Do the splice.
872          */
873         ret = 0;
874         bytes = 0;
875         out_off = 0;
876
877         while (len) {
878                 size_t read_len, max_read_len;
879
880                 /*
881                  * Do at most PIPE_BUFFERS pages worth of transfer:
882                  */
883                 max_read_len = min(len, (size_t)(PIPE_BUFFERS*PAGE_SIZE));
884
885                 ret = do_splice_to(in, ppos, pipe, max_read_len, flags);
886                 if (unlikely(ret < 0))
887                         goto out_release;
888
889                 read_len = ret;
890
891                 /*
892                  * NOTE: nonblocking mode only applies to the input. We
893                  * must not do the output in nonblocking mode as then we
894                  * could get stuck data in the internal pipe:
895                  */
896                 ret = do_splice_from(pipe, out, &out_off, read_len,
897                                      flags & ~SPLICE_F_NONBLOCK);
898                 if (unlikely(ret < 0))
899                         goto out_release;
900
901                 bytes += ret;
902                 len -= ret;
903
904                 /*
905                  * In nonblocking mode, if we got back a short read then
906                  * that was due to either an IO error or due to the
907                  * pagecache entry not being there. In the IO error case
908                  * the _next_ splice attempt will produce a clean IO error
909                  * return value (not a short read), so in both cases it's
910                  * correct to break out of the loop here:
911                  */
912                 if ((flags & SPLICE_F_NONBLOCK) && (read_len < max_read_len))
913                         break;
914         }
915
916         pipe->nrbufs = pipe->curbuf = 0;
917
918         return bytes;
919
920 out_release:
921         /*
922          * If we did an incomplete transfer we must release
923          * the pipe buffers in question:
924          */
925         for (i = 0; i < PIPE_BUFFERS; i++) {
926                 struct pipe_buffer *buf = pipe->bufs + i;
927
928                 if (buf->ops) {
929                         buf->ops->release(pipe, buf);
930                         buf->ops = NULL;
931                 }
932         }
933         pipe->nrbufs = pipe->curbuf = 0;
934
935         /*
936          * If we transferred some data, return the number of bytes:
937          */
938         if (bytes > 0)
939                 return bytes;
940
941         return ret;
942 }
943
944 EXPORT_SYMBOL(do_splice_direct);
945
946 /*
947  * Determine where to splice to/from.
948  */
949 static long do_splice(struct file *in, loff_t __user *off_in,
950                       struct file *out, loff_t __user *off_out,
951                       size_t len, unsigned int flags)
952 {
953         struct pipe_inode_info *pipe;
954         loff_t offset, *off;
955         long ret;
956
957         pipe = in->f_dentry->d_inode->i_pipe;
958         if (pipe) {
959                 if (off_in)
960                         return -ESPIPE;
961                 if (off_out) {
962                         if (out->f_op->llseek == no_llseek)
963                                 return -EINVAL;
964                         if (copy_from_user(&offset, off_out, sizeof(loff_t)))
965                                 return -EFAULT;
966                         off = &offset;
967                 } else
968                         off = &out->f_pos;
969
970                 ret = do_splice_from(pipe, out, off, len, flags);
971
972                 if (off_out && copy_to_user(off_out, off, sizeof(loff_t)))
973                         ret = -EFAULT;
974
975                 return ret;
976         }
977
978         pipe = out->f_dentry->d_inode->i_pipe;
979         if (pipe) {
980                 if (off_out)
981                         return -ESPIPE;
982                 if (off_in) {
983                         if (in->f_op->llseek == no_llseek)
984                                 return -EINVAL;
985                         if (copy_from_user(&offset, off_in, sizeof(loff_t)))
986                                 return -EFAULT;
987                         off = &offset;
988                 } else
989                         off = &in->f_pos;
990
991                 ret = do_splice_to(in, off, pipe, len, flags);
992
993                 if (off_in && copy_to_user(off_in, off, sizeof(loff_t)))
994                         ret = -EFAULT;
995
996                 return ret;
997         }
998
999         return -EINVAL;
1000 }
1001
1002 asmlinkage long sys_splice(int fd_in, loff_t __user *off_in,
1003                            int fd_out, loff_t __user *off_out,
1004                            size_t len, unsigned int flags)
1005 {
1006         long error;
1007         struct file *in, *out;
1008         int fput_in, fput_out;
1009
1010         if (unlikely(!len))
1011                 return 0;
1012
1013         error = -EBADF;
1014         in = fget_light(fd_in, &fput_in);
1015         if (in) {
1016                 if (in->f_mode & FMODE_READ) {
1017                         out = fget_light(fd_out, &fput_out);
1018                         if (out) {
1019                                 if (out->f_mode & FMODE_WRITE)
1020                                         error = do_splice(in, off_in,
1021                                                           out, off_out,
1022                                                           len, flags);
1023                                 fput_light(out, fput_out);
1024                         }
1025                 }
1026
1027                 fput_light(in, fput_in);
1028         }
1029
1030         return error;
1031 }
1032
1033 /*
1034  * Link contents of ipipe to opipe.
1035  */
1036 static int link_pipe(struct pipe_inode_info *ipipe,
1037                      struct pipe_inode_info *opipe,
1038                      size_t len, unsigned int flags)
1039 {
1040         struct pipe_buffer *ibuf, *obuf;
1041         int ret, do_wakeup, i, ipipe_first;
1042
1043         ret = do_wakeup = ipipe_first = 0;
1044
1045         /*
1046          * Potential ABBA deadlock, work around it by ordering lock
1047          * grabbing by inode address. Otherwise two different processes
1048          * could deadlock (one doing tee from A -> B, the other from B -> A).
1049          */
1050         if (ipipe->inode < opipe->inode) {
1051                 ipipe_first = 1;
1052                 mutex_lock(&ipipe->inode->i_mutex);
1053                 mutex_lock(&opipe->inode->i_mutex);
1054         } else {
1055                 mutex_lock(&opipe->inode->i_mutex);
1056                 mutex_lock(&ipipe->inode->i_mutex);
1057         }
1058
1059         for (i = 0;; i++) {
1060                 if (!opipe->readers) {
1061                         send_sig(SIGPIPE, current, 0);
1062                         if (!ret)
1063                                 ret = -EPIPE;
1064                         break;
1065                 }
1066                 if (ipipe->nrbufs - i) {
1067                         ibuf = ipipe->bufs + ((ipipe->curbuf + i) & (PIPE_BUFFERS - 1));
1068
1069                         /*
1070                          * If we have room, fill this buffer
1071                          */
1072                         if (opipe->nrbufs < PIPE_BUFFERS) {
1073                                 int nbuf = (opipe->curbuf + opipe->nrbufs) & (PIPE_BUFFERS - 1);
1074
1075                                 /*
1076                                  * Get a reference to this pipe buffer,
1077                                  * so we can copy the contents over.
1078                                  */
1079                                 ibuf->ops->get(ipipe, ibuf);
1080
1081                                 obuf = opipe->bufs + nbuf;
1082                                 *obuf = *ibuf;
1083
1084                                 if (obuf->len > len)
1085                                         obuf->len = len;
1086
1087                                 opipe->nrbufs++;
1088                                 do_wakeup = 1;
1089                                 ret += obuf->len;
1090                                 len -= obuf->len;
1091
1092                                 if (!len)
1093                                         break;
1094                                 if (opipe->nrbufs < PIPE_BUFFERS)
1095                                         continue;
1096                         }
1097
1098                         /*
1099                          * We have input available, but no output room.
1100                          * If we already copied data, return that. If we
1101                          * need to drop the opipe lock, it must be ordered
1102                          * last to avoid deadlocks.
1103                          */
1104                         if ((flags & SPLICE_F_NONBLOCK) || !ipipe_first) {
1105                                 if (!ret)
1106                                         ret = -EAGAIN;
1107                                 break;
1108                         }
1109                         if (signal_pending(current)) {
1110                                 if (!ret)
1111                                         ret = -ERESTARTSYS;
1112                                 break;
1113                         }
1114                         if (do_wakeup) {
1115                                 smp_mb();
1116                                 if (waitqueue_active(&opipe->wait))
1117                                         wake_up_interruptible(&opipe->wait);
1118                                 kill_fasync(&opipe->fasync_readers, SIGIO, POLL_IN);
1119                                 do_wakeup = 0;
1120                         }
1121
1122                         opipe->waiting_writers++;
1123                         pipe_wait(opipe);
1124                         opipe->waiting_writers--;
1125                         continue;
1126                 }
1127
1128                 /*
1129                  * No input buffers, do the usual checks for available
1130                  * No input buffers. Do the usual checks for available
1131                  * writers and for blocking mode, and wait if necessary.
1132                 if (!ipipe->writers)
1133                         break;
1134                 if (!ipipe->waiting_writers) {
1135                         if (ret)
1136                                 break;
1137                 }
1138                 /*
1139                  * pipe_wait() drops the ipipe mutex. To avoid deadlocks
1140                  * with another process, we can only safely do that if
1141                  * the ipipe lock is ordered last.
1142                  */
1143                 if ((flags & SPLICE_F_NONBLOCK) || ipipe_first) {
1144                         if (!ret)
1145                                 ret = -EAGAIN;
1146                         break;
1147                 }
1148                 if (signal_pending(current)) {
1149                         if (!ret)
1150                                 ret = -ERESTARTSYS;
1151                         break;
1152                 }
1153
1154                 if (waitqueue_active(&ipipe->wait))
1155                         wake_up_interruptible_sync(&ipipe->wait);
1156                 kill_fasync(&ipipe->fasync_writers, SIGIO, POLL_OUT);
1157
1158                 pipe_wait(ipipe);
1159         }
1160
1161         mutex_unlock(&ipipe->inode->i_mutex);
1162         mutex_unlock(&opipe->inode->i_mutex);
1163
1164         if (do_wakeup) {
1165                 smp_mb();
1166                 if (waitqueue_active(&opipe->wait))
1167                         wake_up_interruptible(&opipe->wait);
1168                 kill_fasync(&opipe->fasync_readers, SIGIO, POLL_IN);
1169         }
1170
1171         return ret;
1172 }
1173
1174 /*
1175  * This is a tee(1) implementation that works on pipes. It doesn't copy
1176  * any data, it simply references the 'in' pages on the 'out' pipe.
1177  * The 'flags' used are the SPLICE_F_* variants, currently the only
1178  * applicable one is SPLICE_F_NONBLOCK.
1179  */
1180 static long do_tee(struct file *in, struct file *out, size_t len,
1181                    unsigned int flags)
1182 {
1183         struct pipe_inode_info *ipipe = in->f_dentry->d_inode->i_pipe;
1184         struct pipe_inode_info *opipe = out->f_dentry->d_inode->i_pipe;
1185
1186         /*
1187          * Link the contents of ipipe to opipe; nothing is copied or consumed.
1188          */
1189         if (ipipe && opipe)
1190                 return link_pipe(ipipe, opipe, len, flags);
1191
1192         return -EINVAL;
1193 }
1194
1195 asmlinkage long sys_tee(int fdin, int fdout, size_t len, unsigned int flags)
1196 {
1197         struct file *in;
1198         int error, fput_in;
1199
1200         if (unlikely(!len))
1201                 return 0;
1202
1203         error = -EBADF;
1204         in = fget_light(fdin, &fput_in);
1205         if (in) {
1206                 if (in->f_mode & FMODE_READ) {
1207                         int fput_out;
1208                         struct file *out = fget_light(fdout, &fput_out);
1209
1210                         if (out) {
1211                                 if (out->f_mode & FMODE_WRITE)
1212                                         error = do_tee(in, out, len, flags);
1213                                 fput_light(out, fput_out);
1214                         }
1215                 }
1216                 fput_light(in, fput_in);
1217         }
1218
1219         return error;
1220 }
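/*
 * Example (userspace sketch, descriptors assumed): the usual tee(2) +
 * splice(2) pairing built on the syscalls above. tee() only adds
 * references to the pipe pages on a second pipe, so the same data can
 * afterwards be spliced to two consumers without any copy in user space.
 * Error handling is trimmed for brevity.
 *
 *	#define _GNU_SOURCE
 *	#include <fcntl.h>
 *	#include <limits.h>
 *	#include <sys/types.h>
 *
 *	// Duplicate whatever is queued in pipe_in onto pipe_dup, then
 *	// splice the original contents into log_fd. pipe_dup keeps a
 *	// second (referenced, not copied) view for another consumer.
 *	static ssize_t tee_and_log(int pipe_in, int pipe_dup, int log_fd,
 *				   loff_t *log_off)
 *	{
 *		ssize_t n = tee(pipe_in, pipe_dup, INT_MAX, SPLICE_F_NONBLOCK);
 *
 *		if (n <= 0)
 *			return n;
 *
 *		return splice(pipe_in, NULL, log_fd, log_off, n, 0);
 *	}
 */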