From 7ecc9d37716d8d79b7e32c6efd0eec048eb7513b Mon Sep 17 00:00:00 2001
From: green <green>
Date: Thu, 27 Mar 2008 03:32:38 +0000
Subject: [PATCH] b=13371 r=shadow,nikita

Implement readv/writev (aio_read/aio_write on newer kernels) to greatly
improve the speed of NFS writes on 2.6 kernels.
---
 lustre/ChangeLog                       |   5 +
 lustre/autoconf/lustre-core.m4         |  34 +--
 lustre/include/linux/lustre_compat25.h |  39 ----
 lustre/llite/file.c                    | 286 ++++++++++++++++++++++---
 lustre/llite/llite_internal.h          |   7 +-
 lustre/llite/llite_mmap.c              |  25 ++-
 lustre/llite/rw.c                      | 120 ++++++++---
 7 files changed, 394 insertions(+), 122 deletions(-)

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index b1f2fa8f37..0161f78265 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -458,6 +458,11 @@ Bugzilla   : 15033
 Description: build for x2 fails
 Details    : fix compile issue on Cray systems.
 
+Severity   : enhancement
+Bugzilla   : 13371
+Description: implement readv/writev APIs (aio_read/aio_write in newer kernels)
+Details    : This greatly improves speed of NFS writes on 2.6 kernels.
+
 --------------------------------------------------------------------------------
 
 2007-12-07         Cluster File Systems, Inc. <info@clusterfs.com>
diff --git a/lustre/autoconf/lustre-core.m4 b/lustre/autoconf/lustre-core.m4
index 2437b56d5d..51ba04a690 100644
--- a/lustre/autoconf/lustre-core.m4
+++ b/lustre/autoconf/lustre-core.m4
@@ -964,37 +964,37 @@ LB_LINUX_TRY_COMPILE([
 EXTRA_KCFLAGS="$tmp_flags"
 ])
 
