From 60dd182d90f1f9b5fd5d7e064bbe7a5f14f60997 Mon Sep 17 00:00:00 2001
From: anserper <anserper>
Date: Fri, 23 May 2008 10:29:04 +0000
Subject: [PATCH] Branch b1_6 b=14010 i=shadow i=zam

Fast-blocking read case
---
 lustre/include/lustre_dlm.h      |   2 +
 lustre/include/obd.h             |   8 ++
 lustre/include/obd_class.h       |  28 ++++++
 lustre/ldlm/ldlm_lock.c          |  27 ++++++
 lustre/ldlm/ldlm_lockd.c         |   2 +
 lustre/llite/file.c              | 147 +++++++++++++++++++++++++++++--
 lustre/llite/llite_internal.h    |   1 +
 lustre/llite/llite_mmap.c        |   5 ++
 lustre/lov/lov_obd.c             |  43 +++++++++
 lustre/obdclass/lprocfs_status.c |   2 +
 lustre/osc/osc_request.c         |  33 ++++++-
 11 files changed, 288 insertions(+), 10 deletions(-)

diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index 038fac3931..ea0292c194 100644
--- a/lustre/include/lustre_dlm.h
+++ b/lustre/include/lustre_dlm.h
@@ -681,6 +681,8 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_allow_match(struct ldlm_lock *lock);
+int ldlm_lock_fast_match(struct ldlm_lock *, int, loff_t, loff_t, void **);
+void ldlm_lock_fast_release(void *, int);
 ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                             struct ldlm_res_id *, ldlm_type_t type,
                             ldlm_policy_data_t *, ldlm_mode_t mode,
diff --git a/lustre/include/obd.h b/lustre/include/obd.h
index 40e196de8a..cb6f45e2ef 100644
--- a/lustre/include/obd.h
+++ b/lustre/include/obd.h
@@ -959,6 +959,14 @@ struct obd_ops {
                                  struct obd_async_page_ops *ops, void *data,
                                  void **res, int nocache,
                                  struct lustre_handle *lockh);
