From 5c30b3001ce98f72364dc5baef151694594d4c12 Mon Sep 17 00:00:00 2001
From: zam <zam>
Date: Thu, 30 Aug 2007 19:23:31 +0000
Subject: [PATCH] b=11270 i=adilger i=nikita

Add file contention detection and a lockless i/o implementation for
contended files.
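
When an enqueue finds more conflicting extent locks than the
"contended_locks" threshold, the resource is marked contended for
"contention_seconds".  While it is contended, an extent lock request
that carries LDLM_FL_DENY_ON_CONTENTION and covers no more than
"max_nolock_bytes" is refused with -EBUSY instead of being queued.
The client then marks the file contended and performs its reads and
writes without a client-side DLM lock, letting the OST lock each bulk
transfer (OBD_BRW_SRVLOCK).  Truncate can likewise be done under a
server-side lock when the OST advertises OBD_CONNECT_TRUNCLOCK.

The new tunables default to the values shown below; the /proc paths are
only a sketch of the usual lprocfs layout and may differ on a given
setup:

    echo 131072 > /proc/fs/lustre/ldlm/namespaces/<ns>/max_nolock_bytes
    echo 2      > /proc/fs/lustre/ldlm/namespaces/<ns>/contention_seconds
    echo 0      > /proc/fs/lustre/ldlm/namespaces/<ns>/contended_locks

    # client side: how long a file stays marked as contended
    echo 60 > /proc/fs/lustre/llite/<client-instance>/contention_seconds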
---
 lustre/ChangeLog                   |   6 +
 lustre/include/linux/lustre_lite.h |   2 +
 lustre/include/lustre_dlm.h        |  19 ++
 lustre/ldlm/ldlm_extent.c          |  83 ++++++-
 lustre/ldlm/ldlm_resource.c        |  24 ++
 lustre/llite/file.c                | 168 ++++++++++----
 lustre/llite/llite_internal.h      |  12 +
 lustre/llite/llite_lib.c           |  67 ++++--
 lustre/llite/llite_mmap.c          |   2 +
 lustre/llite/lproc_llite.c         |  25 ++
 lustre/llite/rw.c                  | 352 +++++++++++++++++++++++++----
 lustre/osc/osc_request.c           |  10 +
 12 files changed, 659 insertions(+), 111 deletions(-)

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index d7eca9d9f9..a18d123079 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -135,6 +135,12 @@ Details    : after a connection loss, the lustre client should attempt to
 	     reconnect to the last active server first before trying the
 	     other potential connections.
 
+Severity   : enhancement
+Bugzilla   : 11270
+Description: eliminate client locks in the face of contention
+Details    : file contention detection and lockless i/o implementation
+             for contended files.
+
 --------------------------------------------------------------------------------
 
 2007-08-27         Cluster File Systems, Inc. <info@clusterfs.com>
diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h
index dacc1bc071..7a63f2e8de 100644
--- a/lustre/include/linux/lustre_lite.h
+++ b/lustre/include/linux/lustre_lite.h
@@ -66,6 +66,8 @@ enum {
          LPROC_LL_INODE_PERM,
          LPROC_LL_DIRECT_READ,
          LPROC_LL_DIRECT_WRITE,
+         LPROC_LL_LOCKLESS_READ,
+         LPROC_LL_LOCKLESS_WRITE,
          LPROC_LL_FILE_OPCODES
 };
 
diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index 3bd0f73883..163b9c58c7 100644
--- a/lustre/include/lustre_dlm.h
+++ b/lustre/include/lustre_dlm.h
@@ -221,6 +221,12 @@ struct ldlm_valblock_ops {
                            int buf_idx, int increase);
 };
 
+/* default values for the "max_nolock_size", "contention_time"
+ * and "contended_locks" namespace tunables */
+#define NS_DEFAULT_MAX_NOLOCK_BYTES 131072
+#define NS_DEFAULT_CONTENTION_SECONDS 2
+#define NS_DEFAULT_CONTENDED_LOCKS 0
+
 struct ldlm_namespace {
         char                  *ns_name;
         __u32                  ns_client; /* is this a client-side lock tree? */
@@ -244,6 +250,14 @@ struct ldlm_namespace {
         struct ldlm_valblock_ops *ns_lvbo;
         void                  *ns_lvbp;
         cfs_waitq_t            ns_waitq;
+        /* if more than @ns_contended_locks are found, the resource is
+         * considered contended */
+        unsigned               ns_contended_locks;
+        /* the resource remembers its contended state for @ns_contention_time
+         * seconds */
+        unsigned               ns_contention_time;
+        /* limit size of nolock requests, in bytes */
+        unsigned               ns_max_nolock_size;
 };
 
 /*
@@ -362,6 +376,9 @@ struct ldlm_resource {
         struct semaphore       lr_lvb_sem;
         __u32                  lr_lvb_len;
         void                  *lr_lvb_data;
+
+        /* when the resource was last considered contended */
+        cfs_time_t             lr_contention_time;
 };
 
 struct ldlm_ast_work {
@@ -450,6 +467,8 @@ int ldlm_replay_locks(struct obd_import *imp);
 void ldlm_resource_iterate(struct ldlm_namespace *, struct ldlm_res_id *,
                            ldlm_iterator_t iter, void *data);
 
+/* when set, measure lock contention and return -EBUSY if it is high */
+#define LDLM_FL_DENY_ON_CONTENTION 0x10000000
 
 /* ldlm_flock.c */
 int ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data);
diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c
index 7f299b4a9d..9021885976 100644
--- a/lustre/ldlm/ldlm_extent.c
+++ b/lustre/ldlm/ldlm_extent.c
@@ -31,6 +31,7 @@
 
 #include <lustre_dlm.h>
 #include <obd_support.h>
+#include <obd.h>
 #include <lustre_lib.h>
 
 #include "ldlm_internal.h"
@@ -193,6 +194,18 @@ static void ldlm_extent_policy(struct ldlm_resource *res,
         }
 }
 
+static int ldlm_check_contention(struct ldlm_lock *lock, int contended_locks)
+{
+        struct ldlm_resource *res = lock->l_resource;
+        cfs_time_t now = cfs_time_current();
+
+        CDEBUG(D_DLMTRACE, "contended locks = %d\n", contended_locks);
+        if (contended_locks > res->lr_namespace->ns_contended_locks)
+                res->lr_contention_time = now;
+        return cfs_time_before(now, cfs_time_add(res->lr_contention_time,
+                cfs_time_seconds(res->lr_namespace->ns_contention_time)));
+}
+
 /* Determine if the lock is compatible with all locks on the queue.
  * We stop walking the queue if we hit ourselves so we don't take
  * conflicting locks enqueued after us into accound, or we'd wait forever.
@@ -205,7 +218,7 @@ static void ldlm_extent_policy(struct ldlm_resource *res,
 static int
 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                          int *flags, ldlm_error_t *err,
-                         struct list_head *work_list)
+                         struct list_head *work_list, int *contended_locks)
 {
         struct list_head *tmp;
         struct ldlm_lock *lock;
@@ -222,7 +235,7 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 if (req == lock)
-                        RETURN(compat);
+                        break;
 
                 if (unlikely(scan)) {
                         /* We only get here if we are queuing GROUP lock
@@ -238,13 +251,15 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                                 ldlm_resource_insert_lock_after(lock, req);
                                 list_del_init(&lock->l_res_link);
                                 ldlm_resource_insert_lock_after(req, lock);
-                                RETURN(0);
+                                compat = 0;
+                                break;
                         }
                         if (req->l_policy_data.l_extent.gid ==
                              lock->l_policy_data.l_extent.gid) {
                                 /* found it */
                                 ldlm_resource_insert_lock_after(lock, req);
-                                RETURN(0);
+                                compat = 0;
+                                break;
                         }
                         continue;
                 }
@@ -302,13 +317,13 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                                 ldlm_resource_insert_lock_after(lock, req);
                                 list_del_init(&lock->l_res_link);
                                 ldlm_resource_insert_lock_after(req, lock);
-                                RETURN(0);
+                                break;
                         }
                         if (req->l_policy_data.l_extent.gid ==
                              lock->l_policy_data.l_extent.gid) {
                                 /* found it */
                                 ldlm_resource_insert_lock_after(lock, req);
-                                RETURN(0);
+                                break;
                         }
                         continue;
                 }
@@ -332,11 +347,25 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                 if (!work_list)
                         RETURN(0);
 
+                /* don't count conflicting glimpse locks */
+                *contended_locks +=
+                        !(lock->l_req_mode == LCK_PR &&
+                          lock->l_policy_data.l_extent.start == 0 &&
+                          lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF);
+
                 compat = 0;
                 if (lock->l_blocking_ast)
                         ldlm_add_ast_work_item(lock, req, work_list);
         }
 
+        if (ldlm_check_contention(req, *contended_locks) &&
+            compat == 0 &&
+            (*flags & LDLM_FL_DENY_ON_CONTENTION) &&
+            req->l_req_mode != LCK_GROUP &&
+            req_end - req_start <=
+            req->l_resource->lr_namespace->ns_max_nolock_size)
+                GOTO(destroylock, compat = -EBUSY);
+
         RETURN(compat);
 destroylock:
         list_del_init(&req->l_res_link);
@@ -345,6 +374,27 @@ destroylock:
         RETURN(compat);
 }
 
+static void discard_bl_list(struct list_head *bl_list)
+{
+        struct list_head *tmp, *pos;
+        ENTRY;
+
+        list_for_each_safe(pos, tmp, bl_list) {
+                struct ldlm_lock *lock =
+                        list_entry(pos, struct ldlm_lock, l_bl_ast);
+
+                list_del_init(&lock->l_bl_ast);
+                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+                lock->l_flags &= ~LDLM_FL_AST_SENT;
+                LASSERT(lock->l_bl_ast_run == 0);
+                LASSERT(lock->l_blocking_lock);
+                LDLM_LOCK_PUT(lock->l_blocking_lock);
+                lock->l_blocking_lock = NULL;
+                LDLM_LOCK_PUT(lock);
+        }
+        EXIT;
+}
+
 /* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
   *   - blocking ASTs have already been sent
   *   - must call this function with the ns lock held
@@ -358,9 +408,12 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
         struct ldlm_resource *res = lock->l_resource;
         struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list);
         int rc, rc2;
+        int contended_locks = 0;
         ENTRY;
 
         LASSERT(list_empty(&res->lr_converting));
+        LASSERT(!(*flags & LDLM_FL_DENY_ON_CONTENTION) ||
+                !(lock->l_flags & LDLM_AST_DISCARD_DATA));
         check_res_locked(res);
         *err = ELDLM_OK;
 
@@ -372,10 +425,11 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                  * being true, we want to find out. */
                 LASSERT(*flags == 0);
                 rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags,
-                                              err, NULL);
+                                              err, NULL, &contended_locks);
                 if (rc == 1) {
                         rc = ldlm_extent_compat_queue(&res->lr_waiting, lock,
-                                                      flags, err, NULL);
+                                                      flags, err, NULL,
+                                                      &contended_locks);
                 }
                 if (rc == 0)
                         RETURN(LDLM_ITER_STOP);
@@ -389,13 +443,16 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
         }
 
  restart:
-        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, &rpc_list);
+        contended_locks = 0;
+        rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
+                                      &rpc_list, &contended_locks);
         if (rc < 0)
                 GOTO(out, rc); /* lock was destroyed */
         if (rc == 2)
                 goto grant;
 
-        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err, &rpc_list);
+        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
+                                       &rpc_list, &contended_locks);
         if (rc2 < 0)
                 GOTO(out, rc = rc2); /* lock was destroyed */
 
@@ -424,8 +481,12 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                 *flags |= LDLM_FL_NO_TIMEOUT;
 
         }
-        rc = 0;
+        RETURN(0);
 out:
+        if (!list_empty(&rpc_list)) {
+                LASSERT(!(lock->l_flags & LDLM_AST_DISCARD_DATA));
+                discard_bl_list(&rpc_list);
+        }
         RETURN(rc);
 }
 
diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c
index 4c40151ea7..e277ef69eb 100644
--- a/lustre/ldlm/ldlm_resource.c
+++ b/lustre/ldlm/ldlm_resource.c
@@ -226,6 +226,27 @@ void ldlm_proc_namespace(struct ldlm_namespace *ns)
                 lock_vars[0].read_fptr = lprocfs_uint_rd;
                 lock_vars[0].write_fptr = lprocfs_uint_wr;
                 lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+        } else {
+                snprintf(lock_name, MAX_STRING_SIZE, "%s/max_nolock_bytes",
+                         ns->ns_name);
+                lock_vars[0].data = &ns->ns_max_nolock_size;
+                lock_vars[0].read_fptr = lprocfs_uint_rd;
+                lock_vars[0].write_fptr = lprocfs_uint_wr;
+                lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
+                snprintf(lock_name, MAX_STRING_SIZE, "%s/contention_seconds",
+                         ns->ns_name);
+                lock_vars[0].data = &ns->ns_contention_time;
+                lock_vars[0].read_fptr = lprocfs_uint_rd;
+                lock_vars[0].write_fptr = lprocfs_uint_wr;
+                lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
+                snprintf(lock_name, MAX_STRING_SIZE, "%s/contended_locks",
+                         ns->ns_name);
+                lock_vars[0].data = &ns->ns_contended_locks;
+                lock_vars[0].read_fptr = lprocfs_uint_rd;
+                lock_vars[0].write_fptr = lprocfs_uint_wr;
+                lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
         }
 }
 #undef MAX_STRING_SIZE
@@ -267,6 +288,9 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
         atomic_set(&ns->ns_locks, 0);
         ns->ns_resources = 0;
         cfs_waitq_init(&ns->ns_waitq);
+        ns->ns_max_nolock_size = NS_DEFAULT_MAX_NOLOCK_BYTES;
+        ns->ns_contention_time = NS_DEFAULT_CONTENTION_SECONDS;
+        ns->ns_contended_locks = NS_DEFAULT_CONTENDED_LOCKS;
 
         for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
              bucket--)
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index fa47d3bd55..4dd4cd9575 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c
@@ -1141,6 +1141,91 @@ int ll_extent_unlock(struct ll_file_data *fd, struct inode *inode,
         RETURN(rc);
 }
 
+static void ll_set_file_contended(struct inode *inode)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+
+        lli->lli_contention_time = cfs_time_current();
+        set_bit(LLI_F_CONTENDED, &lli->lli_flags);
+}
+
+void ll_clear_file_contended(struct inode *inode)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+
+        clear_bit(LLI_F_CONTENDED, &lli->lli_flags);
+}
+
+static int ll_is_file_contended(struct file *file)
+{
+        struct inode *inode = file->f_dentry->d_inode;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+        ENTRY;
+
+        if (!(sbi->ll_lco.lco_flags & OBD_CONNECT_SRVLOCK)) {
+                CDEBUG(D_INFO, "the server does not support SRVLOCK feature,"
+                       " osc connect flags = 0x"LPX64"\n",
+                       sbi->ll_lco.lco_flags);
+                RETURN(0);
+        }
+        if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
+                RETURN(1);
+        if (test_bit(LLI_F_CONTENDED, &lli->lli_flags)) {
+                cfs_time_t cur_time = cfs_time_current();
+                cfs_time_t retry_time;
+
+                retry_time = cfs_time_add(
+                        lli->lli_contention_time,
+                        cfs_time_seconds(sbi->ll_contention_time));
+                if (cfs_time_after(cur_time, retry_time)) {
+                        ll_clear_file_contended(inode);
+                        RETURN(0);
+                }
+                RETURN(1);
+        }
+        RETURN(0);
+}
+
+static int ll_file_get_tree_lock(struct ll_lock_tree *tree, struct file *file,
+                                 const char *buf, size_t count,
+                                 loff_t start, loff_t end, int rw)
+{
+        int append;
+        int tree_locked = 0;
+        int rc;
+        struct inode *inode = file->f_dentry->d_inode;
+
+        append = (rw == WRITE) && (file->f_flags & O_APPEND);
+
+        if (append || !ll_is_file_contended(file)) {
+                struct ll_lock_tree_node *node;
+                int ast_flags;
+
+                ast_flags = append ? 0 : LDLM_FL_DENY_ON_CONTENTION;
+                if (file->f_flags & O_NONBLOCK)
+                        ast_flags |= LDLM_FL_BLOCK_NOWAIT;
+                node = ll_node_from_inode(inode, start, end,
+                                          (rw == WRITE) ? LCK_PW : LCK_PR);
+                if (IS_ERR(node)) {
+                        rc = PTR_ERR(node);
+                        GOTO(out, rc);
+                }
+                tree->lt_fd = LUSTRE_FPRIVATE(file);
+                rc = ll_tree_lock(tree, node, buf, count, ast_flags);
+                if (rc == 0)
+                        tree_locked = 1;
+                else if (rc == -EBUSY)
+                        ll_set_file_contended(inode);
+                else
+                        GOTO(out, rc);
+        }
+        RETURN(tree_locked);
+out:
+        return rc;
+}
+
 static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                             loff_t *ppos)
 {
@@ -1149,12 +1234,12 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
         struct lov_stripe_md *lsm = lli->lli_smd;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ll_lock_tree tree;
-        struct ll_lock_tree_node *node;
         struct ost_lvb lvb;
         struct ll_ra_read bead;
-        int rc, ra = 0;
+        int ra = 0;
         loff_t end;
         ssize_t retval, chunk, sum = 0;
+        int tree_locked;
 
         __u64 kms;
         ENTRY;
@@ -1192,7 +1277,6 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                         RETURN(-EFAULT);
                 RETURN(count);
         }
-
 repeat:
         if (sbi->ll_max_rw_chunk != 0) {
                 /* first, let's know the end of the current stripe */
@@ -1210,17 +1294,11 @@ repeat:
         } else {
                 end = *ppos + count - 1;
         }
-       
-        node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
-        if (IS_ERR(node)){
-                GOTO(out, retval = PTR_ERR(node));
-        }
 
-        tree.lt_fd = LUSTRE_FPRIVATE(file);
-        rc = ll_tree_lock(&tree, node, buf, count,
-                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
-        if (rc != 0)
-                GOTO(out, retval = rc);
+        tree_locked = ll_file_get_tree_lock(&tree, file, buf,
+                                            count, *ppos, end, READ);
+        if (tree_locked < 0)
+                GOTO(out, retval = tree_locked);
 
         ll_inode_size_lock(inode, 1);
         /*
@@ -1271,26 +1349,28 @@ repeat:
                inode->i_ino, chunk, *ppos, inode->i_size);
 
         /* turn off the kernel's read-ahead */
+        if (tree_locked) {
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        file->f_ramax = 0;
+                file->f_ramax = 0;
 #else
-        file->f_ra.ra_pages = 0;
+                file->f_ra.ra_pages = 0;
 #endif
-        /* initialize read-ahead window once per syscall */
-        if (ra == 0) {
-                ra = 1;
-                bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
-                bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
-                ll_ra_read_in(file, &bead);
-        }
+                /* initialize read-ahead window once per syscall */
+                if (ra == 0) {
+                        ra = 1;
+                        bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
+                        bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
+                        ll_ra_read_in(file, &bead);
+                }
 
-        /* BUG: 5972 */
-        file_accessed(file);
-        retval = generic_file_read(file, buf, chunk, ppos);
+                /* BUG: 5972 */
+                file_accessed(file);
+                retval = generic_file_read(file, buf, chunk, ppos);
+                ll_tree_unlock(&tree);
+        } else {
+                retval = ll_file_lockless_io(file, buf, chunk, ppos, READ);
+        }
         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
-
-        ll_tree_unlock(&tree);
-
         if (retval > 0) {
                 buf += retval;
                 count -= retval;
@@ -1316,11 +1396,10 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         struct ll_lock_tree tree;
-        struct ll_lock_tree_node *node;
         loff_t maxbytes = ll_file_maxbytes(inode);
         loff_t lock_start, lock_end, end;
         ssize_t retval, chunk, sum = 0;
-        int rc;
+        int tree_locked;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
@@ -1368,16 +1447,11 @@ repeat:
                 lock_start = *ppos;
                 lock_end = *ppos + count - 1;
         }
-        node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
 
-        if (IS_ERR(node))
-                GOTO(out, retval = PTR_ERR(node));
-
-        tree.lt_fd = LUSTRE_FPRIVATE(file);
-        rc = ll_tree_lock(&tree, node, buf, count,
-                          file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
-        if (rc != 0)
-                GOTO(out, retval = rc);
+        tree_locked = ll_file_get_tree_lock(&tree, file, buf, count,
+                                            lock_start, lock_end, WRITE);
+        if (tree_locked < 0)
+                GOTO(out, retval = tree_locked);
 
         /* This is ok, g_f_w will overwrite this under i_sem if it races
          * with a local truncate, it just makes our maxbyte checking easier.
@@ -1392,18 +1466,23 @@ repeat:
                 send_sig(SIGXFSZ, current, 0);
                 GOTO(out_unlock, retval = -EFBIG);
         }
-        if (*ppos + count > maxbytes)
-                count = maxbytes - *ppos;
+        if (end > maxbytes - 1)
+                end = maxbytes - 1;
 
         /* generic_file_write handles O_APPEND after getting i_mutex */
         chunk = end - *ppos + 1;
         CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
                inode->i_ino, chunk, *ppos);
-        retval = generic_file_write(file, buf, chunk, ppos);
-        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, count, 1);
+        if (tree_locked)
+                retval = generic_file_write(file, buf, chunk, ppos);
+        else
+                retval = ll_file_lockless_io(file, (char*)buf, chunk,
+                                             ppos, WRITE);
+        ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
 
 out_unlock:
-        ll_tree_unlock(&tree);
+        if (tree_locked)
+                ll_tree_unlock(&tree);
 
 out:
         if (retval > 0) {
@@ -1466,6 +1545,7 @@ static ssize_t ll_file_sendfile(struct file *in_file, loff_t *ppos,size_t count,
         if (rc != 0)
                 RETURN(rc);
 
+        ll_clear_file_contended(inode);
         ll_inode_size_lock(inode, 1);
         /*
          * Consistency guarantees: following possibilities exist for the
diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h
index b5a4d54534..d5f3d1f37e 100644
--- a/lustre/llite/llite_internal.h
+++ b/lustre/llite/llite_internal.h
@@ -65,6 +65,8 @@ extern struct file_operations ll_pgcache_seq_fops;
 #define LLI_INODE_DEAD                  0xdeadd00d
 #define LLI_F_HAVE_OST_SIZE_LOCK        0
 #define LLI_F_HAVE_MDS_SIZE_LOCK        1
+#define LLI_F_CONTENDED                 2
+#define LLI_F_SRVLOCK                   3
 
 struct ll_inode_info {
         int                     lli_inode_magic;
@@ -76,6 +78,7 @@ struct ll_inode_info {
         __u64                   lli_maxbytes;
         __u64                   lli_io_epoch;
         unsigned long           lli_flags;
+        cfs_time_t              lli_contention_time;
 
         /* this lock protects s_d_w and p_w_ll and mmap_cnt */
         spinlock_t              lli_lock;
@@ -208,6 +211,9 @@ enum stats_track_type {
 #define LL_SBI_JOIN             0x20 /* support JOIN */
 #define LL_SBI_LOCALFLOCK       0x40 /* Local flocks support by kernel */
 
+/* default value for ll_sb_info->contention_time */
+#define SBI_DEFAULT_CONTENTION_SECONDS     60
+
 struct ll_sb_info {
         struct list_head          ll_list;
         /* this protects pglist and ra_info.  It isn't safe to
@@ -235,6 +241,8 @@ struct ll_sb_info {
         unsigned long             ll_pglist_gen;
         struct list_head          ll_pglist; /* all pages (llap_pglist_item) */
 
+        unsigned                  ll_contention_time; /* seconds */
+
         struct ll_ra_info         ll_ra_info;
         unsigned int              ll_namelen;
         struct file_operations   *ll_fop;
@@ -400,6 +408,7 @@ enum {
         LLAP_ORIGIN_COMMIT_WRITE,
         LLAP_ORIGIN_WRITEPAGE,
         LLAP_ORIGIN_REMOVEPAGE,
+        LLAP_ORIGIN_LOCKLESS_IO,
         LLAP__ORIGIN_MAX,
 };
 extern char *llap_origins[];
@@ -464,6 +473,9 @@ struct ll_async_page *llap_cast_private(struct page *page);
 void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras);
 void ll_ra_accounting(struct ll_async_page *llap,struct address_space *mapping);
 void ll_truncate(struct inode *inode);
+int ll_file_punch(struct inode *, loff_t, int);
+ssize_t ll_file_lockless_io(struct file *, char *, size_t, loff_t *, int);
+void ll_clear_file_contended(struct inode*);
 int ll_sync_page_range(struct inode *, struct address_space *, loff_t, size_t);
 
 /* llite/file.c */
diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c
index 78adbc04d9..1ef3295175 100644
--- a/lustre/llite/llite_lib.c
+++ b/lustre/llite/llite_lib.c
@@ -74,7 +74,7 @@ static struct ll_sb_info *ll_init_sbi(void)
                                            SBI_DEFAULT_READAHEAD_MAX);
         sbi->ll_ra_info.ra_max_read_ahead_whole_pages =
                                            SBI_DEFAULT_READAHEAD_WHOLE_MAX;
-
+        sbi->ll_contention_time = SBI_DEFAULT_CONTENTION_SECONDS;
         INIT_LIST_HEAD(&sbi->ll_conn_chain);
         INIT_LIST_HEAD(&sbi->ll_orphan_dentry_list);
 
@@ -259,7 +259,7 @@ static int client_common_fill_super(struct super_block *sb,
 
         data->ocd_connect_flags = OBD_CONNECT_GRANT | OBD_CONNECT_VERSION |
                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE |
-                                  OBD_CONNECT_CANCELSET;
+                                  OBD_CONNECT_SRVLOCK | OBD_CONNECT_CANCELSET;
 
         CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d "
                "ocd_grant: %d\n", data->ocd_connect_flags,
@@ -1324,20 +1324,47 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
          * last one is especially bad for racing o_append users on other
          * nodes. */
         if (ia_valid & ATTR_SIZE) {
+                int srvlock = !!(sbi->ll_lco.lco_flags & OBD_CONNECT_TRUNCLOCK);
                 ldlm_policy_data_t policy = { .l_extent = {attr->ia_size,
                                                            OBD_OBJECT_EOF } };
                 struct lustre_handle lockh = { 0 };
-                int err, ast_flags = 0;
-                /* XXX when we fix the AST intents to pass the discard-range
-                 * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA
-                 * XXX here. */
-                if (attr->ia_size == 0)
-                        ast_flags = LDLM_AST_DISCARD_DATA;
+                int err;
+
+                if (srvlock) {
+                        int flags = LDLM_FL_BLOCK_GRANTED;
+
+                        rc = obd_match(ll_i2sbi(inode)->ll_osc_exp,
+                                       lsm, LDLM_EXTENT,
+                                       &policy, LCK_PW, &flags, inode,
+                                       &lockh);
+                        if (rc < 0)
+                                RETURN(rc);
+                        if (rc == 1)
+                                srvlock = 0;
+                }
 
                 UNLOCK_INODE_MUTEX(inode);
                 UP_WRITE_I_ALLOC_SEM(inode);
-                rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, &lockh,
-                                    ast_flags);
+
+                if (srvlock) {
+                        rc = ll_file_punch(inode, attr->ia_size, 1);
+                        if (rc)
+                                RETURN(rc);
+                } else {
+                        int ast_flags = 0;
+
+                        /* XXX when we fix the AST intents to pass the discard-range
+                         * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA
+                         * XXX here. */
+                        if (attr->ia_size == 0)
+                                ast_flags = LDLM_AST_DISCARD_DATA;
+
+                        rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy,
+                                            &lockh, ast_flags);
+                        if (rc != 0)
+                                RETURN(rc);
+                }
+
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
                 DOWN_WRITE_I_ALLOC_SEM(inode);
                 LOCK_INODE_MUTEX(inode);
@@ -1345,25 +1372,28 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                 LOCK_INODE_MUTEX(inode);
                 DOWN_WRITE_I_ALLOC_SEM(inode);
 #endif
-                if (rc != 0)
-                        RETURN(rc);
-
                 /* Only ll_inode_size_lock is taken at this level.
                  * lov_stripe_lock() is grabbed by ll_truncate() only over
                  * call to obd_adjust_kms().  If vmtruncate returns 0, then
                  * ll_truncate dropped ll_inode_size_lock() */
                 ll_inode_size_lock(inode, 0);
+                if (srvlock)
+                        set_bit(LLI_F_SRVLOCK, &lli->lli_flags);
                 rc = vmtruncate(inode, attr->ia_size);
+                clear_bit(LLI_F_SRVLOCK, &lli->lli_flags);
                 if (rc != 0) {
                         LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
                         ll_inode_size_unlock(inode, 0);
                 }
 
-                err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
-                if (err) {
-                        CERROR("ll_extent_unlock failed: %d\n", err);
-                        if (!rc)
-                                rc = err;
+                if (!srvlock) {
+                        err = ll_extent_unlock(NULL, inode, lsm,
+                                               LCK_PW, &lockh);
+                        if (err) {
+                                CERROR("ll_extent_unlock failed: %d\n", err);
+                                if (!rc)
+                                        rc = err;
+                        }
                 }
         } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) {
                 obd_flag flags;
@@ -1935,6 +1965,7 @@ char *llap_origins[] = {
         [LLAP_ORIGIN_READAHEAD] = "ra",
         [LLAP_ORIGIN_COMMIT_WRITE] = "cw",
         [LLAP_ORIGIN_WRITEPAGE] = "wp",
+        [LLAP_ORIGIN_LOCKLESS_IO] = "ls"
 };
 
 struct ll_async_page *llite_pglist_next_llap(struct ll_sb_info *sbi,
diff --git a/lustre/llite/llite_mmap.c b/lustre/llite/llite_mmap.c
index fc5d1ff250..4b83cb653c 100644
--- a/lustre/llite/llite_mmap.c
+++ b/lustre/llite/llite_mmap.c
@@ -379,6 +379,8 @@ struct page *ll_nopage(struct vm_area_struct *vma, unsigned long address,
                 RETURN(NULL);
         }
 
+        ll_clear_file_contended(inode);
+
         /* start and end the lock on the first and last bytes in the page */
         policy_from_vma(&policy, vma, address, CFS_PAGE_SIZE);
 
diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c
index 4efcd45db2..b354a131ac 100644
--- a/lustre/llite/lproc_llite.c
+++ b/lustre/llite/lproc_llite.c
@@ -437,6 +437,26 @@ static int ll_wr_track_gid(struct file *file, const char *buffer,
         return (ll_wr_track_id(buffer, count, data, STATS_TRACK_GID));
 }
 
+static int ll_rd_contention_time(char *page, char **start, off_t off,
+                                 int count, int *eof, void *data)
+{
+        struct super_block *sb = data;
+
+        *eof = 1;
+        return snprintf(page, count, "%u\n", ll_s2sbi(sb)->ll_contention_time);
+
+}
+
+static int ll_wr_contention_time(struct file *file, const char *buffer,
+                                 unsigned long count, void *data)
+{
+        struct super_block *sb = data;
+        struct ll_sb_info *sbi = ll_s2sbi(sb);
+
+        return lprocfs_write_helper(buffer, count, &sbi->ll_contention_time) ?:
+                count;
+}
+
 static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "uuid",         ll_rd_sb_uuid,          0, 0 },
         //{ "mntpt_path",   ll_rd_path,             0, 0 },
@@ -458,6 +478,7 @@ static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "stats_track_pid",  ll_rd_track_pid, ll_wr_track_pid, 0 },
         { "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 },
         { "stats_track_gid",  ll_rd_track_gid, ll_wr_track_gid, 0 },
+        { "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0},
         { 0 }
 };
 
@@ -515,6 +536,10 @@ struct llite_file_opcode {
                                    "direct_read" },
         { LPROC_LL_DIRECT_WRITE,   LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_PAGES,
                                    "direct_write" },
+        { LPROC_LL_LOCKLESS_READ,  LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES,
+                                   "lockless_read_bytes" },
+        { LPROC_LL_LOCKLESS_WRITE, LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_BYTES,
+                                   "lockless_write_bytes" },
 
 };
 
diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c
index 4796b199c1..601a102156 100644
--- a/lustre/llite/rw.c
+++ b/lustre/llite/rw.c
@@ -104,6 +104,37 @@ static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
         RETURN(rc);
 }
 
+int ll_file_punch(struct inode *inode, loff_t new_size, int srvlock)
+{
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct obd_info oinfo = { { { 0 } } };
+        struct obdo oa;
+        int rc;
+
+        ENTRY;
+        CDEBUG(D_INFO, "calling punch for "LPX64" (new size %Lu=%#Lx)\n",
+               lli->lli_smd->lsm_object_id, new_size, new_size);
+
+        oinfo.oi_md = lli->lli_smd;
+        oinfo.oi_policy.l_extent.start = new_size;
+        oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
+        oinfo.oi_oa = &oa;
+        oa.o_id = lli->lli_smd->lsm_object_id;
+        oa.o_valid = OBD_MD_FLID;
+        oa.o_flags = srvlock ? OBD_FL_TRUNCLOCK : 0;
+        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |OBD_MD_FLFID|
+                        OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME |
+                        OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGENER |
+                        OBD_MD_FLBLOCKS);
+        rc = obd_punch_rqset(ll_i2obdexp(inode), &oinfo, NULL);
+        if (rc) {
+                CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino);
+                RETURN(rc);
+        }
+        obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+                      OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
+        RETURN(0);
+}
 /* this isn't where truncate starts.   roughly:
  * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate. setattr_raw grabs
  * DLM lock on [size, EOF], i_mutex, ->lli_size_sem, and WRITE_I_ALLOC_SEM to
@@ -113,10 +144,8 @@ static int ll_brw(int cmd, struct inode *inode, struct obdo *oa,
 void ll_truncate(struct inode *inode)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct obd_info oinfo = { { { 0 } } };
-        struct ost_lvb lvb;
-        struct obdo oa;
-        int rc;
+        int srvlock = test_bit(LLI_F_SRVLOCK, &lli->lli_flags);
+        loff_t new_size;
         ENTRY;
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) to %Lu=%#Lx\n",inode->i_ino,
                inode->i_generation, inode, inode->i_size, inode->i_size);
@@ -135,22 +164,27 @@ void ll_truncate(struct inode *inode)
 
         LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
 
-        /* XXX I'm pretty sure this is a hack to paper over a more fundamental
-         * race condition. */
-        lov_stripe_lock(lli->lli_smd);
-        inode_init_lvb(inode, &lvb);
-        rc = obd_merge_lvb(ll_i2obdexp(inode), lli->lli_smd, &lvb, 0);
-        oa.o_blocks = lvb.lvb_blocks;
-        if (lvb.lvb_size == inode->i_size && rc == 0) {
-                CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64", %Lu=%#Lx\n",
-                       lli->lli_smd->lsm_object_id,inode->i_size,inode->i_size);
+        if (!srvlock) {
+                struct ost_lvb lvb;
+                int rc;
+
+                /* XXX I'm pretty sure this is a hack to paper over a more fundamental
+                 * race condition. */
+                lov_stripe_lock(lli->lli_smd);
+                inode_init_lvb(inode, &lvb);
+                rc = obd_merge_lvb(ll_i2obdexp(inode), lli->lli_smd, &lvb, 0);
+                inode->i_blocks = lvb.lvb_blocks;
+                if (lvb.lvb_size == inode->i_size && rc == 0) {
+                        CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64", %Lu=%#Lx\n",
+                               lli->lli_smd->lsm_object_id,inode->i_size,inode->i_size);
+                        lov_stripe_unlock(lli->lli_smd);
+                        GOTO(out_unlock, 0);
+                }
+
+                obd_adjust_kms(ll_i2obdexp(inode), lli->lli_smd, inode->i_size, 1);
                 lov_stripe_unlock(lli->lli_smd);
-                GOTO(out_unlock, 0);
         }
 
-        obd_adjust_kms(ll_i2obdexp(inode), lli->lli_smd, inode->i_size, 1);
-        lov_stripe_unlock(lli->lli_smd);
-
         if (unlikely((ll_i2sbi(inode)->ll_flags & LL_SBI_CHECKSUM) &&
                      (inode->i_size & ~CFS_PAGE_MASK))) {
                 /* If the truncate leaves behind a partial page, update its
@@ -168,29 +202,11 @@ void ll_truncate(struct inode *inode)
                 }
         }
 
-        CDEBUG(D_INFO, "calling punch for "LPX64" (new size %Lu=%#Lx)\n",
-               lli->lli_smd->lsm_object_id, inode->i_size, inode->i_size);
-
-        oinfo.oi_md = lli->lli_smd;
-        oinfo.oi_policy.l_extent.start = inode->i_size;
-        oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
-        oinfo.oi_oa = &oa;
-        oa.o_id = lli->lli_smd->lsm_object_id;
-        oa.o_valid = OBD_MD_FLID;
-
-        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |OBD_MD_FLFID|
-                        OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME |
-                        OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGENER | 
-                        OBD_MD_FLBLOCKS);
-
+        new_size = inode->i_size;
         ll_inode_size_unlock(inode, 0);
+        if (!srvlock)
+                ll_file_punch(inode, new_size, 0);
 
-        rc = obd_punch_rqset(ll_i2obdexp(inode), &oinfo, NULL);
-        if (rc)
-                CERROR("obd_truncate fails (%d) ino %lu\n", rc, inode->i_ino);
-        else
-                obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
-                              OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME);
         EXIT;
         return;
 
@@ -1502,3 +1518,263 @@ out_oig:
                 oig_release(oig);
         RETURN(rc);
 }
+
+static void ll_file_put_pages(struct page **pages, int numpages)
+{
+        int i;
+        struct page **pp;
+        ENTRY;
+
+        for (i = 0, pp = pages; i < numpages; i++, pp++) {
+                if (*pp) {
+                        LL_CDEBUG_PAGE(D_PAGE, (*pp), "free\n");
+                        ll_removepage(*pp);
+                        if (page_private(*pp))
+                                CERROR("the llap wasn't freed\n");
+                        (*pp)->mapping = NULL;
+                        if (page_count(*pp) != 1)
+                                CERROR("page %p, flags %#lx, count %i, private %p\n",
+                                       (*pp), (*pp)->flags, page_count(*pp),
+                                       (void*)page_private(*pp));
+                        __free_pages(*pp, 0);
+                }
+        }
+        OBD_FREE(pages, numpages * sizeof(struct page*));
+        EXIT;
+}
+
+static struct page **ll_file_prepare_pages(int numpages, struct inode *inode,
+                                           unsigned long first)
+{
+        struct page **pages;
+        int i;
+        int rc = 0;
+        ENTRY;
+
+        OBD_ALLOC(pages, sizeof(struct page *) * numpages);
+        if (pages == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
+        for (i = 0; i < numpages; i++) {
+                struct page *page;
+                struct ll_async_page *llap;
+
+                page = alloc_pages(GFP_HIGHUSER, 0);
+                if (page == NULL)
+                        GOTO(err, rc = -ENOMEM);
+                pages[i] = page;
+                /* llap_from_page needs page index and mapping to be set */
+                page->index = first++;
+                page->mapping = inode->i_mapping;
+                llap = llap_from_page(page, LLAP_ORIGIN_LOCKLESS_IO);
+                if (IS_ERR(llap))
+                        GOTO(err, rc = PTR_ERR(llap));
+        }
+        RETURN(pages);
+err:
+        ll_file_put_pages(pages, numpages);
+        RETURN(ERR_PTR(rc));
+}
+
+static ssize_t ll_file_copy_pages(struct page **pages, int numpages,
+                                  char *buf, loff_t pos, size_t count, int rw)
+{
+        ssize_t amount = 0;
+        int i;
+        ENTRY;
+
+        for (i = 0; i < numpages; i++) {
+                unsigned offset, bytes, left;
+                char *vaddr;
+
+                vaddr = kmap(pages[i]);
+                offset = pos & (CFS_PAGE_SIZE - 1);
+                bytes = min_t(unsigned, CFS_PAGE_SIZE - offset, count);
+                LL_CDEBUG_PAGE(D_PAGE, pages[i], "op = %s, addr = %p, "
+                               "buf = %p, bytes = %u\n",
+                               (rw == WRITE) ? "CFU" : "CTU",
+                               vaddr + offset, buf, bytes);
+                if (rw == WRITE)
+                        left = copy_from_user(vaddr + offset, buf, bytes);
+                else
+                        left = copy_to_user(buf, vaddr + offset, bytes);
+                kunmap(pages[i]);
+                amount += bytes;
+                if (left) {
+                        amount -= left;
+                        break;
+                }
+                buf += bytes;
+                count -= bytes;
+                pos += bytes;
+        }
+        if (amount == 0)
+                RETURN(-EFAULT);
+        RETURN(amount);
+}
+
+static int ll_file_oig_pages(struct inode * inode, struct page **pages,
+                             int numpages, loff_t pos, size_t count, int rw)
+{
+        struct obd_io_group *oig;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct obd_export *exp;
+        loff_t org_pos = pos;
+        obd_flag brw_flags;
+        int rc;
+        int i;
+        ENTRY;
+
+        exp = ll_i2obdexp(inode);
+        if (exp == NULL)
+                RETURN(-EINVAL);
+        rc = oig_init(&oig);
+        if (rc)
+                RETURN(rc);
+        brw_flags = OBD_BRW_SRVLOCK;
+        if (capable(CAP_SYS_RESOURCE))
+                brw_flags |= OBD_BRW_NOQUOTA;
+
+        for (i = 0; i < numpages; i++) {
+                struct ll_async_page *llap;
+                unsigned from, bytes;
+
+                from = pos & (CFS_PAGE_SIZE - 1);
+                bytes = min_t(unsigned, CFS_PAGE_SIZE - from,
+                              count - pos + org_pos);
+                llap = llap_cast_private(pages[i]);
+                LASSERT(llap);
+
+                lock_page(pages[i]);
+
+                LL_CDEBUG_PAGE(D_PAGE, pages[i], "offset "LPU64","
+                               " from %u, bytes = %u\n",
+                               pos, from, bytes);
+                LASSERTF(pos >> CFS_PAGE_SHIFT == pages[i]->index,
+                         "wrong page index %lu (%lu)\n",
+                         pages[i]->index,
+                         (unsigned long)(pos >> CFS_PAGE_SHIFT));
+                rc = obd_queue_group_io(exp, lli->lli_smd, NULL, oig,
+                                        llap->llap_cookie,
+                                        (rw == WRITE) ?
+                                        OBD_BRW_WRITE:OBD_BRW_READ,
+                                        from, bytes, brw_flags,
+                                        ASYNC_READY | ASYNC_URGENT |
+                                        ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);
+                if (rc) {
+                        i++;
+                        GOTO(out, rc);
+                }
+                pos += bytes;
+        }
+        rc = obd_trigger_group_io(exp, lli->lli_smd, NULL, oig);
+        if (rc)
+                GOTO(out, rc);
+        rc = oig_wait(oig);
+out:
+        while (--i >= 0)
+                unlock_page(pages[i]);
+        oig_release(oig);
+        RETURN(rc);
+}
+
+ssize_t ll_file_lockless_io(struct file *file, char *buf, size_t count,
+                            loff_t *ppos, int rw)
+{
+        loff_t pos;
+        struct inode *inode = file->f_dentry->d_inode;
+        ssize_t rc = 0;
+        int max_pages;
+        size_t amount = 0;
+        unsigned long first, last;
+        ENTRY;
+
+        if (rw == READ) {
+                loff_t isize;
+
+                ll_inode_size_lock(inode, 0);
+                isize = inode->i_size;
+                ll_inode_size_unlock(inode, 0);
+                if (*ppos >= isize)
+                        GOTO(out, rc = 0);
+                if (*ppos + count >= isize)
+                        count -= *ppos + count - isize;
+                if (count == 0)
+                        GOTO(out, rc);
+        } else {
+                rc = generic_write_checks(file, ppos, &count, 0);
+                if (rc)
+                        GOTO(out, rc);
+                rc = remove_suid(file->f_dentry);
+                if (rc)
+                        GOTO(out, rc);
+        }
+        pos = *ppos;
+        first = pos >> CFS_PAGE_SHIFT;
+        last = (pos + count - 1) >> CFS_PAGE_SHIFT;
+        max_pages = PTLRPC_MAX_BRW_PAGES *
+                ll_i2info(inode)->lli_smd->lsm_stripe_count;
+        CDEBUG(D_INFO, "max_pages_per_rpc = %u, stripe_count = %u\n",
+               PTLRPC_MAX_BRW_PAGES,
+               ll_i2info(inode)->lli_smd->lsm_stripe_count);
+
+        while (first <= last && rc >= 0) {
+                int pages_for_io;
+                struct page **pages;
+                size_t bytes = count - amount;
+
+                pages_for_io = min_t(int, last - first + 1, max_pages);
+                pages = ll_file_prepare_pages(pages_for_io, inode, first);
+                if (IS_ERR(pages)) {
+                        rc = PTR_ERR(pages);
+                        break;
+                }
+                if (rw == WRITE) {
+                        rc = ll_file_copy_pages(pages, pages_for_io, buf,
+                                                pos + amount, bytes, rw);
+                        if (rc < 0)
+                                GOTO(put_pages, rc);
+                        bytes = rc;
+                }
+                rc = ll_file_oig_pages(inode, pages, pages_for_io,
+                                       pos + amount, bytes, rw);
+                if (rc)
+                        GOTO(put_pages, rc);
+                if (rw == READ) {
+                        rc = ll_file_copy_pages(pages, pages_for_io, buf,
+                                                pos + amount, bytes, rw);
+                        if (rc < 0)
+                                GOTO(put_pages, rc);
+                        bytes = rc;
+                }
+                amount += bytes;
+                buf += bytes;
+put_pages:
+                ll_file_put_pages(pages, pages_for_io);
+                first += pages_for_io;
+                /* a short read/write check */
+                if (pos + amount < ((loff_t)first << CFS_PAGE_SHIFT))
+                        break;
+        }
+        /* NOTE: don't update i_size and KMS in the absence of LDLM locks,
+         * even if the write extends the file */
+        file_accessed(file);
+        if (rw == READ && amount < count && rc == 0) {
+                unsigned long not_cleared;
+
+                not_cleared = clear_user(buf, count - amount);
+                amount = count - not_cleared;
+                if (not_cleared)
+                        rc = -EFAULT;
+        }
+        if (amount > 0) {
+                lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
+                                    (rw == WRITE) ?
+                                    LPROC_LL_LOCKLESS_WRITE :
+                                    LPROC_LL_LOCKLESS_READ,
+                                    (long)amount);
+                *ppos += amount;
+                RETURN(amount);
+        }
+out:
+        RETURN(rc);
+}
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 582cc1a39c..f29db56893 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -1984,6 +1984,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
         CFS_LIST_HEAD(rpc_list);
         unsigned int ending_offset;
         unsigned  starting_offset = 0;
+        int srvlock = 0;
         ENTRY;
 
         /* first we find the pages we're allowed to work with */
@@ -1992,6 +1993,13 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
 
                 LASSERT(oap->oap_magic == OAP_MAGIC);
 
+                if (page_count != 0 &&
+                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
+                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
+                               " oap %p, page %p, srvlock %u\n",
+                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
+                        break;
+                }
                 /* in llite being 'ready' equates to the page being locked
                  * until completion unlocks it.  commit_write submits a page
                  * as not ready because its unlock will happen unconditionally
@@ -2073,6 +2081,8 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
 
                 /* now put the page back in our accounting */
                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
+                if (page_count == 0)
+                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
                 if (++page_count >= cli->cl_max_pages_per_rpc)
                         break;
 
-- 
GitLab