-# LC_GENERIC_FILE_WRITE
-# 2.6.19 introduce do_sync_write instead of
-# generic_file_write
-AC_DEFUN([LC_GENERIC_FILE_WRITE],
-[AC_MSG_CHECKING([use generic_file_write])
+# LC_FILE_WRITEV
+# 2.6.19 replaced writev with aio_write
+AC_DEFUN([LC_FILE_WRITEV],
+[AC_MSG_CHECKING([writev in fops])
 LB_LINUX_TRY_COMPILE([
         #include <linux/fs.h>
 ],[
-        int result = generic_file_read(NULL, NULL, 0, 0);
+        struct file_operations *fops;
+        fops->writev = NULL;
 ],[
         AC_MSG_RESULT(yes)
-        AC_DEFINE(HAVE_GENERIC_FILE_WRITE, 1,
-                [use generic_file_write])
+        AC_DEFINE(HAVE_FILE_WRITEV, 1,
+                [use fops->writev])
 ],[
 	AC_MSG_RESULT(NO)
 ])
 ])
 
 # LC_GENERIC_FILE_READ
-# 2.6.19 need to use do_sync_read instead of
-# generic_file_read
-AC_DEFUN([LC_GENERIC_FILE_READ],
-[AC_MSG_CHECKING([use generic_file_read])
+# 2.6.19 replaced readv with aio_read
+AC_DEFUN([LC_FILE_READV],
+[AC_MSG_CHECKING([readv in fops])
 LB_LINUX_TRY_COMPILE([
         #include <linux/fs.h>
 ],[
-        int result = generic_file_read(NULL, NULL, 0, 0);
+        struct file_operations *fops;
+        fops->readv = NULL;
 ],[
         AC_MSG_RESULT(yes)
-        AC_DEFINE(HAVE_GENERIC_FILE_READ, 1,
-                [use generic_file_read])
+        AC_DEFINE(HAVE_FILE_READV, 1,
+                [use fops->readv])
 ],[
         AC_MSG_RESULT(NO)
 ])
@@ -1310,8 +1310,8 @@ AC_DEFUN([LC_PROG_LINUX],
           # 2.6.19
           LC_INODE_BLKSIZE
           LC_VFS_READDIR_U64_INO
-          LC_GENERIC_FILE_READ
-          LC_GENERIC_FILE_WRITE
+          LC_FILE_WRITEV
+          LC_FILE_READV
 
           # 2.6.20
           LC_CANCEL_DIRTY_PAGE
diff --git a/lustre/include/linux/lustre_compat25.h b/lustre/include/linux/lustre_compat25.h
index 2678bdc3fc..1ff876555b 100644
--- a/lustre/include/linux/lustre_compat25.h
+++ b/lustre/include/linux/lustre_compat25.h
@@ -510,45 +510,6 @@ ll_kern_mount(const char *fstype, int flags, const char *name, void *data)
 #define ll_kern_mount(fstype, flags, name, data) do_kern_mount((fstype), (flags), (name), (data))
 #endif
 
-#ifndef HAVE_GENERIC_FILE_READ
-static inline
-ssize_t
-generic_file_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos)
-{
-        struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = len };
-        struct kiocb kiocb;
-        ssize_t ret;
-
-        init_sync_kiocb(&kiocb, filp);
-        kiocb.ki_pos = *ppos;
-        kiocb.ki_left = len;
-
-        ret = generic_file_aio_read(&kiocb, &iov, 1, kiocb.ki_pos);
-        *ppos = kiocb.ki_pos;
-        return ret;
-}
-#endif
-
-#ifndef HAVE_GENERIC_FILE_WRITE
-static inline
-ssize_t
-generic_file_write(struct file *filp, const char __user *buf, size_t len, loff_t *ppos)
-{
-        struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = len };
-        struct kiocb kiocb;
-        ssize_t ret;
-
-        init_sync_kiocb(&kiocb, filp);
-        kiocb.ki_pos = *ppos;
-        kiocb.ki_left = len;
-
-        ret = generic_file_aio_write(&kiocb, &iov, 1, kiocb.ki_pos);
-        *ppos = kiocb.ki_pos;
-
-        return ret;
-}
-#endif
-
 #ifdef HAVE_STATFS_DENTRY_PARAM
 #define ll_do_statfs(sb, sfs) (sb)->s_op->statfs((sb)->s_root, (sfs))
 #else
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index 58082d36cd..e2c82f8e19 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c
@@ -1125,9 +1125,10 @@ static int ll_is_file_contended(struct file *file)
         RETURN(0);
 }
 
-static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
-                                 const char *buf, size_t count,
-                                 loff_t start, loff_t end, int rw)
+static int ll_file_get_tree_lock_iov(struct ll_lock_tree *tree,
+                                     struct file *file, const struct iovec *iov,
+                                     unsigned long nr_segs,
+                                     loff_t start, loff_t end, int rw)
 {
         int append;
         int tree_locked = 0;
@@ -1150,7 +1151,7 @@ static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
                         GOTO(out, rc);
                 }
                 tree->lt_fd = LUSTRE_FPRIVATE(file);
-                rc = ll_tree_lock(tree, node, buf, count, ast_flags);
+                rc = ll_tree_lock_iov(tree, node, iov, nr_segs, ast_flags);
                 if (rc == 0)
                         tree_locked = 1;
                 else if (rc == -EUSERS)
@@ -1163,9 +1164,79 @@ out:
         return rc;
 }
 
-static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
-                            loff_t *ppos)
+/* XXX: exact copy from kernel code (__generic_file_aio_write_nolock from rhel4)
+ * Returns the summed iov_len; NB -EINVAL/-EFAULT come back cast to size_t. */
+static size_t ll_file_get_iov_count(const struct iovec *iov, 
+                                     unsigned long *nr_segs)
+{
+        size_t count = 0;
+        unsigned long seg;
+        /* stop at, and trim *nr_segs to, the first !access_ok segment */
+        for (seg = 0; seg < *nr_segs; seg++) {
+                const struct iovec *iv = &iov[seg];
+
+                /*
+                 * If any segment has a negative length, or the cumulative
+                 * length ever wraps negative then return -EINVAL.
+                 */
+                count += iv->iov_len;
+                if (unlikely((ssize_t)(count|iv->iov_len) < 0))
+                        return -EINVAL;
+                if (access_ok(VERIFY_WRITE, iv->iov_base, iv->iov_len))
+                        continue;
+                if (seg == 0)
+                        return -EFAULT;
+                *nr_segs = seg;
+                count -= iv->iov_len;   /* This segment is no good */
+                break;
+        }
+        return count;
+}
+
+/* Fill iov_copy with the segments of *iov_out that cover the next @size bytes
+ * (starting *offset bytes into the first) and advance the cursor past them. */
+static int iov_copy_update(unsigned long *nr_segs, const struct iovec **iov_out,
+                           unsigned long *nrsegs_copy, struct iovec *iov_copy,
+                           size_t *offset, size_t size)
 {
+        unsigned long i;
+        const struct iovec *iov = *iov_out;
+
+        for (i = 0; i < *nr_segs; i++) {
+                struct iovec *ivc = &iov_copy[i];
+
+                *ivc = iov[i];
+                if (i == 0) {
+                        ivc->iov_len -= *offset;
+                        ivc->iov_base += *offset;
+                }
+                /* ">=", not ">": a segment that exactly fills the chunk must
+                 * end the copy here, or one entry too many gets reported */
+                if (ivc->iov_len >= size) {
+                        ivc->iov_len = size;
+                        *offset = (i == 0) ? *offset + size : size;
+                        break;
+                }
+                size -= ivc->iov_len;
+        }
+        *iov_out += i;
+        *nr_segs -= i;
+        /* no segment left means the loop ran dry: i entries, not i + 1 */
+        *nrsegs_copy = (*nr_segs == 0) ? i : i + 1;
+        return 0;
+}
+
+#ifdef HAVE_FILE_READV
+static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
+                              unsigned long nr_segs, loff_t *ppos)
+{
+#else
+static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
+                                unsigned long nr_segs, loff_t pos)
+{
+        struct file *file = iocb->ki_filp;
+        loff_t *ppos = &iocb->ki_pos;
+#endif
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm = lli->lli_smd;
@@ -1177,9 +1248,13 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
         loff_t end;
         ssize_t retval, chunk, sum = 0;
         int tree_locked;
-
+        struct iovec *iov_copy = NULL;
+        unsigned long nrsegs_copy, nrsegs_orig = 0;
+        size_t count, iov_offset = 0;
         __u64 kms;
         ENTRY;
+
+        count = ll_file_get_iov_count(iov, &nr_segs);
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
                inode->i_ino, inode->i_generation, inode, count, *ppos);
         /* "If nbyte is 0, read() will return 0 and have no other results."
@@ -1207,12 +1282,23 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                         count = i_size_read(inode) - *ppos;
                 /* Make sure to correctly adjust the file pos pointer for
                  * EFAULT case */