+        int (*o_reget_short_lock)(struct obd_export *exp,
+                                 struct lov_stripe_md *lsm,
+                                 void **res, int rw,
+                                 loff_t start, loff_t end,
+                                 void **cookie);
+        int (*o_release_short_lock)(struct obd_export *exp,
+                                    struct lov_stripe_md *lsm, loff_t end,
+                                    void *cookie, int rw);
         int (*o_queue_async_io)(struct obd_export *exp,
                                 struct lov_stripe_md *lsm,
                                 struct lov_oinfo *loi, void *cookie,
diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h
index e2a99e7166..01dad5a126 100644
--- a/lustre/include/obd_class.h
+++ b/lustre/include/obd_class.h
@@ -979,6 +979,34 @@ static inline  int obd_prep_async_page(struct obd_export *exp,
         RETURN(ret);
 }
 
+/* Call the obd's reget_short_lock method after the usual OBD_CHECK_OP. */
+static inline int obd_reget_short_lock(struct obd_export *exp,
+                                       struct lov_stripe_md *lsm, void **res,
+                                       int rw, loff_t start, loff_t end,
+                                       void **cookie)
+{
+        int ret;
+        ENTRY;
+        OBD_CHECK_OP(exp->exp_obd, reget_short_lock, -EOPNOTSUPP);
+        EXP_COUNTER_INCREMENT(exp, reget_short_lock);
+        ret = OBP(exp->exp_obd, reget_short_lock)(exp, lsm, res, rw,
+                                                  start, end, cookie);
+        RETURN(ret);
+}
+
+/* Call the obd's release_short_lock method after the usual OBD_CHECK_OP. */
+static inline int obd_release_short_lock(struct obd_export *exp,
+                                         struct lov_stripe_md *lsm, loff_t end,
+                                         void *cookie, int rw)
+{
+        int ret;
+        ENTRY;
+        OBD_CHECK_OP(exp->exp_obd, release_short_lock, -EOPNOTSUPP);
+        EXP_COUNTER_INCREMENT(exp, release_short_lock);
+        ret = OBP(exp->exp_obd, release_short_lock)(exp, lsm, end, cookie, rw);
+        RETURN(ret);
+}
+
 static inline int obd_queue_async_io(struct obd_export *exp,
                                      struct lov_stripe_md *lsm,
                                      struct lov_oinfo *loi, void *cookie,
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 94be0f770c..b510df6bed 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -34,6 +34,7 @@
 # include <libcfs/kp30.h>
 #endif
 
+#include <linux/fs.h>
 #include <obd_class.h>
 #include "ldlm_internal.h"
 
@@ -987,6 +988,32 @@ void ldlm_lock_allow_match(struct ldlm_lock *lock)
         unlock_res_and_lock(lock);
 }
 
+/* Take a PR/PW reference on @lock if it covers [start, end]; 1 on match. */
+int ldlm_lock_fast_match(struct ldlm_lock *lock, int rw, loff_t start,
+                         loff_t end, void **cookie)
+{
+        LASSERT(rw == READ || rw == WRITE);
+        if (lock == NULL ||
+            (rw != READ && !(lock->l_granted_mode & (LCK_PW | LCK_GROUP))))
+                return 0; /* should LCK_GROUP be handled in a special way? */
+        if (lock->l_policy_data.l_extent.start > start ||
+            lock->l_policy_data.l_extent.end < end)
+                return 0; /* lock extent does not cover [start, end] */
+        ldlm_lock_addref_internal(lock, rw == WRITE ? LCK_PW : LCK_PR);
+        *cookie = lock;
+        return 1; /* avoid using rc for stack relief */
+}
+
+void ldlm_lock_fast_release(void *cookie, int rw)
+{
+        struct ldlm_lock *lock = (struct ldlm_lock *)cookie;
+        /* @cookie: lock stored by ldlm_lock_fast_match(); drop its PR/PW ref */
+        LASSERT(lock != NULL);
+        LASSERT(rw == READ || rw == WRITE);
+        LASSERT(rw == READ || (lock->l_granted_mode & (LCK_PW | LCK_GROUP)));
+        ldlm_lock_decref_internal(lock, rw == WRITE ? LCK_PW : LCK_PR);
+}
+
 /* Can be called in two ways:
  *
  * If 'ns' is NULL, then lockh describes an existing lock that we want to look
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index 8e356930d7..93d140b44a 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -2145,6 +2145,8 @@ EXPORT_SYMBOL(ldlm_lock2handle);
 EXPORT_SYMBOL(__ldlm_handle2lock);
 EXPORT_SYMBOL(ldlm_lock_get);
 EXPORT_SYMBOL(ldlm_lock_put);
+EXPORT_SYMBOL(ldlm_lock_fast_match);
+EXPORT_SYMBOL(ldlm_lock_fast_release);
 EXPORT_SYMBOL(ldlm_lock_match);
 EXPORT_SYMBOL(ldlm_lock_cancel);
 EXPORT_SYMBOL(ldlm_lock_addref);
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index bb95f2740c..4366c7ca09 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c
@@ -1260,6 +1260,132 @@ static int iov_copy_update(unsigned long *nr_segs, const struct iovec **iov_out,
         return 0;
 }
 
+/* Ask the OBD layer to re-get a short lock through @page's llap cookie. */
+static int ll_reget_short_lock(struct page *page, int rw, loff_t start,
+                               loff_t end, void **cookie)
+{
+        struct inode *host = page->mapping->host;
+        struct ll_async_page *llap;
+        struct obd_export *exp;
+        int matched;
+        ENTRY;
+
+        exp = ll_i2obdexp(host);
+        if (!exp)
+                RETURN(0);
+
+        llap = llap_cast_private(page);
+        if (!llap)
+                RETURN(0);
+        matched = obd_reget_short_lock(exp, ll_i2info(host)->lli_smd,
+                                       &llap->llap_cookie, rw, start, end,
+                                       cookie);
+        RETURN(matched);
+}
+
+/* Release a short lock via the OBD layer; a failure is only logged. */
+static void ll_release_short_lock(struct inode *inode, loff_t end,
+                                  void *cookie, int rw)
+{
+        struct obd_export *exp = ll_i2obdexp(inode);
+        int err;
+
+        if (!exp)
+                return;
+
+        err = obd_release_short_lock(exp, ll_i2info(inode)->lli_smd, end,
+                                     cookie, rw);
+        if (err < 0)
+                CERROR("unlock failed (%d)\n", err);
+}
+
+/* Try the lock reachable from the page at @ppos; 1 if taken, 0 otherwise. */
+static inline int ll_file_get_fast_lock(struct file *file,
+                                        loff_t ppos, loff_t end,
+                                        const struct iovec *iov,
+                                        unsigned long nr_segs,
+                                        void **cookie, int rw)
+{
+        unsigned long seg; /* same type as nr_segs: no signed/unsigned mix */
+        struct page *page;
+        int rc = 0;
+        ENTRY;
+
+        /* no fast path if ll_region_mapped() flags any of the user buffers */
+        for (seg = 0; seg < nr_segs; seg++) {
+                const struct iovec *iv = &iov[seg];
+                if (ll_region_mapped((unsigned long)iv->iov_base, iv->iov_len))
+                        GOTO(out, rc);
+        }
+
+        page = find_lock_page(file->f_dentry->d_inode->i_mapping,
+                              ppos >> CFS_PAGE_SHIFT);
+        if (page) {
+                if (ll_reget_short_lock(page, rw, ppos, end, cookie))
+                        rc = 1;
+                unlock_page(page);
+                page_cache_release(page);
+        }
+
+out:
+        RETURN(rc);
+}
+
+static inline void ll_file_put_fast_lock(struct inode *inode, loff_t end,
+                                         void *cookie, int rw)
+{       /* thin wrapper: hands the cookie to ll_release_short_lock() */
+        ll_release_short_lock(inode, end, cookie, rw);
+}
+
+enum ll_lock_style { /* result of ll_file_get_lock() */
+        LL_LOCK_STYLE_NOLOCK   = 0, /* tree lock call reported "no lock" */
+        LL_LOCK_STYLE_FASTLOCK = 1, /* ll_file_get_fast_lock() succeeded */
+        LL_LOCK_STYLE_TREELOCK = 2  /* lock taken through the lock tree */
+};
+
+/* Pick a locking style for the I/O: fast lock first, then the lock tree.
+ * Returns an ll_lock_style value, or the tree lock's non-0/1 rc (-errno). */
+static inline int ll_file_get_lock(struct file *file, loff_t ppos, loff_t end,
+                                   const struct iovec *iov, unsigned long nr_segs,
+                                   void **cookie, struct ll_lock_tree *tree,
+                                   int rw)
+{
+        int tree_rc;
+
+        ENTRY;
+
+        if (ll_file_get_fast_lock(file, ppos, end, iov, nr_segs, cookie, rw))
+                RETURN(LL_LOCK_STYLE_FASTLOCK);
+
+        /* tree_rc: 1 for tree lock, 0 for no lock, <0 for error */
+        tree_rc = ll_file_get_tree_lock_iov(tree, file, iov, nr_segs,
+                                            ppos, end, rw);
+        if (tree_rc == 1)
+                RETURN(LL_LOCK_STYLE_TREELOCK);
+        if (tree_rc == 0)
+                RETURN(LL_LOCK_STYLE_NOLOCK);
+
+        /* neither 0 nor 1: hand the value (-errno on error) to the caller */
+        RETURN(tree_rc);
+}
+
+/* Release the lock that ll_file_get_lock() reported via @lock_style. */
+static inline void ll_file_put_lock(struct inode *inode, loff_t end,
+                                    enum ll_lock_style lock_style,
+                                    void *cookie, struct ll_lock_tree *tree,
+                                    int rw)
+{
+        if (lock_style == LL_LOCK_STYLE_TREELOCK) {
+                ll_tree_unlock(tree);
+                return;
+        }
+        if (lock_style == LL_LOCK_STYLE_FASTLOCK) {
+                ll_file_put_fast_lock(inode, end, cookie, rw);
+                return;
+        }
+        CERROR("invalid locking style (%d)\n", lock_style);
+}
+
 #ifdef HAVE_FILE_READV
 static ssize_t ll_file_readv(struct file *file, const struct iovec *iov,
                               unsigned long nr_segs, loff_t *ppos)
@@ -1281,11 +1407,12 @@ static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
         int ra = 0;
         loff_t end;
         ssize_t retval, chunk, sum = 0;
-        int tree_locked;
+        int lock_style;
         struct iovec *iov_copy = NULL;
         unsigned long nrsegs_copy, nrsegs_orig = 0;
         size_t count, iov_offset = 0;
         __u64 kms;
+        void *cookie;
         ENTRY;
 
         count = ll_file_get_iov_count(iov, &nr_segs);
@@ -1334,6 +1461,7 @@ static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
                         RETURN(-EFAULT);
                 RETURN(sum);
         }