-                notzeroed = clear_user(buf, count);
-                count -= notzeroed;
-                *ppos += count;
-                if (!count)
+                for (nrsegs_copy = 0; nrsegs_copy < nr_segs; nrsegs_copy++) {
+                        const struct iovec *iv = &iov[nrsegs_copy];
+
+                        if (count < iv->iov_len)
+                                chunk = count;
+                        else
+                                chunk = iv->iov_len;
+                        notzeroed = clear_user(iv->iov_base, chunk);
+                        sum += (chunk - notzeroed);
+                        count -= (chunk - notzeroed);
+                        if (notzeroed || !count)
+                                break;
+                }
+                *ppos += sum;
+                if (!sum)
                         RETURN(-EFAULT);
-                RETURN(count);
+                RETURN(sum);
         }
 repeat:
         if (sbi->ll_max_rw_chunk != 0) {
@@ -1228,12 +1314,34 @@ repeat:
                 /* and chunk shouldn't be too large even if striping is wide */
                 if (end - *ppos > sbi->ll_max_rw_chunk)
                         end = *ppos + sbi->ll_max_rw_chunk - 1;
+
+                chunk = end - *ppos + 1;
+                if ((count == chunk) && (iov_offset == 0)) {
+                        if (iov_copy)
+                                OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
+                        /* last chunk, segment-aligned: use iov itself */
+                        iov_copy = (struct iovec *)iov;
+                        nrsegs_copy = nr_segs;
+                } else {
+                        if (!iov_copy) {
+                                nrsegs_orig = nr_segs;
+                                OBD_ALLOC(iov_copy, sizeof(*iov) * nr_segs);
+                                if (!iov_copy)
+                                        GOTO(out, retval = -ENOMEM);
+                        }
+                        /* iov_copy holds whole iovecs: sizeof(*iov) each */
+                        iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
+                                        &iov_offset, chunk);
+                }
+
         } else {
                 end = *ppos + count - 1;
+                iov_copy = (struct iovec *)iov;
+                nrsegs_copy = nr_segs;
         }
 
-        tree_locked = ll_file_get_tree_lock(&tree, file, buf,
-                                            count, *ppos, end, READ);
+        tree_locked = ll_file_get_tree_lock_iov(&tree, file, iov_copy,
+                                                nrsegs_copy, *ppos, end, READ);
         if (tree_locked < 0)
                 GOTO(out, retval = tree_locked);
 
@@ -1302,14 +1410,19 @@ repeat:
 
                 /* BUG: 5972 */
                 file_accessed(file);
-                retval = generic_file_read(file, buf, chunk, ppos);
+#ifdef HAVE_FILE_READV
+                retval = generic_file_readv(file, iov_copy, nrsegs_copy, ppos);
+#else
+                retval = generic_file_aio_read(iocb, iov_copy, nrsegs_copy,
+                                               *ppos);
+#endif
                 ll_tree_unlock(&tree);
         } else {
-                retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
+                retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy, ppos,
+                                             READ, chunk);
         }
         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
         if (retval > 0) {
-                buf += retval;
                 count -= retval;
                 sum += retval;
                 if (retval == chunk && count > 0)
@@ -1320,15 +1433,48 @@ repeat:
         if (ra != 0)
                 ll_ra_read_ex(file, &bead);
         retval = (sum > 0) ? sum : retval;
+
+        if (iov_copy && iov_copy != iov)
+                OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
+
         RETURN(retval);
 }
 
+static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
+                            loff_t *ppos)
+{
+        struct iovec local_iov = { .iov_base = (void __user *)buf,
+                                   .iov_len = count };
+#ifdef HAVE_FILE_READV
+        return ll_file_readv(file, &local_iov, 1, ppos);
+#else
+        struct kiocb kiocb;
+        ssize_t ret;
+        /* no fops->readv: call the aio entry point with an on-stack kiocb */
+        init_sync_kiocb(&kiocb, file);
+        kiocb.ki_pos = *ppos;
+        kiocb.ki_left = count;
+
+        ret = ll_file_aio_read(&kiocb, &local_iov, 1, kiocb.ki_pos);
+        *ppos = kiocb.ki_pos;
+        return ret;
+#endif
+}
+
 /*
  * Write to a file (through the page cache).
  */
-static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
-                             loff_t *ppos)
+#ifdef HAVE_FILE_WRITEV
+static ssize_t ll_file_writev(struct file *file, const struct iovec *iov,
+                              unsigned long nr_segs, loff_t *ppos)
 {
+#else /* AIO stuff */
+static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
+                                 unsigned long nr_segs, loff_t pos)
+{
+        struct file *file = iocb->ki_filp;
+        loff_t *ppos = &iocb->ki_pos;
+#endif
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
@@ -1337,8 +1483,13 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
         loff_t lock_start, lock_end, end;
         ssize_t retval, chunk, sum = 0;
         int tree_locked;
+        struct iovec *iov_copy = NULL;
+        unsigned long nrsegs_copy, nrsegs_orig = 0;
+        size_t count, iov_offset = 0;
         ENTRY;
 
+        count = ll_file_get_iov_count(iov, &nr_segs);
+
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
                inode->i_ino, inode->i_generation, inode, count, *ppos);
         
@@ -1365,6 +1516,8 @@ repeat:
         if (file->f_flags & O_APPEND) {
                 lock_start = 0;
                 lock_end = OBD_OBJECT_EOF;
+                iov_copy = (struct iovec *)iov;
+                nrsegs_copy = nr_segs;
         } else if (sbi->ll_max_rw_chunk != 0) {
                 /* first, let's know the end of the current stripe */
                 end = *ppos;
@@ -1380,13 +1533,34 @@ repeat:
                         end = *ppos + sbi->ll_max_rw_chunk - 1;
                 lock_start = *ppos;
                 lock_end = end;
+                chunk = end - *ppos + 1;
+                if ((count == chunk) && (iov_offset == 0)) {
+                        if (iov_copy)
+                                OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
+                        /* last chunk, segment-aligned: use iov itself */
+                        iov_copy = (struct iovec *)iov;
+                        nrsegs_copy = nr_segs;
+                } else {
+                        if (!iov_copy) {
+                                nrsegs_orig = nr_segs;
+                                OBD_ALLOC(iov_copy, sizeof(*iov) * nr_segs);
+                                if (!iov_copy)
+                                        GOTO(out, retval = -ENOMEM);
+                        }
+                        /* iov_copy holds whole iovecs: sizeof(*iov) each */
+                        iov_copy_update(&nr_segs, &iov, &nrsegs_copy, iov_copy,
+                                        &iov_offset, chunk);
+                }
         } else {
                 lock_start = *ppos;
-                lock_end = *ppos + count - 1;
+                lock_end = end;
+                iov_copy = (struct iovec *)iov;
+                nrsegs_copy = nr_segs;
         }
 