+
 repeat:
         if (sbi->ll_max_rw_chunk != 0) {
                 /* first, let's know the end of the current stripe */
@@ -1373,10 +1501,10 @@ repeat:
                 nrsegs_copy = nr_segs;
         }
 
-        tree_locked = ll_file_get_tree_lock_iov(&tree, file, iov_copy,
-                                                nrsegs_copy, *ppos, end, READ);
-        if (tree_locked < 0)
-                GOTO(out, retval = tree_locked);
+        lock_style = ll_file_get_lock(file, *ppos, end, iov_copy, nrsegs_copy,
+                                      &cookie, &tree, READ);
+        if (lock_style < 0)
+                GOTO(out, retval = lock_style);
 
         ll_inode_size_lock(inode, 1);
         /*
@@ -1407,8 +1535,9 @@ repeat:
                 ll_inode_size_unlock(inode, 1);
                 retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED);
                 if (retval) {
-                        if (tree_locked)
-                                ll_tree_unlock(&tree);
+                        if (lock_style != LL_LOCK_STYLE_NOLOCK)
+                                ll_file_put_lock(inode, end, lock_style,
+                                                 cookie, &tree, READ);
                         goto out;
                 }
         } else {
@@ -1428,7 +1557,7 @@ repeat:
                inode->i_ino, chunk, *ppos, i_size_read(inode));
 
         /* turn off the kernel's read-ahead */