-        tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
-                                            lock_start, lock_end, WRITE);
+        tree_locked = ll_file_get_tree_lock_iov(&tree, file, iov_copy,
+                                                nrsegs_copy, lock_start,
+                                                lock_end, WRITE);
         if (tree_locked < 0)
                 GOTO(out, retval = tree_locked);
 
@@ -1411,10 +1585,15 @@ repeat:
         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
                inode->i_ino, chunk, *ppos);
         if (tree_locked)
-                retval = generic_file_write(file, buf, chunk, ppos);
+#ifdef HAVE_FILE_WRITEV
+                retval = generic_file_writev(file, iov_copy, nrsegs_copy, ppos);
+#else
+                retval = generic_file_aio_write(iocb, iov_copy, nrsegs_copy,
+                                                *ppos);
+#endif
         else
-                retval = ll_file_lockless_io(file, (char*)buf, chunk,
-                                             ppos, WRITE);
+                retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy,
+                                             ppos, WRITE, chunk);
         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
 
 out_unlock:
@@ -1423,7 +1602,6 @@ out_unlock:
 
 out:
         if (retval > 0) {
-                buf += retval;
                 count -= retval;
                 sum += retval;
                 if (retval == chunk && count > 0)
@@ -1432,12 +1610,38 @@ out:
 
         up(&ll_i2info(inode)->lli_write_sem);
 
+        if (iov_copy && iov_copy != iov)
+                OBD_FREE(iov_copy, sizeof(*iov) * nrsegs_orig);
+
         retval = (sum > 0) ? sum : retval;
         ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_WRITE_BYTES,
                            retval > 0 ? retval : 0);
         RETURN(retval);
 }
 
+static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
+                             loff_t *ppos)
+{
+        struct iovec local_iov = { .iov_base = (void __user *)buf,
+                                   .iov_len = count };
+        /* forward buf as a one-segment iovec to the vectored write path */
+#ifdef HAVE_FILE_WRITEV
+        return ll_file_writev(file, &local_iov, 1, ppos);
+#else
+        struct kiocb kiocb;
+        ssize_t ret;
+        /* no fops->writev: call the aio entry point with an on-stack kiocb */
+        init_sync_kiocb(&kiocb, file);
+        kiocb.ki_pos = *ppos;
+        kiocb.ki_left = count;
+
+        ret = ll_file_aio_write(&kiocb, &local_iov, 1, kiocb.ki_pos);
+        *ppos = kiocb.ki_pos;
+
+        return ret;
+#endif
+}
+
 /*
  * Send file content (through pagecache) somewhere with helper
  */