-        if (tree_locked) {
+        if (lock_style != LL_LOCK_STYLE_NOLOCK) {
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
                 file->f_ramax = 0;
 #else
@@ -1450,7 +1579,7 @@ repeat:
                 retval = generic_file_aio_read(iocb, iov_copy, nrsegs_copy,
                                                *ppos);
 #endif
-                ll_tree_unlock(&tree);
+                ll_file_put_lock(inode, end, lock_style, cookie, &tree, READ);
         } else {
                 retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy, ppos,
                                              READ, chunk);
diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h
index 65ab7f9c89..5605f8c5a0 100644
--- a/lustre/llite/llite_internal.h
+++ b/lustre/llite/llite_internal.h
@@ -603,6 +603,7 @@ extern struct file_operations ll_file_operations_noflock;
 extern struct inode_operations ll_file_inode_operations;
 extern int ll_inode_revalidate_it(struct dentry *, struct lookup_intent *);
 extern int ll_have_md_lock(struct inode *inode, __u64 bits);
+int ll_region_mapped(unsigned long addr, size_t count);
 int ll_extent_lock(struct ll_file_data *, struct inode *,
                    struct lov_stripe_md *, int mode, ldlm_policy_data_t *,
                    struct lustre_handle *, int ast_flags);
diff --git a/lustre/llite/llite_mmap.c b/lustre/llite/llite_mmap.c
index 275ea3d915..55db9c336e 100644
--- a/lustre/llite/llite_mmap.c
+++ b/lustre/llite/llite_mmap.c
@@ -312,6 +312,11 @@ static struct vm_area_struct * our_vma(unsigned long addr, size_t count)
         RETURN(ret);
 }
 
+int ll_region_mapped(unsigned long addr, size_t count)
+{
+        return our_vma(addr, count) != NULL; /* 1 if our_vma() finds a vma */
+}
+
 int lt_get_mmap_locks(struct ll_lock_tree *tree,
                       unsigned long addr, size_t count)
 {
diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c
index a61c01da42..5ab5eedace 100644
--- a/lustre/lov/lov_obd.c
+++ b/lustre/lov/lov_obd.c
@@ -2771,6 +2771,47 @@ void lov_stripe_unlock(struct lov_stripe_md *md)
 }
 EXPORT_SYMBOL(lov_stripe_unlock);
 