@@ -2660,7 +2864,17 @@ check_capabilities:
 /* -o localflock - only provides locally consistent flock locks */
 struct file_operations ll_file_operations = {
         .read           = ll_file_read,
+#ifdef HAVE_FILE_READV
+        .readv          = ll_file_readv,
+#else
+        .aio_read       = ll_file_aio_read,
+#endif
         .write          = ll_file_write,
+#ifdef HAVE_FILE_WRITEV
+        .writev         = ll_file_writev,
+#else
+        .aio_write      = ll_file_aio_write,
+#endif
         .ioctl          = ll_file_ioctl,
         .open           = ll_file_open,
         .release        = ll_file_release,
@@ -2674,7 +2888,17 @@ struct file_operations ll_file_operations = {
 
 struct file_operations ll_file_operations_flock = {
         .read           = ll_file_read,
+#ifdef HAVE_FILE_READV
+        .readv          = ll_file_readv,
+#else
+        .aio_read       = ll_file_aio_read,
+#endif
         .write          = ll_file_write,
+#ifdef HAVE_FILE_WRITEV
+        .writev         = ll_file_writev,
+#else   
+        .aio_write      = ll_file_aio_write,
+#endif
         .ioctl          = ll_file_ioctl,
         .open           = ll_file_open,
         .release        = ll_file_release,
@@ -2693,7 +2917,17 @@ struct file_operations ll_file_operations_flock = {
 /* These are for -o noflock - to return ENOSYS on flock calls */
 struct file_operations ll_file_operations_noflock = {
         .read           = ll_file_read,
+#ifdef HAVE_FILE_READV
+        .readv          = ll_file_readv,
+#else
+        .aio_read       = ll_file_aio_read,
+#endif
         .write          = ll_file_write,
+#ifdef HAVE_FILE_WRITEV
+        .writev         = ll_file_writev,
+#else   
+        .aio_write      = ll_file_aio_write,
+#endif
         .ioctl          = ll_file_ioctl,
         .open           = ll_file_open,
         .release        = ll_file_release,
diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h
index be76c003f4..5774f86197 100644
--- a/lustre/llite/llite_internal.h
+++ b/lustre/llite/llite_internal.h
@@ -558,7 +558,8 @@ void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras);
 void ll_ra_accounting(struct ll_async_page *llap,struct address_space *mapping);
 void ll_truncate(struct inode *inode);
 int ll_file_punch(struct inode *, loff_t, int);
-ssize_t ll_file_lockless_io(struct file *, char *, size_t, loff_t *, int);
+ssize_t ll_file_lockless_io(struct file *, const struct iovec *,
+                            unsigned long, loff_t *, int, ssize_t);
 void ll_clear_file_contended(struct inode*);
 int ll_sync_page_range(struct inode *, struct address_space *, loff_t, size_t);
 
@@ -731,6 +732,10 @@ struct ll_lock_tree_node * ll_node_from_inode(struct inode *inode, __u64 start,
 int ll_tree_lock(struct ll_lock_tree *tree,
                  struct ll_lock_tree_node *first_node,
                  const char *buf, size_t count, int ast_flags);
+int ll_tree_lock_iov(struct ll_lock_tree *tree,
+                     struct ll_lock_tree_node *first_node,
+                     const struct iovec *iov, unsigned long nr_segs,
+                     int ast_flags);
 int ll_tree_unlock(struct ll_lock_tree *tree);
 
 #define    ll_s2sbi(sb)        (s2lsi(sb)->lsi_llsbi)
diff --git a/lustre/llite/llite_mmap.c b/lustre/llite/llite_mmap.c
index d2e0d66e07..275ea3d915 100644
--- a/lustre/llite/llite_mmap.c
+++ b/lustre/llite/llite_mmap.c
@@ -216,12 +216,13 @@ int ll_tree_unlock(struct ll_lock_tree *tree)
         RETURN(rc);
 }
 
-int ll_tree_lock(struct ll_lock_tree *tree,
+int ll_tree_lock_iov(struct ll_lock_tree *tree,
                  struct ll_lock_tree_node *first_node,
-                 const char *buf, size_t count, int ast_flags)
+                 const struct iovec *iov, unsigned long nr_segs, int ast_flags)
 {
         struct ll_lock_tree_node *node;
         int rc = 0;
+        unsigned long seg;
         ENTRY;
 
         tree->lt_root.rb_node = NULL;
@@ -232,9 +233,13 @@ int ll_tree_lock(struct ll_lock_tree *tree,
         /* To avoid such subtle deadlock case: client1 try to read file1 to
          * mmapped file2, on the same time, client2 try to read file2 to
          * mmapped file1.*/
-        rc = lt_get_mmap_locks(tree, (unsigned long)buf, count);
-        if (rc)
-                GOTO(out, rc);
+        for (seg = 0; seg < nr_segs; seg++) {
+                const struct iovec *iv = &iov[seg];
+                rc = lt_get_mmap_locks(tree, (unsigned long)iv->iov_base,
+                                       iv->iov_len);
+                if (rc)
+                        GOTO(out, rc);
+        }
 
         while ((node = lt_least_node(tree))) {
                 struct inode *inode = node->lt_inode;
@@ -254,6 +259,16 @@ out:
         RETURN(rc);
 }
 
+int ll_tree_lock(struct ll_lock_tree *tree,
+                 struct ll_lock_tree_node *first_node,
+                 const char *buf, size_t count, int ast_flags)
+{
+        struct iovec local_iov = { .iov_base = (void __user *)buf,
+                                   .iov_len = count };
+        /* single-buffer form: delegate to the iovec variant with one entry */
+        return ll_tree_lock_iov(tree, first_node, &local_iov, 1, ast_flags);
+}
+
 static ldlm_mode_t mode_from_vma(struct vm_area_struct *vma)
 {
         /* we only want to hold PW locks if the mmap() can generate
diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c
index 58f0592f59..d6c9733d2f 100644
--- a/lustre/llite/rw.c
+++ b/lustre/llite/rw.c
@@ -1914,7 +1914,8 @@ err:
  }
 
 static ssize_t ll_file_copy_pages(struct page **pages, int numpages,
-                                  char *buf, loff_t pos, size_t count,
+                                  const struct iovec *iov, unsigned long nsegs,
+                                  ssize_t iov_offset, loff_t pos, size_t count,
                                   int rw)
 {
         ssize_t amount = 0;
@@ -1924,41 +1925,55 @@ static ssize_t ll_file_copy_pages(struct page **pages, int numpages,
         ENTRY;
 
         for (i = 0; i < numpages; i++) {
-                unsigned offset, bytes, left;
+                unsigned offset, bytes, left = 0;
                 char *vaddr;
 
                 vaddr = kmap(pages[i]);
                 offset = pos & (CFS_PAGE_SIZE - 1);
                 bytes = min_t(unsigned, CFS_PAGE_SIZE - offset, count);
                 LL_CDEBUG_PAGE(D_PAGE, pages[i], "op = %s, addr = %p, "
-                               "buf = %p, bytes = %u\n",
+                               "bytes = %u\n",
                                (rw == WRITE) ? "CFU" : "CTU",
-                               vaddr + offset, buf, bytes);
-                if (rw == WRITE) {
-                        left = copy_from_user(vaddr + offset, buf, bytes);
-                        if (updatechecksum) {
-                                struct ll_async_page *llap;
-
-                                llap = llap_cast_private(pages[i]);
-                                llap->llap_checksum =
-                                        init_checksum(OSC_DEFAULT_CKSUM);
-                                llap->llap_checksum =
-                                        compute_checksum(llap->llap_checksum,
-                                                         vaddr, CFS_PAGE_SIZE,
-                                                         OSC_DEFAULT_CKSUM);
+                               vaddr + offset, bytes);
+                while (bytes > 0 && !left && nsegs) {
+                        unsigned copy = min_t(ssize_t, bytes,
+                                               iov->iov_len - iov_offset);
+                        if (rw == WRITE) {
+                                left = copy_from_user(vaddr + offset,
+                                                      iov->iov_base +iov_offset,
+                                                      copy);
+                                if (updatechecksum) {
+                                        struct ll_async_page *llap;
+
+                                        llap = llap_cast_private(pages[i]);
+                                        llap->llap_checksum =
+                                                init_checksum(OSC_DEFAULT_CKSUM);
+                                        llap->llap_checksum =
+                                           compute_checksum(llap->llap_checksum,
+                                                            vaddr,CFS_PAGE_SIZE,
+                                                            OSC_DEFAULT_CKSUM);
+                                }
+                        } else {
+                                left = copy_to_user(iov->iov_base + iov_offset,
+                                                    vaddr + offset, copy);
+                        }
+                        offset += copy; /* keep page offset in step with pos */
+                        amount += copy;
+                        count -= copy;
+                        pos += copy;
+                        iov_offset += copy;
+                        bytes -= copy;
+                        if (iov_offset == iov->iov_len) {
+                                iov_offset = 0;
+                                iov++;
+                                nsegs--;
                         }
-                } else {
-                        left = copy_to_user(buf, vaddr + offset, bytes);
                 }
                 kunmap(pages[i]);
-                amount += bytes;
                 if (left) {
                         amount -= left;
                         break;
                 }
-                buf += bytes;
-                count -= bytes;
-                pos += bytes;
         }
         if (amount == 0)
                 RETURN(-EFAULT);
@@ -2030,8 +2045,25 @@ out:
         RETURN(rc);
 }
 
-ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count,
-                                   loff_t *ppos, int rw)
+/* Skip @offset bytes from the start of (*iov)[0]: step *iov / *nr_segs past
+ * every fully consumed segment and return the offset left inside the segment
+ * now pointed at, or 0 once all segments are used up. */
+static ssize_t ll_iov_advance(const struct iovec **iov, unsigned long *nr_segs,
+                              ssize_t offset)
+{
+        while (*nr_segs > 0) {
+                if ((*iov)->iov_len > offset)
+                        return offset;  /* start offset, not bytes left */
+                offset -= (*iov)->iov_len;
+                (*iov)++;
+                (*nr_segs)--;
+        }
+        return 0;
+}
+
+ssize_t ll_file_lockless_io(struct file *file, const struct iovec *iov,
+                            unsigned long nr_segs,
+                            loff_t *ppos, int rw, ssize_t count)
 {
         loff_t pos;
         struct inode *inode = file->f_dentry->d_inode;
@@ -2039,6 +2071,9 @@ ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count,
         int max_pages;
         size_t amount = 0;
         unsigned long first, last;
+        const struct iovec *iv = &iov[0];
+        unsigned long nsegs = nr_segs;
+        unsigned long offset = 0;
         ENTRY;
 
         if (rw == READ) {
@@ -2061,6 +2096,7 @@ ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count,
                 if (rc)
                         GOTO(out, rc);
         }
+
         pos = *ppos;
         first = pos >> CFS_PAGE_SHIFT;
         last = (pos + count - 1) >> CFS_PAGE_SHIFT;
@@ -2082,10 +2118,12 @@ ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count,
                         break;
                 }
                 if (rw == WRITE) {
-                        rc = ll_file_copy_pages(pages, pages_for_io, buf,
-                                                pos + amount, bytes, rw);
+                        rc = ll_file_copy_pages(pages, pages_for_io, iv, nsegs,
+                                                offset, pos + amount, bytes,
+                                                rw);
                         if (rc < 0)
                                 GOTO(put_pages, rc);
+                        offset = ll_iov_advance(&iv, &nsegs, offset + rc);
                         bytes = rc;
                 }
                 rc = ll_file_oig_pages(inode, pages, pages_for_io,
@@ -2093,31 +2131,45 @@ ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count,
                 if (rc)
                         GOTO(put_pages, rc);
                 if (rw == READ) {
-                        rc = ll_file_copy_pages(pages, pages_for_io, buf,
-                                                pos + amount, bytes, rw);
+                        rc = ll_file_copy_pages(pages, pages_for_io, iv, nsegs,
+                                                offset, pos + amount, bytes, rw);
                         if (rc < 0)
                                 GOTO(put_pages, rc);
+                        offset = ll_iov_advance(&iv, &nsegs, offset + rc);
                         bytes = rc;
                 }
                 amount += bytes;
-                buf += bytes;
 put_pages:
                 ll_file_put_pages(pages, pages_for_io);
                 first += pages_for_io;
                 /* a short read/write check */
                 if (pos + amount < ((loff_t)first << CFS_PAGE_SHIFT))
                         break;
+                /* Check if we are out of userspace buffers. (how that could
+                   happen?) */
+                if (nsegs == 0)
+                        break;
         }
         /* NOTE: don't update i_size and KMS in absence of LDLM locks even
          * write makes the file large */
         file_accessed(file);
         if (rw == READ && amount < count && rc == 0) {
                 unsigned long not_cleared;
-
-                not_cleared = clear_user(buf, count - amount);
-                amount = count - not_cleared;
-                if (not_cleared)
-                        rc = -EFAULT;
+                
+                while (nsegs > 0) {
+                        ssize_t to_clear = min_t(ssize_t, count - amount,
+                                                 iv->iov_len - offset);
+                        not_cleared = clear_user(iv->iov_base + offset,
+                                                 to_clear);
+                        amount += to_clear - not_cleared;
+                        if (not_cleared) {
+                                rc = -EFAULT;
+                                break;
+                        }
+                        offset = 0;
+                        iv++;
+                        nsegs--;
+                }
         }
         if (amount > 0) {
                 lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
-- 
GitLab