+/* Forward a short-lock request to the target export of the page's stripe. */
+static int lov_reget_short_lock(struct obd_export *exp,
+                                struct lov_stripe_md *lsm, void **res, int rw,
+                                loff_t start, loff_t end, void **cookie)
+{
+        struct lov_async_page *lap = *res;
+        struct obd_export *tgt_exp;
+        loff_t sub_start, sub_end = start;
+
+        ENTRY;
+
+        /* ensure we don't cross stripe boundaries */
+        lov_extent_calc(exp, lsm, OBD_CALC_STRIPE_END, (obd_off *)&sub_end);
+        if (sub_end <= end)
+                RETURN(0);
+
+        /* map the region limits to the object limits */
+        lov_stripe_offset(lsm, start, lap->lap_stripe, &sub_start);
+        lov_stripe_offset(lsm, end, lap->lap_stripe, &sub_end);
+
+        tgt_exp = exp->exp_obd->u.lov.lov_tgts[
+                        lsm->lsm_oinfo[lap->lap_stripe]->loi_ost_idx]->ltd_exp;
+        RETURN(obd_reget_short_lock(tgt_exp, NULL, &lap->lap_sub_cookie,
+                                    rw, sub_start, sub_end, cookie));
+}
+
+/* Release a short lock on the target of the stripe computed from @end. */
+static int lov_release_short_lock(struct obd_export *exp,
+                                  struct lov_stripe_md *lsm, loff_t end,
+                                  void *cookie, int rw)
+{
+        struct obd_export *tgt_exp;
+        int stripe_idx;
+        ENTRY;
+
+        stripe_idx = lov_stripe_number(lsm, end);
+        tgt_exp = exp->exp_obd->u.lov.lov_tgts[
+                        lsm->lsm_oinfo[stripe_idx]->loi_ost_idx]->ltd_exp;
+        RETURN(obd_release_short_lock(tgt_exp, NULL, end, cookie, rw));
+}
+
 struct obd_ops lov_obd_ops = {
         .o_owner               = THIS_MODULE,
         .o_setup               = lov_setup,
@@ -2793,6 +2834,8 @@ struct obd_ops lov_obd_ops = {
         .o_brw                 = lov_brw,
         .o_brw_async           = lov_brw_async,
         .o_prep_async_page     = lov_prep_async_page,
+        .o_reget_short_lock    = lov_reget_short_lock,
+        .o_release_short_lock  = lov_release_short_lock,
         .o_queue_async_io      = lov_queue_async_io,
         .o_set_async_flags     = lov_set_async_flags,
         .o_queue_group_io      = lov_queue_group_io,
diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c
index ac9990fc74..305be436f9 100644
--- a/lustre/obdclass/lprocfs_status.c
+++ b/lustre/obdclass/lprocfs_status.c
@@ -1109,6 +1109,8 @@ void lprocfs_init_ops_stats(int num_private_stats, struct lprocfs_stats *stats)
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, brw);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, brw_async);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, prep_async_page);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, reget_short_lock);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, release_short_lock);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, queue_async_io);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, queue_group_io);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, trigger_group_io);
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index c96ca76752..b214be9e0a 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -2465,6 +2465,35 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
         RETURN(-EDQUOT);
 }
 
+/*
+ * Try to match the ldlm lock referenced by the async page passed in @res.
+ * oap_lock is held across the ldlm_lock_fast_match() call.
+ */
+static int osc_reget_short_lock(struct obd_export *exp,
+                                struct lov_stripe_md *lsm, void **res, int rw,
+                                loff_t start, loff_t end, void **cookie)
+{
+        struct osc_async_page *page_info = *res;
+        int matched;
+        ENTRY;
+
+        spin_lock(&page_info->oap_lock);
+        matched = ldlm_lock_fast_match(page_info->oap_ldlm_lock, rw, start,
+                                       end, cookie);
+        spin_unlock(&page_info->oap_lock);
+        RETURN(matched);
+}
+
+static int osc_release_short_lock(struct obd_export *exp,
+                                  struct lov_stripe_md *lsm, loff_t end,
+                                  void *cookie, int rw)
+{
+        ENTRY;
+        ldlm_lock_fast_release(cookie, rw);
+        /* ldlm_lock_fast_release() returns nothing: always report success */
+        RETURN(0);
+}
+
 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                         struct lov_oinfo *loi, cfs_page_t *page,
                         obd_off offset, struct obd_async_page_ops *ops,
@@ -2474,7 +2503,7 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         struct osc_async_page *oap;
         struct ldlm_res_id oid = {{0}};
         int rc = 0;
-        
+
         ENTRY;
 
         if (!page)
@@ -3923,6 +3952,8 @@ struct obd_ops osc_obd_ops = {
         .o_brw                  = osc_brw,
         .o_brw_async            = osc_brw_async,
         .o_prep_async_page      = osc_prep_async_page,
+        .o_reget_short_lock     = osc_reget_short_lock,
+        .o_release_short_lock   = osc_release_short_lock,
         .o_queue_async_io       = osc_queue_async_io,
         .o_set_async_flags      = osc_set_async_flags,
         .o_queue_group_io       = osc_queue_group_io,
-- 
GitLab