From 019fc52b288dad0072ca4f077a14e033903ffa55 Mon Sep 17 00:00:00 2001
From: green <green>
Date: Thu, 6 Dec 2007 00:58:18 +0000
Subject: [PATCH] b=10718 r=vitaly,shadow,adilger (some early versions)

Attach cached pages to the locks that protect them. This makes it possible to
quickly find those pages for flushing and removal when an AST is received, and
significantly speeds up lock revocation on big files with pages at high
offsets.
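
For context, a minimal stand-alone sketch of the idea follows. The types and
helpers below (sketch_page, sketch_lock, sketch_attach_page, sketch_cancel_lock)
are hypothetical illustrations only, not the structures this patch adds: each
lock keeps a list of the cached pages it covers, so cancellation walks just
that list instead of probing every page index under the lock's extent, which
is what made revocation slow for files with pages at high offsets. In the
patch itself the list lives on struct ldlm_lock (l_extents_list) and is
maintained by the new lustre/osc/cache.c.

#include <stdio.h>

struct sketch_page {
        unsigned long       index;      /* page index within the file */
        int                 dirty;
        struct sketch_page *next;       /* link in the owning lock's list */
};

struct sketch_lock {
        struct sketch_page *pages;      /* pages cached under this lock */
};

/* Attach a page to the lock covering it when the page enters the cache. */
static void sketch_attach_page(struct sketch_lock *lock, struct sketch_page *pg)
{
        pg->next = lock->pages;
        lock->pages = pg;
}

/* On AST reception, visit exactly the attached pages: write back dirty ones
 * unless discarding, then drop them from the cache. */
static void sketch_cancel_lock(struct sketch_lock *lock, int discard)
{
        struct sketch_page *pg;

        for (pg = lock->pages; pg != NULL; pg = pg->next) {
                if (pg->dirty && !discard)
                        printf("writeback page %lu\n", pg->index);
                printf("drop page %lu\n", pg->index);
        }
        lock->pages = NULL;
}

int main(void)
{
        struct sketch_lock lock = { .pages = NULL };
        struct sketch_page a = { .index = 1048576, .dirty = 1, .next = NULL };
        struct sketch_page b = { .index = 1048577, .dirty = 0, .next = NULL };

        sketch_attach_page(&lock, &a);
        sketch_attach_page(&lock, &b);
        sketch_cancel_lock(&lock, 0 /* 0 = write dirty pages, do not discard */);
        return 0;
}
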
---
 lustre/ChangeLog                 |   7 +
 lustre/include/Makefile.am       |   2 +-
 lustre/include/lustre_cache.h    |  53 ++++
 lustre/include/lustre_dlm.h      |   4 +
 lustre/include/obd.h             |  27 ++-
 lustre/include/obd_class.h       |  59 ++++-
 lustre/include/obd_ost.h         |   4 +
 lustre/include/obd_support.h     |   1 +
 lustre/ldlm/ldlm_lock.c          |   4 +
 lustre/liblustre/rw.c            |   7 +-
 lustre/llite/file.c              | 291 ++++++++--------------
 lustre/llite/llite_close.c       |   1 +
 lustre/llite/llite_internal.h    |   5 +
 lustre/llite/llite_lib.c         |  45 +++-
 lustre/llite/rw.c                | 103 ++++----
 lustre/lov/lov_obd.c             | 138 ++++++++++-
 lustre/obdclass/lprocfs_status.c |   4 +
 lustre/obdecho/echo_client.c     |   2 +-
 lustre/osc/Makefile.in           |   2 +-
 lustre/osc/autoMakefile.am       |   2 +-
 lustre/osc/cache.c               | 398 +++++++++++++++++++++++++++++++
 lustre/osc/osc_internal.h        |   5 +-
 lustre/osc/osc_request.c         | 125 +++++++++-
 lustre/tests/conf-sanity.sh      |  14 +-
 lustre/tests/sanity.sh           |  18 +-
 25 files changed, 1045 insertions(+), 276 deletions(-)
 create mode 100644 lustre/include/lustre_cache.h
 create mode 100644 lustre/osc/cache.c

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index 0b7e5a87ba..5edee76894 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -133,6 +133,13 @@ Details    : Implementation of NFS-like root squash capability. Specifically,
              don't allow someone with root access on a client node to be able
              to manipulate files owned by root on a server node.
 
+Severity   : normal
+Bugzilla   : 10718
+Description: Slow truncate/writes to huge files at high offsets.
+Details    : Directly associate cached pages with the locks protecting them,
+             which lets us quickly find which pages to write out and remove
+             once a lock cancellation callback is received.
+
 --------------------------------------------------------------------------------
 
 2007-10-26         Cluster File Systems, Inc. <info@clusterfs.com>
diff --git a/lustre/include/Makefile.am b/lustre/include/Makefile.am
index d87d606bf9..2608a4c9fa 100644
--- a/lustre/include/Makefile.am
+++ b/lustre/include/Makefile.am
@@ -13,5 +13,5 @@ EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h 	\
 	     lustre_lite.h lustre_log.h lustre_mds.h lustre_net.h \
              lustre_param.h lustre_quota.h lustre_ucache.h lvfs.h \
 	     obd_cache.h obd_class.h obd_echo.h obd.h obd_lov.h \
-	     obd_ost.h obd_support.h lustre_ver.h
+	     obd_ost.h obd_support.h lustre_cache.h lustre_ver.h
 
diff --git a/lustre/include/lustre_cache.h b/lustre/include/lustre_cache.h
new file mode 100644
index 0000000000..291d882936
--- /dev/null
+++ b/lustre/include/lustre_cache.h
@@ -0,0 +1,53 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+
+#ifndef LUSTRE_CACHE_H
+#define LUSTRE_CACHE_H
+#include <obd.h>
+#include <lustre/lustre_idl.h>
+#include <lustre_dlm.h>
+
+struct lustre_cache;
+struct osc_async_page;
+struct page_removal_cb_element {
+        struct list_head        prce_list;
+        obd_page_removal_cb_t   prce_callback;
+};
+
+typedef int (*cache_iterate_extents_cb_t)(struct lustre_cache *,
+                                          struct lustre_handle *,
+                                          struct osc_async_page *,
+                                          void *);
+typedef int (*cache_iterate_locks_cb_t)(struct lustre_cache *,
+                                        struct ldlm_res_id *,
+                                        struct lustre_handle *, void *);
+
+struct lustre_cache {
+        struct list_head         lc_locks_list;
+        spinlock_t               lc_locks_list_lock;
+        struct list_head         lc_page_removal_callback_list;
+        struct obd_device       *lc_obd;
+        obd_pin_extent_cb        lc_pin_extent_cb;
+};
+
+int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh);
+int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
+                     struct osc_async_page *extent,
+                     struct lustre_handle *lockh);
+void cache_remove_extent(struct lustre_cache *, struct osc_async_page *);
+int cache_add_extent_removal_cb(struct lustre_cache *cache,
+                                obd_page_removal_cb_t func_cb,
+                                obd_pin_extent_cb pin_cb);
+int cache_del_extent_removal_cb(struct lustre_cache *cache,
+                                obd_page_removal_cb_t func_cb);
+int cache_iterate_extents(struct lustre_cache *cache, struct lustre_handle *lockh,
+                          cache_iterate_extents_cb_t cb_func, void *data);
+int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh);
+int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
+                        cache_iterate_locks_cb_t cb_fun, void *data);
+struct lustre_cache *cache_create(struct obd_device *obd);
+int cache_destroy(struct lustre_cache *cache);
+
+
+#endif /* LUSTRE_CACHE_H */
diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index e965106cb0..7938c855dc 100644
--- a/lustre/include/lustre_dlm.h
+++ b/lustre/include/lustre_dlm.h
@@ -446,6 +446,10 @@ struct ldlm_lock {
         void                 *l_lvb_data;       /* an LVB received during */
         void                 *l_lvb_swabber;    /* an enqueue */
         void                 *l_ast_data;
+        spinlock_t            l_extents_list_lock;
+        struct list_head      l_extents_list;
+
+        struct list_head      l_cache_locks_list;
 
         /* Server-side-only members */
 
diff --git a/lustre/include/obd.h b/lustre/include/obd.h
index 9f2f444402..8c708819f8 100644
--- a/lustre/include/obd.h
+++ b/lustre/include/obd.h
@@ -240,6 +240,11 @@ struct obd_device_target {
         struct lustre_quota_ctxt  obt_qctxt;
 };
 
+typedef void (*obd_pin_extent_cb)(void *data);
+typedef int (*obd_page_removal_cb_t)(void *data, int discard);
+typedef int (*obd_lock_cancel_cb)(struct ldlm_lock *, struct ldlm_lock_desc *,
+                                  void *, int);
+
 #define FILTER_GROUP_LLOG 1
 #define FILTER_GROUP_ECHO 2
 
@@ -324,6 +329,7 @@ struct filter_obd {
 
 struct mdc_rpc_lock;
 struct obd_import;
+struct lustre_cache;
 struct client_obd {
         struct semaphore         cl_sem;
         struct obd_uuid          cl_target_uuid;
@@ -401,8 +407,10 @@ struct client_obd {
 
         /* used by quotacheck */
         int                      cl_qchk_stat; /* quotacheck stat of the peer */
-
         atomic_t                 cl_resends; /* resend count */
+        /* Cache of triples */
+        struct lustre_cache     *cl_cache;
+        obd_lock_cancel_cb       cl_ext_lock_cancel_cb;
 };
 #define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
 
@@ -553,6 +561,9 @@ struct lov_obd {
         __u32                   lov_offset_idx; /* aliasing for start_idx  */
         int                     lov_start_count;/* reseed counter */
         int                     lov_connects;
+        obd_page_removal_cb_t   lov_page_removal_cb;
+        obd_pin_extent_cb       lov_page_pin_cb;
+        obd_lock_cancel_cb      lov_lock_cancel_cb;
 };
 
 struct niobuf_local {
@@ -903,7 +914,8 @@ struct obd_ops {
                                  struct lov_oinfo *loi,
                                  cfs_page_t *page, obd_off offset, 
                                  struct obd_async_page_ops *ops, void *data,
-                                 void **res);
+                                 void **res, int nocache,
+                                 struct lustre_handle *lockh);
         int (*o_queue_async_io)(struct obd_export *exp,
                                 struct lov_stripe_md *lsm,
                                 struct lov_oinfo *loi, void *cookie,
@@ -995,6 +1007,17 @@ struct obd_ops {
         int (*o_quotactl)(struct obd_export *, struct obd_quotactl *);
 
         int (*o_ping)(struct obd_export *exp);
+
+        int (*o_register_page_removal_cb)(struct obd_export *exp,
+                                          obd_page_removal_cb_t cb,
+                                          obd_pin_extent_cb pin_cb);
+        int (*o_unregister_page_removal_cb)(struct obd_export *exp,
+                                            obd_page_removal_cb_t cb);
+        int (*o_register_lock_cancel_cb)(struct obd_export *exp,
+                                         obd_lock_cancel_cb cb);
+        int (*o_unregister_lock_cancel_cb)(struct obd_export *exp,
+                                           obd_lock_cancel_cb cb);
+
         /*
          * NOTE: If adding ops, add another LPROCFS_OBD_OP_INIT() line
          * to lprocfs_alloc_obd_stats() in obdclass/lprocfs_status.c.
diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h
index 06b21e33e8..57edb95484 100644
--- a/lustre/include/obd_class.h
+++ b/lustre/include/obd_class.h
@@ -952,7 +952,8 @@ static inline  int obd_prep_async_page(struct obd_export *exp,
                                        struct lov_oinfo *loi,
                                        cfs_page_t *page, obd_off offset,
                                        struct obd_async_page_ops *ops,
-                                       void *data, void **res)
+                                       void *data, void **res, int nocache,
+                                       struct lustre_handle *lockh)
 {
         int ret;
         ENTRY;
@@ -961,7 +962,8 @@ static inline  int obd_prep_async_page(struct obd_export *exp,
         EXP_COUNTER_INCREMENT(exp, prep_async_page);
 
         ret = OBP(exp->exp_obd, prep_async_page)(exp, lsm, loi, page, offset,
-                                                 ops, data, res);
+                                                 ops, data, res, nocache,
+                                                 lockh);
         RETURN(ret);
 }
 
@@ -1384,6 +1386,59 @@ static inline int obd_register_observer(struct obd_device *obd,
         RETURN(0);
 }
 
+static inline int obd_register_page_removal_cb(struct obd_export *exp,
+                                               obd_page_removal_cb_t cb,
+                                               obd_pin_extent_cb pin_cb)
+{
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_OP(exp->exp_obd, register_page_removal_cb, 0);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, register_page_removal_cb);
+
+        rc = OBP(exp->exp_obd, register_page_removal_cb)(exp, cb, pin_cb);
+        RETURN(rc);
+}
+
+static inline int obd_unregister_page_removal_cb(struct obd_export *exp,
+                                                 obd_page_removal_cb_t cb)
+{
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_OP(exp->exp_obd, unregister_page_removal_cb, 0);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_page_removal_cb);
+
+        rc = OBP(exp->exp_obd, unregister_page_removal_cb)(exp, cb);
+        RETURN(rc);
+}
+
+static inline int obd_register_lock_cancel_cb(struct obd_export *exp,
+                                              obd_lock_cancel_cb cb)
+{
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_OP(exp->exp_obd, register_lock_cancel_cb, 0);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, register_lock_cancel_cb);
+
+        rc = OBP(exp->exp_obd, register_lock_cancel_cb)(exp, cb);
+        RETURN(rc);
+}
+
+static inline int obd_unregister_lock_cancel_cb(struct obd_export *exp,
+                                                 obd_lock_cancel_cb cb)
+{
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_OP(exp->exp_obd, unregister_lock_cancel_cb, 0);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_lock_cancel_cb);
+
+        rc = OBP(exp->exp_obd, unregister_lock_cancel_cb)(exp, cb);
+        RETURN(rc);
+}
+
 /* OBD Metadata Support */
 
 extern int obd_init_caches(void);
diff --git a/lustre/include/obd_ost.h b/lustre/include/obd_ost.h
index 8dcc9c283f..d2b399e0ce 100644
--- a/lustre/include/obd_ost.h
+++ b/lustre/include/obd_ost.h
@@ -34,4 +34,8 @@ struct osc_enqueue_args {
         struct ldlm_enqueue_info*oa_ei;
 };
 
+int osc_extent_blocking_cb(struct ldlm_lock *lock,
+                           struct ldlm_lock_desc *new, void *data,
+                           int flag);
+
 #endif
diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h
index 50738b15f6..963e0fdf95 100644
--- a/lustre/include/obd_support.h
+++ b/lustre/include/obd_support.h
@@ -201,6 +201,7 @@ extern unsigned int obd_alloc_fail_rate;
 #define OBD_FAIL_LDLM_CANCEL_RACE        0x310
 #define OBD_FAIL_LDLM_CANCEL_EVICT_RACE  0x311
 #define OBD_FAIL_LDLM_PAUSE_CANCEL       0x312
+#define OBD_FAIL_LDLM_CLOSE_THREAD       0x313
 
 #define OBD_FAIL_OSC                     0x400
 #define OBD_FAIL_OSC_BRW_READ_BULK       0x401
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index e191e4ba29..62603023c1 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -348,6 +348,10 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
         CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
         class_handle_hash(&lock->l_handle, lock_handle_addref);
 
+        CFS_INIT_LIST_HEAD(&lock->l_extents_list);
+        spin_lock_init(&lock->l_extents_list_lock);
+        CFS_INIT_LIST_HEAD(&lock->l_cache_locks_list);
+
         RETURN(lock);
 }
 
diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c
index 76d6c3de35..f164d29f63 100644
--- a/lustre/liblustre/rw.c
+++ b/lustre/liblustre/rw.c
@@ -486,7 +486,9 @@ static int llu_queue_pio(int cmd, struct llu_io_group *group,
                 rc = obd_prep_async_page(exp, lsm, NULL, page,
                                          (obd_off)page->index << CFS_PAGE_SHIFT,
                                          &llu_async_page_ops,
-                                         llap, &llap->llap_cookie);
+                                         llap, &llap->llap_cookie,
+                                         1 /* no cache in liblustre at all */,
+                                         NULL);
                 if (rc) {
                         LASSERT(rc < 0);
                         llap->llap_cookie = NULL;
@@ -557,7 +559,8 @@ struct llu_io_group * get_io_group(struct inode *inode, int maxpages,
         if (!llap_cookie_size)
                 llap_cookie_size = obd_prep_async_page(llu_i2obdexp(inode),
                                                        NULL, NULL, NULL, 0,
-                                                       NULL, NULL, NULL);
+                                                       NULL, NULL, NULL, 0,
+                                                       NULL);
 
         OBD_ALLOC(group, LLU_IO_GROUP_SIZE(maxpages));
         if (!group)
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index c0d6f51192..a5613aa760 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c
@@ -631,164 +631,95 @@ check:
         RETURN(stripe);
 }
 
-/* Flush the page cache for an extent as its canceled.  When we're on an LOV,
- * we get a lock cancellation for each stripe, so we have to map the obd's
- * region back onto the stripes in the file that it held.
+/* Get an extra page reference to ensure the page is not going away */
+void ll_pin_extent_cb(void *data)
+{
+        struct page *page = data;
+
+        page_cache_get(page);
+
+        return;
+}
+/* Flush the page from the page cache for an extent as it is canceled.
+ * The page to remove is delivered as @data.
  *
- * No one can dirty the extent until we've finished our work and they can
+ * No one can dirty the extent until we've finished our work and they cannot
  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
  * but other kernel actors could have pages locked.
  *
+ * If @discard is set, there is no need to write the page if it is dirty.
+ *
  * Called with the DLM lock held. */
-void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
-                              struct ldlm_lock *lock, __u32 stripe)
+int ll_page_removal_cb(void *data, int discard)
 {
-        ldlm_policy_data_t tmpex;
-        unsigned long start, end, count, skip, i, j;
-        struct page *page;
-        int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
-        struct lustre_handle lockh;
-        struct address_space *mapping = inode->i_mapping;
+        int rc;
+        struct page *page = data;
+        struct address_space *mapping;
 
         ENTRY;
-        tmpex = lock->l_policy_data;
-        CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
-               inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
-               i_size_read(inode));
-
-        /* our locks are page granular thanks to osc_enqueue, we invalidate the
-         * whole page. */
-        if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
-            ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
-                LDLM_ERROR(lock, "lock not aligned on CFS_PAGE_SIZE %lu", CFS_PAGE_SIZE);
-        LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
-        LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
-
-        count = ~0;
-        skip = 0;
-        start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
-        end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
-        if (lsm->lsm_stripe_count > 1) {
-                count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
-                skip = (lsm->lsm_stripe_count - 1) * count;
-                start += start/count * skip + stripe * count;
-                if (end != ~0)
-                        end += end/count * skip + stripe * count;
-        }
-        if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
-                end = ~0;
-
-        i = i_size_read(inode) ? (i_size_read(inode) - 1) >> CFS_PAGE_SHIFT : 0;
-        if (i < end)
-                end = i;
-
-        CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
-               "count: %lu skip: %lu end: %lu%s\n", start, start % count,
-               count, skip, end, discard ? " (DISCARDING)" : "");
-
-        /* walk through the vmas on the inode and tear down mmaped pages that
-         * intersect with the lock.  this stops immediately if there are no
-         * mmap()ed regions of the file.  This is not efficient at all and
-         * should be short lived. We'll associate mmap()ed pages with the lock
-         * and will be able to find them directly */
-        for (i = start; i <= end; i += (j + skip)) {
-                j = min(count - (i % count), end - i + 1);
-                LASSERT(j > 0);
-                LASSERT(mapping);
-                if (ll_teardown_mmaps(mapping,
-                                      (__u64)i << CFS_PAGE_SHIFT,
-                                      ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
-                        break;
-        }
 
-        /* this is the simplistic implementation of page eviction at
-         * cancelation.  It is careful to get races with other page
-         * lockers handled correctly.  fixes from bug 20 will make it
-         * more efficient by associating locks with pages and with
-         * batching writeback under the lock explicitly. */
-        for (i = start, j = start % count; i <= end;
-             j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
-                if (j == count) {
-                        CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
-                        i += skip;
-                        j = 0;
-                        if (i > end)
-                                break;
-                }
-                LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
-                         LPU64" >= "LPU64" start %lu i %lu end %lu\n",
-                         tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
-                         start, i, end);
+        /* We already have a page reference from ll_pin_extent_cb() */
+        lock_page(page);
 
-                if (!mapping_has_pages(mapping)) {
-                        CDEBUG(D_INODE|D_PAGE, "nothing left\n");
-                        break;
-                }
+        /* Already truncated by somebody */
+        if (!page->mapping)
+                GOTO(out, rc = 0);
 
-                cond_resched();
+        mapping = page->mapping;
 
-                page = find_get_page(mapping, i);
-                if (page == NULL)
-                        continue;
-                LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
-                               i, tmpex.l_extent.start);
-                lock_page(page);
+        ll_teardown_mmaps(mapping,
+                          (__u64)page->index << PAGE_CACHE_SHIFT,
+                          ((__u64)page->index << PAGE_CACHE_SHIFT) |
+                          ~PAGE_CACHE_MASK);
+        LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
 
-                /* page->mapping to check with racing against teardown */
-                if (!discard && clear_page_dirty_for_io(page)) {
-                        rc = ll_call_writepage(inode, page);
-                        /* either waiting for io to complete or reacquiring
-                         * the lock that the failed writepage released */
-                        lock_page(page);
-                        wait_on_page_writeback(page);
-                        if (rc != 0) {
-                                CERROR("writepage inode %lu(%p) of page %p "
-                                       "failed: %d\n", inode->i_ino, inode,
-                                       page, rc);
+        if (!discard && clear_page_dirty_for_io(page)) {
+                LASSERT(page->mapping);
+                rc = ll_call_writepage(page->mapping->host, page);
+                /* either waiting for io to complete or reacquiring
+                 * the lock that the failed writepage released */
+                lock_page(page);
+                wait_on_page_writeback(page);
+                if (rc != 0) {
+                        CERROR("writepage inode %lu(%p) of page %p "
+                               "failed: %d\n", mapping->host->i_ino,
+                               mapping->host, page, rc);
 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-                                if (rc == -ENOSPC)
-                                        set_bit(AS_ENOSPC, &mapping->flags);
-                                else
-                                        set_bit(AS_EIO, &mapping->flags);
+                        if (rc == -ENOSPC)
+                                set_bit(AS_ENOSPC, &mapping->flags);
+                        else
+                                set_bit(AS_EIO, &mapping->flags);
 #else
-                                mapping->gfp_mask |= AS_EIO_MASK;
+                        mapping->gfp_mask |= AS_EIO_MASK;
 #endif
-                        }
-                }
-
-                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
-                /* check to see if another DLM lock covers this page  b=2765 */
-                rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
-                                      LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
-                                      LDLM_FL_TEST_LOCK,
-                                      &lock->l_resource->lr_name, LDLM_EXTENT,
-                                      &tmpex, LCK_PR | LCK_PW, &lockh);
-                if (rc2 == 0 && page->mapping != NULL) {
-                        struct ll_async_page *llap = llap_cast_private(page);
-                        // checking again to account for writeback's lock_page()
-                        LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
-                        if (llap)
-                                ll_ra_accounting(llap, mapping);
-                        ll_truncate_complete_page(page);
                 }
-                unlock_page(page);
-                page_cache_release(page);
-        }
-        LASSERTF(tmpex.l_extent.start <=
-                 (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
-                  lock->l_policy_data.l_extent.end + 1),
-                 "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
-                 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
-                 start, i, end);
+        }
+        if (page->mapping != NULL) {
+                struct ll_async_page *llap = llap_cast_private(page);
+                // checking again to account for writeback's lock_page()
+                LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
+                if (llap)
+                        ll_ra_accounting(llap, page->mapping);
+                ll_truncate_complete_page(page);
+        }
         EXIT;
+out:
+        LASSERT(!PageWriteback(page));
+        unlock_page(page);
+        page_cache_release(page);
+
+        return 0;
 }
 
-static int ll_extent_lock_callback(struct ldlm_lock *lock,
-                                   struct ldlm_lock_desc *new, void *data,
-                                   int flag)
+int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
+                             void *data, int flag)
 {
-        struct lustre_handle lockh = { 0 };
-        int rc;
+        struct inode *inode;
+        struct ll_inode_info *lli;
+        struct lov_stripe_md *lsm;
+        int stripe;
+        __u64 kms;
+
         ENTRY;
 
         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
@@ -796,61 +727,37 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                 LBUG();
         }
 
-        switch (flag) {
-        case LDLM_CB_BLOCKING:
-                ldlm_lock2handle(lock, &lockh);
-                rc = ldlm_cli_cancel(&lockh);
-                if (rc != ELDLM_OK)
-                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
-                break;
-        case LDLM_CB_CANCELING: {
-                struct inode *inode;
-                struct ll_inode_info *lli;
-                struct lov_stripe_md *lsm;
-                int stripe;
-                __u64 kms;
-
-                /* This lock wasn't granted, don't try to evict pages */
-                if (lock->l_req_mode != lock->l_granted_mode)
-                        RETURN(0);
-
-                inode = ll_inode_from_lock(lock);
-                if (inode == NULL)
-                        RETURN(0);
-                lli = ll_i2info(inode);
-                if (lli == NULL)
-                        goto iput;
-                if (lli->lli_smd == NULL)
-                        goto iput;
-                lsm = lli->lli_smd;
-
-                stripe = ll_lock_to_stripe_offset(inode, lock);
-                if (stripe < 0)
-                        goto iput;
-
-                ll_pgcache_remove_extent(inode, lsm, lock, stripe);
+        inode = ll_inode_from_lock(lock);
+        if (inode == NULL)
+                RETURN(0);
+        lli = ll_i2info(inode);
+        if (lli == NULL)
+                GOTO(iput, 0);
+        if (lli->lli_smd == NULL)
+                GOTO(iput, 0);
+        lsm = lli->lli_smd;
 
-                lov_stripe_lock(lsm);
-                lock_res_and_lock(lock);
-                kms = ldlm_extent_shift_kms(lock,
-                                            lsm->lsm_oinfo[stripe]->loi_kms);
+        stripe = ll_lock_to_stripe_offset(inode, lock);
+        if (stripe < 0)
+                GOTO(iput, 0);
 
-                if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
-                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
-                                   lsm->lsm_oinfo[stripe]->loi_kms, kms);
-                lsm->lsm_oinfo[stripe]->loi_kms = kms;
-                unlock_res_and_lock(lock);
-                lov_stripe_unlock(lsm);
-                ll_try_done_writing(inode);
-        iput:
-                iput(inode);
-                break;
-        }
-        default:
-                LBUG();
-        }
+        lov_stripe_lock(lsm);
+        lock_res_and_lock(lock);
+        kms = ldlm_extent_shift_kms(lock,
+                                    lsm->lsm_oinfo[stripe]->loi_kms);
+
+        if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
+                LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
+                           lsm->lsm_oinfo[stripe]->loi_kms, kms);
+        lsm->lsm_oinfo[stripe]->loi_kms = kms;
+        unlock_res_and_lock(lock);
+        lov_stripe_unlock(lsm);
+        ll_try_done_writing(inode);
+        EXIT;
+iput:
+        iput(inode);
 
-        RETURN(0);
+        return 0;
 }
 
 #if 0
@@ -973,7 +880,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
         
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = NULL;
@@ -1034,7 +941,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
          *       acquired only if there were no conflicting locks. */
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = inode;
@@ -1097,7 +1004,7 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = mode;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = inode;
diff --git a/lustre/llite/llite_close.c b/lustre/llite/llite_close.c
index 10a98fab8c..d36f2d6f8e 100644
--- a/lustre/llite/llite_close.c
+++ b/lustre/llite/llite_close.c
@@ -236,6 +236,7 @@ int ll_close_thread_start(struct ll_close_queue **lcq_ret)
         struct ll_close_queue *lcq;
         pid_t pid;
 
+        OBD_FAIL_RETURN(OBD_FAIL_LDLM_CLOSE_THREAD, -EINTR);
         OBD_ALLOC(lcq, sizeof(*lcq));
         if (lcq == NULL)
                 return -ENOMEM;
diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h
index 9f9c436a4e..88e93333d1 100644
--- a/lustre/llite/llite_internal.h
+++ b/lustre/llite/llite_internal.h
@@ -406,6 +406,7 @@ struct ll_async_page {
                          llap_origin:3,
                          llap_ra_used:1,
                          llap_ignore_quota:1,
+                         llap_nocache:1,
                          llap_lockless_io_page:1;
         void            *llap_cookie;
         struct page     *llap_page;
@@ -508,6 +509,10 @@ int ll_prepare_mdc_op_data(struct mdc_op_data *,
 struct lookup_intent *ll_convert_intent(struct open_intent *oit,
                                         int lookup_flags);
 #endif
+void ll_pin_extent_cb(void *data);
+int ll_page_removal_cb(void *data, int discard);
+int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
+                             void *data, int flag);
 int lookup_it_finish(struct ptlrpc_request *request, int offset,
                      struct lookup_intent *it, void *data);
 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c
index fdaf011332..66dc80f3f5 100644
--- a/lustre/llite/llite_lib.c
+++ b/lustre/llite/llite_lib.c
@@ -34,6 +34,7 @@
 #include <lprocfs_status.h>
 #include <lustre_disk.h>
 #include <lustre_param.h>
+#include <lustre_cache.h>
 #include "llite_internal.h"
 
 cfs_mem_cache_t *ll_file_data_slab;
@@ -287,7 +288,6 @@ static int client_common_fill_super(struct super_block *sb,
         obd->obd_upcall.onu_upcall = ll_ocd_update;
         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
 
-
         err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, data, NULL);
         if (err == -EBUSY) {
                 LCONSOLE_ERROR_MSG(0x150, "An OST (osc %s) is performing "
@@ -304,15 +304,33 @@ static int client_common_fill_super(struct super_block *sb,
         sbi->ll_lco.lco_flags = data->ocd_connect_flags;
         spin_unlock(&sbi->ll_lco.lco_lock);
 
-        mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
+        err = obd_register_page_removal_cb(sbi->ll_osc_exp,
+                                           ll_page_removal_cb, 
+                                           ll_pin_extent_cb);
+        if (err) {
+                CERROR("cannot register page removal callback: rc = %d\n",err);
+                GOTO(out_osc, err);
+        }
+        err = obd_register_lock_cancel_cb(sbi->ll_osc_exp,
+                                          ll_extent_lock_cancel_cb);
+        if (err) {
+                CERROR("cannot register lock cancel callback: rc = %d\n", err);
+                GOTO(out_page_rm_cb, err);
+        }
+
+        err = mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
+        if (err) {
+                CERROR("cannot set max EA and cookie sizes: rc = %d\n", err);
+                GOTO(out_lock_cn_cb, err);
+        }
 
         err = obd_prep_async_page(sbi->ll_osc_exp, NULL, NULL, NULL,
-                                  0, NULL, NULL, NULL);
+                                  0, NULL, NULL, NULL, 0, NULL);
         if (err < 0) {
                 LCONSOLE_ERROR_MSG(0x151, "There are no OST's in this "
                                    "filesystem. There must be at least one "
                                    "active OST for a client to start.\n");
-                GOTO(out_osc, err);
+                GOTO(out_lock_cn_cb, err);
         }
 
         if (!ll_async_page_slab) {
@@ -322,13 +340,13 @@ static int client_common_fill_super(struct super_block *sb,
                                                           ll_async_page_slab_size,
                                                           0, 0);
                 if (!ll_async_page_slab)
-                        GOTO(out_osc, -ENOMEM);
+                        GOTO(out_lock_cn_cb, -ENOMEM);
         }
 
         err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid);
         if (err) {
                 CERROR("cannot mds_connect: rc = %d\n", err);
-                GOTO(out_osc, err);
+                GOTO(out_lock_cn_cb, err);
         }
         CDEBUG(D_SUPER, "rootfid "LPU64"\n", rootfid.id);
         sbi->ll_rootino = rootfid.id;
@@ -346,14 +364,14 @@ static int client_common_fill_super(struct super_block *sb,
                           0, &request);
         if (err) {
                 CERROR("mdc_getattr failed for root: rc = %d\n", err);
-                GOTO(out_osc, err);
+                GOTO(out_lock_cn_cb, err);
         }
 
         err = mdc_req2lustre_md(request, REPLY_REC_OFF, sbi->ll_osc_exp, &md);
         if (err) {
                 CERROR("failed to understand root inode md: rc = %d\n",err);
                 ptlrpc_req_finished (request);
-                GOTO(out_osc, err);
+                GOTO(out_lock_cn_cb, err);
         }
 
         LASSERT(sbi->ll_rootino != 0);
@@ -396,6 +414,12 @@ static int client_common_fill_super(struct super_block *sb,
 out_root:
         if (root)
                 iput(root);
+out_lock_cn_cb:
+        obd_unregister_lock_cancel_cb(sbi->ll_osc_exp,
+                                      ll_extent_lock_cancel_cb);
+out_page_rm_cb:
+        obd_unregister_page_removal_cb(sbi->ll_osc_exp,
+                                       ll_page_removal_cb);
 out_osc:
         obd_disconnect(sbi->ll_osc_exp);
         sbi->ll_osc_exp = NULL;
@@ -595,6 +619,11 @@ void client_common_put_super(struct super_block *sb)
         prune_deathrow(sbi, 0);
 
         list_del(&sbi->ll_conn_chain);
+
+        obd_unregister_page_removal_cb(sbi->ll_osc_exp,
+                                       ll_page_removal_cb);
+        obd_unregister_lock_cancel_cb(sbi->ll_osc_exp,ll_extent_lock_cancel_cb);
+
         obd_disconnect(sbi->ll_osc_exp);
         sbi->ll_osc_exp = NULL;
 
diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c
index 22eafb86c2..0511a321c1 100644
--- a/lustre/llite/rw.c
+++ b/lustre/llite/rw.c
@@ -566,7 +566,9 @@ int llap_shrink_cache(struct ll_sb_info *sbi, int shrink_fraction)
         return count;
 }
 
-static struct ll_async_page *llap_from_page(struct page *page, unsigned origin)
+static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
+                                                       unsigned origin,
+                                                       struct lustre_handle *lockh)
 {
         struct ll_async_page *llap;
         struct obd_export *exp;
@@ -619,9 +621,14 @@ static struct ll_async_page *llap_from_page(struct page *page, unsigned origin)
         llap->llap_magic = LLAP_MAGIC;
         llap->llap_cookie = (void *)llap + size_round(sizeof(*llap));
 
+        /* XXX: for bug 11270 - check for lockless origin here! */
+        if (origin == LLAP_ORIGIN_LOCKLESS_IO)
+                llap->llap_nocache = 1;
+
         rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page,
                                  (obd_off)page->index << CFS_PAGE_SHIFT,
-                                 &ll_async_page_ops, llap, &llap->llap_cookie);
+                                 &ll_async_page_ops, llap, &llap->llap_cookie,
+                                 llap->llap_nocache, lockh);
         if (rc) {
                 OBD_SLAB_FREE(llap, ll_async_page_slab,
                               ll_async_page_slab_size);
@@ -669,6 +676,12 @@ static struct ll_async_page *llap_from_page(struct page *page, unsigned origin)
         RETURN(llap);
 }
 
+static inline struct ll_async_page *llap_from_page(struct page *page,
+                                                   unsigned origin)
+{
+        return llap_from_page_with_lockh(page, origin, NULL);
+}
+
 static int queue_or_sync_write(struct obd_export *exp, struct inode *inode,
                                struct ll_async_page *llap,
                                unsigned to, obd_flag async_flags)
@@ -764,12 +777,14 @@ out:
 int ll_commit_write(struct file *file, struct page *page, unsigned from,
                     unsigned to)
 {
+        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
         struct inode *inode = page->mapping->host;
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm = lli->lli_smd;
         struct obd_export *exp;
         struct ll_async_page *llap;
         loff_t size;
+        struct lustre_handle *lockh = NULL;
         int rc = 0;
         ENTRY;
 
@@ -780,7 +795,10 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from,
         CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
                inode, page, from, to, page->index);
 
-        llap = llap_from_page(page, LLAP_ORIGIN_COMMIT_WRITE);
+        if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
+                lockh = &fd->fd_cwlockh;
+
+        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_COMMIT_WRITE, lockh);
         if (IS_ERR(llap))
                 RETURN(PTR_ERR(llap));
 
@@ -959,6 +977,7 @@ static void __ll_put_llap(struct page *page)
  * here. */
 void ll_removepage(struct page *page)
 {
+        struct ll_async_page *llap = llap_cast_private(page);
         ENTRY;
 
         LASSERT(!in_interrupt());
@@ -970,37 +989,15 @@ void ll_removepage(struct page *page)
                 return;
         }
 
-        LASSERT(!llap_cast_private(page)->llap_lockless_io_page);
+        LASSERT(!llap->llap_lockless_io_page);
+        LASSERT(!llap->llap_nocache);
+
         LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
         __ll_put_llap(page);
 
         EXIT;
 }
 
-static int ll_page_matches(struct page *page, int fd_flags)
-{
-        struct lustre_handle match_lockh = {0};
-        struct inode *inode = page->mapping->host;
-        ldlm_policy_data_t page_extent;
-        int flags, matches;
-        ENTRY;
-
-        if (unlikely(fd_flags & LL_FILE_GROUP_LOCKED))
-                RETURN(1);
-
-        page_extent.l_extent.start = (__u64)page->index << CFS_PAGE_SHIFT;
-        page_extent.l_extent.end =
-                page_extent.l_extent.start + CFS_PAGE_SIZE - 1;
-        flags = LDLM_FL_TEST_LOCK | LDLM_FL_BLOCK_GRANTED;
-        if (!(fd_flags & LL_FILE_READAHEAD))
-                flags |= LDLM_FL_CBPENDING;
-        matches = obd_match(ll_i2sbi(inode)->ll_osc_exp,
-                            ll_i2info(inode)->lli_smd, LDLM_EXTENT,
-                            &page_extent, LCK_PR | LCK_PW, &flags, inode,
-                            &match_lockh);
-        RETURN(matches);
-}
-
 static int ll_issue_page_read(struct obd_export *exp,
                               struct ll_async_page *llap,
                               struct obd_io_group *oig, int defer)
@@ -1209,22 +1206,22 @@ static int ll_readahead(struct ll_readahead_state *ras,
                 /* we do this first so that we can see the page in the /proc
                  * accounting */
                 llap = llap_from_page(page, LLAP_ORIGIN_READAHEAD);
-                if (IS_ERR(llap) || llap->llap_defer_uptodate)
+                if (IS_ERR(llap) || llap->llap_defer_uptodate) {
+                        if (PTR_ERR(llap) == -ENOLCK) {
+                                ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
+                                match_failed = 1;
+                                CDEBUG(D_READA | D_PAGE,
+                                       "Adding page to cache failed index "
+                                       "%lu\n", i);
+                        }
                         goto next_page;
+                }
 
                 /* skip completed pages */
                 if (Page_Uptodate(page))
                         goto next_page;
 
                 /* bail when we hit the end of the lock. */
-                if ((rc = ll_page_matches(page, flags|LL_FILE_READAHEAD)) <= 0){
-                        LL_CDEBUG_PAGE(D_READA | D_PAGE, page,
-                                       "lock match failed: rc %d\n", rc);
-                        ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
-                        match_failed = 1;
-                        goto next_page;
-                }
-
                 rc = ll_issue_page_read(exp, llap, oig, 1);
                 if (rc == 0) {
                         reserved--;
@@ -1403,6 +1400,7 @@ int ll_writepage(struct page *page)
         if (IS_ERR(llap))
                 GOTO(out, rc = PTR_ERR(llap));
 
+        LASSERT(!llap->llap_nocache);
         LASSERT(!PageWriteback(page));
         set_page_writeback(page);
 
@@ -1450,6 +1448,7 @@ int ll_readpage(struct file *filp, struct page *page)
         struct obd_export *exp;
         struct ll_async_page *llap;
         struct obd_io_group *oig = NULL;
+        struct lustre_handle *lockh = NULL;
         int rc;
         ENTRY;
 
@@ -1481,9 +1480,19 @@ int ll_readpage(struct file *filp, struct page *page)
         if (exp == NULL)
                 GOTO(out, rc = -EINVAL);
 
-        llap = llap_from_page(page, LLAP_ORIGIN_READPAGE);
-        if (IS_ERR(llap))
+        if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
+                lockh = &fd->fd_cwlockh;
+
+        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READPAGE, lockh);
+        if (IS_ERR(llap)) {
+                if (PTR_ERR(llap) == -ENOLCK) {
+                        CWARN("ino %lu page %lu (%llu) not covered by "
+                              "a lock (mmap?).  check debug logs.\n",
+                              inode->i_ino, page->index,
+                              (long long)page->index << PAGE_CACHE_SHIFT);
+                }
                 GOTO(out, rc = PTR_ERR(llap));
+        }
 
         if (ll_i2sbi(inode)->ll_ra_info.ra_max_pages)
                 ras_update(ll_i2sbi(inode), inode, &fd->fd_ras, page->index,
@@ -1504,22 +1513,6 @@ int ll_readpage(struct file *filp, struct page *page)
                 GOTO(out_oig, rc = 0);
         }
 
-        if (likely((fd->fd_flags & LL_FILE_IGNORE_LOCK) == 0)) {
-                rc = ll_page_matches(page, fd->fd_flags);
-                if (rc < 0) {
-                        LL_CDEBUG_PAGE(D_ERROR, page,
-                                       "lock match failed: rc %d\n", rc);
-                        GOTO(out, rc);
-                }
-
-                if (rc == 0) {
-                        CWARN("ino %lu page %lu (%llu) not covered by "
-                              "a lock (mmap?).  check debug logs.\n",
-                              inode->i_ino, page->index,
-                              (long long)page->index << CFS_PAGE_SHIFT);
-                }
-        }
-
         rc = ll_issue_page_read(exp, llap, oig, 0);
         if (rc)
                 GOTO(out, rc);
diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c
index 99e0ff70e1..0f614284e2 100644
--- a/lustre/lov/lov_obd.c
+++ b/lustre/lov/lov_obd.c
@@ -48,6 +48,7 @@
 #include <obd_ost.h>
 #include <lprocfs_status.h>
 #include <lustre_param.h>
+#include <lustre_cache.h>
 
 #include "lov_internal.h"
 
@@ -87,6 +88,94 @@ void lov_putref(struct obd_device *obd)
         mutex_up(&lov->lov_lock);
 }
 
+static int lov_register_page_removal_cb(struct obd_export *exp,
+                                        obd_page_removal_cb_t func,
+                                        obd_pin_extent_cb pin_cb)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int i, rc = 0;
+
+        if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
+                return -EBUSY;
+
+        if (lov->lov_page_pin_cb && lov->lov_page_pin_cb != pin_cb)
+                return -EBUSY;
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+                        continue;
+                rc |= obd_register_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
+                                                   func, pin_cb);
+        }
+
+        lov->lov_page_removal_cb = func;
+        lov->lov_page_pin_cb = pin_cb;
+
+        return rc;
+}
+
+static int lov_unregister_page_removal_cb(struct obd_export *exp,
+                                        obd_page_removal_cb_t func)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int i, rc = 0;
+
+        if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
+                return -EINVAL;
+
+        lov->lov_page_removal_cb = NULL;
+        lov->lov_page_pin_cb = NULL;
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+                        continue;
+                rc |= obd_unregister_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
+                                                     func);
+        }
+
+        return rc;
+}
+
+static int lov_register_lock_cancel_cb(struct obd_export *exp,
+                                         obd_lock_cancel_cb func)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int i, rc = 0;
+
+        if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
+                return -EBUSY;
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+                        continue;
+                rc |= obd_register_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
+                                                  func);
+        }
+
+        lov->lov_lock_cancel_cb = func;
+
+        return rc;
+}
+
+static int lov_unregister_lock_cancel_cb(struct obd_export *exp,
+                                         obd_lock_cancel_cb func)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int i, rc = 0;
+
+        if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
+                return -EINVAL;
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+                        continue;
+                rc |= obd_unregister_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
+                                                    func);
+        }
+        lov->lov_lock_cancel_cb = NULL;
+        return rc;
+}
+
 #define MAX_STRING_SIZE 128
 static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, 
                            struct obd_connect_data *data)
@@ -160,10 +249,33 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
                 RETURN(-ENODEV);
         }
 
+        rc = obd_register_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+                                          lov->lov_page_removal_cb,
+                                          lov->lov_page_pin_cb);
+        if (rc) {
+                obd_disconnect(lov->lov_tgts[index]->ltd_exp);
+                lov->lov_tgts[index]->ltd_exp = NULL;
+                RETURN(rc);
+        }
+
+        rc = obd_register_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+                                         lov->lov_lock_cancel_cb);
+        if (rc) {
+                obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+                                               lov->lov_page_removal_cb);
+                obd_disconnect(lov->lov_tgts[index]->ltd_exp);
+                lov->lov_tgts[index]->ltd_exp = NULL;
+                RETURN(rc);
+        }
+
         rc = obd_register_observer(tgt_obd, obd);
         if (rc) {
                 CERROR("Target %s register_observer error %d\n",
                        obd_uuid2str(&tgt_uuid), rc);
+                obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+                                              lov->lov_lock_cancel_cb);
+                obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+                                               lov->lov_page_removal_cb);
                 obd_disconnect(lov->lov_tgts[index]->ltd_exp);
                 lov->lov_tgts[index]->ltd_exp = NULL;
                 RETURN(rc);
@@ -265,6 +377,10 @@ static int lov_disconnect_obd(struct obd_device *obd, __u32 index)
         CDEBUG(D_CONFIG, "%s: disconnecting target %s\n", 
                obd->obd_name, osc_obd->obd_name);
 
+        obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+                                      lov->lov_lock_cancel_cb);
+        obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+                                       lov->lov_page_removal_cb);
         if (lov->lov_tgts[index]->ltd_active) {
                 lov->lov_tgts[index]->ltd_active = 0;
                 lov->desc.ld_active_tgt_count--;
@@ -1658,10 +1774,12 @@ static struct obd_async_page_ops lov_async_page_ops = {
 int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                            struct lov_oinfo *loi, cfs_page_t *page,
                            obd_off offset, struct obd_async_page_ops *ops,
-                           void *data, void **res)
+                           void *data, void **res, int nocache,
+                           struct lustre_handle *lockh)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
         struct lov_async_page *lap;
+        struct lov_lock_handles *lov_lockh = NULL;
         int rc = 0;
         ENTRY;
 
@@ -1678,7 +1796,8 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                 }
                 rc = size_round(sizeof(*lap)) +
                         obd_prep_async_page(lov->lov_tgts[i]->ltd_exp, NULL,
-                                            NULL, NULL, 0, NULL, NULL, NULL);
+                                            NULL, NULL, 0, NULL, NULL, NULL, 0,
+                                            NULL);
                 RETURN(rc);
         }
         ASSERT_LSM_MAGIC(lsm);
@@ -1699,10 +1818,19 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
 
+        if (lockh) {
+                lov_lockh = lov_handle2llh(lockh);
+                if (lov_lockh) {
+                        lockh = lov_lockh->llh_handles + lap->lap_stripe;
+                }
+        }
+
         rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
                                  lsm, loi, page, lap->lap_sub_offset,
                                  &lov_async_page_ops, lap,
-                                 &lap->lap_sub_cookie);
+                                 &lap->lap_sub_cookie, nocache, lockh);
+        if (lov_lockh)
+                lov_llh_put(lov_lockh);
         if (rc)
                 RETURN(rc);
         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
@@ -2664,6 +2792,10 @@ struct obd_ops lov_obd_ops = {
         .o_llog_init           = lov_llog_init,
         .o_llog_finish         = lov_llog_finish,
         .o_notify              = lov_notify,
+        .o_register_page_removal_cb = lov_register_page_removal_cb,
+        .o_unregister_page_removal_cb = lov_unregister_page_removal_cb,
+        .o_register_lock_cancel_cb = lov_register_lock_cancel_cb,
+        .o_unregister_lock_cancel_cb = lov_unregister_lock_cancel_cb,
 };
 
 static quota_interface_t *quota_interface;
diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c
index 7cb6e99586..ff46954836 100644
--- a/lustre/obdclass/lprocfs_status.c
+++ b/lustre/obdclass/lprocfs_status.c
@@ -1120,6 +1120,10 @@ void lprocfs_init_ops_stats(int num_private_stats, struct lprocfs_stats *stats)
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, quotacheck);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, quotactl);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, ping);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, register_page_removal_cb);
+        LPROCFS_OBD_OP_INIT(num_private_stats,stats,unregister_page_removal_cb);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, register_lock_cancel_cb);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats,unregister_lock_cancel_cb);
 }
 
 int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats)
diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c
index 51099bb2ec..d7f54a9100 100644
--- a/lustre/obdecho/echo_client.c
+++ b/lustre/obdecho/echo_client.c
@@ -858,7 +858,7 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
 
                 rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,
                                          eap->eap_off, &ec_async_page_ops,
-                                         eap, &eap->eap_cookie);
+                                         eap, &eap->eap_cookie, 1, NULL);
                 if (rc) {
                         spin_lock(&eas.eas_lock);
                         eas.eas_rc = rc;
diff --git a/lustre/osc/Makefile.in b/lustre/osc/Makefile.in
index ce9107fc64..2eb2eea34d 100644
--- a/lustre/osc/Makefile.in
+++ b/lustre/osc/Makefile.in
@@ -1,4 +1,4 @@
 MODULES := osc
-osc-objs := osc_request.o lproc_osc.o osc_create.o
+osc-objs := osc_request.o lproc_osc.o osc_create.o cache.o
 
 @INCLUDE_RULES@
diff --git a/lustre/osc/autoMakefile.am b/lustre/osc/autoMakefile.am
index 2b00785c30..985e4739d1 100644
--- a/lustre/osc/autoMakefile.am
+++ b/lustre/osc/autoMakefile.am
@@ -5,7 +5,7 @@
 
 if LIBLUSTRE
 noinst_LIBRARIES = libosc.a
-libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h
+libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h cache.c
 libosc_a_CPPFLAGS = $(LLCPPFLAGS)
 libosc_a_CFLAGS = $(LLCFLAGS)
 endif
diff --git a/lustre/osc/cache.c b/lustre/osc/cache.c
new file mode 100644
index 0000000000..c144d96e88
--- /dev/null
+++ b/lustre/osc/cache.c
@@ -0,0 +1,398 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
+ *   Author Oleg Drokin <green@clusterfs.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ *
+ * Cache of triples - object, lock, extent
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_OSC
+
+#ifdef __KERNEL__
+# include <linux/version.h>
+# include <linux/module.h>
+# include <linux/list.h>
+#else                           /* __KERNEL__ */
+# include <liblustre.h>
+#endif
+
+#include <lustre_dlm.h>
+#include <lustre_cache.h>
+#include <obd.h>
+#include <lustre_debug.h>
+
+#include "osc_internal.h"
+
+/* Adding @lock to the @cache */
+int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
+{
+        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+
+        if (!lock)      // Lock disappeared under us.
+                return 0;
+
+        spin_lock(&cache->lc_locks_list_lock);
+        list_add_tail(&lock->l_cache_locks_list, &cache->lc_locks_list);
+        spin_unlock(&cache->lc_locks_list_lock);
+
+        LDLM_LOCK_PUT(lock);
+
+        return 0;
+}
+
+/* Tries to add @extent to the lock represented by @lockh if non-NULL;
+   otherwise tries to match a suitable lock using the resource and the data
+   contained in @extent */
+/* Should be called with oap->lock held (except on initial addition, see
+   the comment in osc_request.c) */
+int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
+                     struct osc_async_page *extent, struct lustre_handle *lockh)
+{
+        struct lustre_handle tmplockh;
+        ldlm_policy_data_t tmpex;
+        struct ldlm_lock *lock = NULL;
+        ENTRY;
+
+        /* Don't add anything a second time */
+        if (!list_empty(&extent->oap_page_list)) {
+                LBUG();
+                RETURN(0);
+        }
+
+        if (lockh && lustre_handle_is_used(lockh)) {
+                lock = ldlm_handle2lock(lockh);
+                if (!lock)
+                        RETURN(-ENOLCK);
+
+                LASSERTF(lock->l_policy_data.l_extent.start <=
+                         extent->oap_obj_off &&
+                         extent->oap_obj_off + CFS_PAGE_SIZE - 1 <=
+                         lock->l_policy_data.l_extent.end,
+                         "Got wrong lock [" LPU64 "," LPU64 "] for page with "
+                         "offset " LPU64 "\n",
+                         lock->l_policy_data.l_extent.start,
+                         lock->l_policy_data.l_extent.end, extent->oap_obj_off);
+        } else {
+                int mode;
+                /* Real extent width calculation here once we have real
+                 * extents
+                 */
+                tmpex.l_extent.start = extent->oap_obj_off;
+                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
+
+                /* XXX find lock from extent or something like that */
+                /* The lock mode does not matter. If this is a dirty page,
+                 * there can be only one PW lock. If the page is clean,
+                 * any PR lock is good.
+                 */
+                mode = ldlm_lock_match(cache->lc_obd->obd_namespace,
+                                       LDLM_FL_BLOCK_GRANTED |
+                                       LDLM_FL_CBPENDING, res, LDLM_EXTENT,
+                                       &tmpex, LCK_PW | LCK_PR, &tmplockh);
+
+                if (mode <= 0) {
+                        CDEBUG(D_CACHE, "No lock to attach " LPU64 "->" LPU64
+                               " extent to!\n", tmpex.l_extent.start,
+                               tmpex.l_extent.end);
+                        RETURN((mode < 0) ? mode : -ENOLCK);
+                }
+
+                lock = ldlm_handle2lock(&tmplockh);
+                if (!lock) {    // Race - lock disappeared under us (eviction?)
+                        CDEBUG(D_CACHE, "Newly matched lock just disappeared "
+                               "under us\n");
+                        RETURN(-ENOLCK);
+                }
+                ldlm_lock_decref(&tmplockh, mode);
+        }
+
+        spin_lock(&lock->l_extents_list_lock);
+        list_add_tail(&extent->oap_page_list, &lock->l_extents_list);
+        spin_unlock(&lock->l_extents_list_lock);
+        extent->oap_ldlm_lock = lock;
+        LDLM_LOCK_PUT(lock);
+
+        RETURN(0);
+}
+
+static int cache_extent_removal_event(struct lustre_cache *cache,
+                                      void *data, int discard)
+{
+        struct page *page = data;
+        struct page_removal_cb_element *element;
+
+        list_for_each_entry(element, &cache->lc_page_removal_callback_list,
+                            prce_list) {
+                element->prce_callback(page, discard);
+        }
+        return 0;
+}
+
+/* Registers a set of pin/removal callbacks for extents. The current
+   limitation is that there can be only one pin_cb per cache.
+   @pin_cb is called while we have the page locked, to pin it in memory so
+   that it does not disappear after we release the page lock (which we need
+   to do to avoid deadlocks).
+   @func_cb is the removal callback; it is called after the page and all
+   spinlocks are released, and is supposed to clean the page and remove it
+   from all (VFS) caches it might be in */
+int cache_add_extent_removal_cb(struct lustre_cache *cache,
+                                obd_page_removal_cb_t func_cb,
+                                obd_pin_extent_cb pin_cb)
+{
+        struct page_removal_cb_element *element;
+
+        if (!func_cb)
+                return 0;
+        OBD_ALLOC(element, sizeof(*element));
+        if (!element)
+                return -ENOMEM;
+        element->prce_callback = func_cb;
+        list_add_tail(&element->prce_list,
+                      &cache->lc_page_removal_callback_list);
+
+        cache->lc_pin_extent_cb = pin_cb;
+        return 0;
+}
+EXPORT_SYMBOL(cache_add_extent_removal_cb);
+
+/* Unregister an extent removal callback registered earlier. If the list of
+   registered removal callbacks becomes empty, we also clear the pin callback,
+   since there can be only one */
+int cache_del_extent_removal_cb(struct lustre_cache *cache,
+                                obd_page_removal_cb_t func_cb)
+{
+        int found = 0;
+        struct page_removal_cb_element *element, *t;
+
+        list_for_each_entry_safe(element, t,
+                                 &cache->lc_page_removal_callback_list,
+                                 prce_list) {
+                if (element->prce_callback == func_cb) {
+                        list_del(&element->prce_list);
+                        OBD_FREE(element, sizeof(*element));
+                        found = 1;
+                        /* We continue iterating the list in case this function
+                           was registered more than once */
+                }
+        }
+
+        if (list_empty(&cache->lc_page_removal_callback_list))
+                cache->lc_pin_extent_cb = NULL;
+
+        return !found;
+}
+EXPORT_SYMBOL(cache_del_extent_removal_cb);
+
+static int cache_remove_extent_nolock(struct lustre_cache *cache,
+                                      struct osc_async_page *extent)
+{
+        int have_lock = !!extent->oap_ldlm_lock;
+        /* We used to check oap_ldlm_lock for non-NULL here, but it might in
+           fact be NULL, due to a parallel page eviction clearing it and then
+           waiting on the lock's page list lock */
+        extent->oap_ldlm_lock = NULL;
+
+        if (!list_empty(&extent->oap_page_list))
+                list_del_init(&extent->oap_page_list);
+
+        return have_lock;
+}
+
+/* Request that @extent be removed from the cache and the locks it belongs to. */
+void cache_remove_extent(struct lustre_cache *cache,
+                         struct osc_async_page *extent)
+{
+        struct ldlm_lock *lock;
+
+        spin_lock(&extent->oap_lock);
+        lock = extent->oap_ldlm_lock;
+
+        extent->oap_ldlm_lock = NULL;
+        spin_unlock(&extent->oap_lock);
+
+        /* No lock - means this extent is not in any list */
+        if (!lock)
+                return;
+
+        spin_lock(&lock->l_extents_list_lock);
+        if (!list_empty(&extent->oap_page_list))
+                list_del_init(&extent->oap_page_list);
+        spin_unlock(&lock->l_extents_list_lock);
+}
+
+/* Iterate through the list of extents in the lock identified by @lockh,
+   calling @cb_func for every such extent and passing @data to every call.
+   Stops iterating early if @cb_func returns nonzero. */
+int cache_iterate_extents(struct lustre_cache *cache,
+                          struct lustre_handle *lockh,
+                          cache_iterate_extents_cb_t cb_func, void *data)
+{
+        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+        struct osc_async_page *extent, *t;
+
+        if (!lock)      // Lock disappeared
+                return 0;
+        /* Parallel page removal from mem pressure can race with us */
+        spin_lock(&lock->l_extents_list_lock);
+        list_for_each_entry_safe(extent, t, &lock->l_extents_list,
+                                 oap_page_list) {
+                if (cb_func(cache, lockh, extent, data))
+                        break;
+        }
+        spin_unlock(&lock->l_extents_list_lock);
+        LDLM_LOCK_PUT(lock);
+
+        return 0;
+}
+
+static int cache_remove_extents_from_lock(struct lustre_cache *cache,
+                                          struct ldlm_lock *lock, void *data)
+{
+        struct osc_async_page *extent;
+        void *ext_data;
+
+        LASSERT(lock);
+
+        spin_lock(&lock->l_extents_list_lock);
+        while (!list_empty(&lock->l_extents_list)) {
+                extent = list_entry(lock->l_extents_list.next,
+                                    struct osc_async_page, oap_page_list);
+
+                spin_lock(&extent->oap_lock);
+                /* If there is no lock referenced from this oap, it means a
+                   parallel page-removal process is waiting to free that page
+                   on l_extents_list_lock while holding the page lock.
+                   We need this page to go away completely, and to make that
+                   happen we will just try to truncate it here too.
+                   Serialisation on the page lock achieves that for us. */
+                /* Try to add the extent back to the cache first, but only if
+                 * we cancel a read lock; write locks cannot have other
+                 * overlapping locks. If re-adding is not possible (or we are
+                 * cancelling a PW lock), remove the extent from the cache */
+                if (!cache_remove_extent_nolock(cache, extent) ||
+                    (lock->l_granted_mode == LCK_PW) ||
+                    cache_add_extent(cache, &lock->l_resource->lr_name, extent,
+                                     NULL)) {
+                        /* We need to remember this oap_page value now;
+                           once we release the spinlocks, the extent struct
+                           might be freed and we would end up requesting a
+                           page with address 0x5a5a5a5a in
+                           cache_extent_removal_event */
+                        ext_data = extent->oap_page;
+                        cache->lc_pin_extent_cb(extent->oap_page);
+                        spin_unlock(&extent->oap_lock);
+                        spin_unlock(&lock->l_extents_list_lock);
+                        cache_extent_removal_event(cache, ext_data,
+                                                   lock->
+                                                   l_flags &
+                                                   LDLM_FL_DISCARD_DATA);
+                        spin_lock(&lock->l_extents_list_lock);
+                } else {
+                        spin_unlock(&extent->oap_lock);
+                }
+        }
+        spin_unlock(&lock->l_extents_list_lock);
+
+        return 0;
+}
+
+/* Removes @lock from the cache after the necessary checks. */
+int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
+{
+        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+
+        if (!lock)  // The lock was removed by somebody just now, nothing to do
+                return 0;
+
+        cache_remove_extents_from_lock(cache, lock, NULL /*data */ );
+
+        spin_lock(&cache->lc_locks_list_lock);
+        list_del_init(&lock->l_cache_locks_list);
+        spin_unlock(&cache->lc_locks_list_lock);
+
+        LDLM_LOCK_PUT(lock);
+
+        return 0;
+}
+
+/* Supposed to iterate through all locks in the cache for the given resource.
+   Not implemented at the moment. */
+int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
+                        cache_iterate_locks_cb_t cb_fun, void *data)
+{
+        return -ENOTSUPP;
+}
+
+/* Create lustre cache and attach it to @obd */
+struct lustre_cache *cache_create(struct obd_device *obd)
+{
+        struct lustre_cache *cache;
+
+        OBD_ALLOC(cache, sizeof(*cache));
+        if (!cache)
+                GOTO(out, NULL);
+        spin_lock_init(&cache->lc_locks_list_lock);
+        CFS_INIT_LIST_HEAD(&cache->lc_locks_list);
+        CFS_INIT_LIST_HEAD(&cache->lc_page_removal_callback_list);
+        cache->lc_obd = obd;
+
+      out:
+        return cache;
+}
+
+/* Destroy @cache and free its memory */
+int cache_destroy(struct lustre_cache *cache)
+{
+        if (cache) {
+                spin_lock(&cache->lc_locks_list_lock);
+                if (!list_empty(&cache->lc_locks_list)) {
+                        struct ldlm_lock *lock, *tmp;
+                        CERROR("still have locks in the list on cleanup:\n");
+
+                        list_for_each_entry_safe(lock, tmp,
+                                                 &cache->lc_locks_list,
+                                                 l_cache_locks_list) {
+                                list_del_init(&lock->l_cache_locks_list);
+                                /* XXX: The natural idea would be to print the
+                                   offending locks here, but if we use
+                                   e.g. LDLM_ERROR we will likely crash,
+                                   because it tries to access e.g. a
+                                   nonexistent namespace. Normally this kind
+                                   of case can only happen when somebody did
+                                   not release a lock reference, and we have
+                                   other ways to detect this. */
+                                /* Make sure there are no pages left under the
+                                   lock */
+                                LASSERT(list_empty(&lock->l_extents_list));
+                        }
+                }
+                spin_unlock(&cache->lc_locks_list_lock);
+                LASSERT(list_empty(&cache->lc_page_removal_callback_list));
+                OBD_FREE(cache, sizeof(*cache));
+        }
+
+        return 0;
+}
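
For context, here is a minimal usage sketch of the lustre_cache API added in the
file above. It is illustrative only and not part of the patch: it assumes the
Lustre kernel build environment, and the hypothetical example_cache_lifecycle()
wrapper simply strings together the exported entry points with the signatures
declared in this file. The real consumers are the osc changes later in this
patch.

/* Illustrative sketch only: the intended call sequence for the new
 * object/lock/extent cache.  Not part of the patch. */
#include <obd.h>
#include <lustre_dlm.h>
#include <lustre_cache.h>

static int example_cache_lifecycle(struct obd_device *obd,
                                   struct lustre_handle *lockh,
                                   struct ldlm_res_id *res,
                                   struct osc_async_page *oap)
{
        struct lustre_cache *cache;
        int rc;

        /* Created once per client obd (osc_setup does this). */
        cache = cache_create(obd);
        if (cache == NULL)
                return -ENOMEM;

        /* Every granted extent lock is remembered (osc_enqueue_fini). */
        rc = cache_add_lock(cache, lockh);

        /* Each cached page is attached to the lock that covers it
         * (osc_prep_async_page).  With an unused lockh, the cache matches
         * a suitable lock by resource and page offset instead. */
        if (rc == 0)
                rc = cache_add_extent(cache, res, oap, lockh);

        /* On page teardown the extent is detached again. */
        cache_remove_extent(cache, oap);

        /* On a blocking AST the lock and all pages still attached to it
         * are found in one pass (osc_extent_blocking_cb). */
        cache_remove_lock(cache, lockh);

        /* Destroyed at client cleanup; complains if locks remain listed. */
        return cache_destroy(cache);
}

This chain is what makes lock revocation proportional to the number of pages
actually under the lock rather than a scan of the whole file, which is the
speedup for pages cached at very high offsets that this patch is after.
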
diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h
index 72ce3ec4a3..451a79b247 100644
--- a/lustre/osc/osc_internal.h
+++ b/lustre/osc/osc_internal.h
@@ -28,8 +28,11 @@ struct osc_async_page {
         struct client_obd       *oap_cli;
         struct lov_oinfo        *oap_loi;
 
-	struct obd_async_page_ops *oap_caller_ops;
+        struct obd_async_page_ops *oap_caller_ops;
         void                    *oap_caller_data;
+        struct list_head         oap_page_list;
+        struct ldlm_lock        *oap_ldlm_lock;
+        spinlock_t               oap_lock;
 };
 
 #define oap_page        oap_brw_page.pg
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 42b877d7e2..47843db5d7 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -56,12 +56,14 @@
 #include <lustre_log.h>
 #include <lustre_debug.h>
 #include <lustre_param.h>
+#include <lustre_cache.h>
 #include "osc_internal.h"
 
 static quota_interface_t *quota_interface = NULL;
 extern quota_interface_t osc_quota_interface;
 
 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
+int osc_cleanup(struct obd_device *obd);
 
 static quota_interface_t *quota_interface;
 extern quota_interface_t osc_quota_interface;
@@ -2394,9 +2396,13 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                         struct lov_oinfo *loi, cfs_page_t *page,
                         obd_off offset, struct obd_async_page_ops *ops,
-                        void *data, void **res)
+                        void *data, void **res, int nocache,
+                        struct lustre_handle *lockh)
 {
         struct osc_async_page *oap;
+        struct ldlm_res_id oid = {{0}};
+        int rc = 0;
+
         ENTRY;
 
         if (!page)
@@ -2416,9 +2422,24 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
+        INIT_LIST_HEAD(&oap->oap_page_list);
 
         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
 
+        spin_lock_init(&oap->oap_lock);
+
+        /* If the page was marked as not cacheable, don't add to any locks */
+        if (!nocache) {
+                oid.name[0] = loi->loi_id;
+                /* This is the only place where we can call cache_add_extent
+                   without oap_lock, because this page is locked right now,
+                   and the lock we are adding it to is referenced, so it
+                   cannot lose any pages either. */
+                rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
+                if (rc)
+                        RETURN(rc);
+        }
+
         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
         RETURN(0);
 }
@@ -2703,6 +2724,7 @@ static int osc_teardown_async_page(struct obd_export *exp,
                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
         }
         loi_list_maint(cli, loi);
+        cache_remove_extent(cli->cl_cache, oap);
 
         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
 out:
@@ -2710,6 +2732,49 @@ out:
         RETURN(rc);
 }
 
+int osc_extent_blocking_cb(struct ldlm_lock *lock,
+                           struct ldlm_lock_desc *new, void *data,
+                           int flag)
+{
+        struct lustre_handle lockh = { 0 };
+        int rc;
+        ENTRY;
+
+        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
+                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
+                LBUG();
+        }
+
+        switch (flag) {
+        case LDLM_CB_BLOCKING:
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc != ELDLM_OK)
+                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
+                break;
+        case LDLM_CB_CANCELING: {
+
+                ldlm_lock2handle(lock, &lockh);
+                /* This lock wasn't granted, don't try to do anything */
+                if (lock->l_req_mode != lock->l_granted_mode)
+                        RETURN(0);
+
+                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
+                                  &lockh);
+
+                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
+                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
+                                                          lock, new, data,flag);
+                break;
+        }
+        default:
+                LBUG();
+        }
+
+        RETURN(0);
+}
+EXPORT_SYMBOL(osc_extent_blocking_cb);
+
 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                     int flags)
 {
@@ -2751,8 +2816,8 @@ static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
         return 0;
 }
 
-static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
-                            int intent, int rc)
+static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
+                            struct obd_info *oinfo, int intent, int rc)
 {
         ENTRY;
 
@@ -2778,6 +2843,9 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
         }
 
+        if (!rc)
+                cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
+
         /* Call the update callback. */
         rc = oinfo->oi_cb_up(oinfo, rc);
         RETURN(rc);
@@ -2804,7 +2872,7 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                    aa->oa_oi->oi_lockh, rc);
 
         /* Complete osc stuff. */
-        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
+        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
 
         /* Release the lock for async request. */
         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
@@ -2932,7 +3000,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(rc);
         }
 
-        rc = osc_enqueue_fini(req, oinfo, intent, rc);
+        rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
         if (intent)
                 ptlrpc_req_finished(req);
 
@@ -3621,6 +3689,11 @@ int osc_setup(struct obd_device *obd, obd_count len, void *buf)
                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                             OST_MAXREQSIZE,
                                             ptlrpc_add_rqs_to_pool);
+                cli->cl_cache = cache_create(obd);
+                if (!cli->cl_cache) {
+                        osc_cleanup(obd);
+                        rc = -ENOMEM;
+                }
         }
 
         RETURN(rc);
@@ -3683,12 +3756,50 @@ int osc_cleanup(struct obd_device *obd)
         /* free memory of osc quota cache */
         lquota_cleanup(quota_interface, obd);
 
+        cache_destroy(obd->u.cli.cl_cache);
         rc = client_obd_cleanup(obd);
 
         ptlrpcd_decref();
         RETURN(rc);
 }
 
+static int osc_register_page_removal_cb(struct obd_export *exp,
+                                        obd_page_removal_cb_t func,
+                                        obd_pin_extent_cb pin_cb)
+{
+        return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
+                                           pin_cb);
+}
+
+static int osc_unregister_page_removal_cb(struct obd_export *exp,
+                                          obd_page_removal_cb_t func)
+{
+        return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
+}
+
+static int osc_register_lock_cancel_cb(struct obd_export *exp,
+                                       obd_lock_cancel_cb cb)
+{
+        LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
+
+        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
+        return 0;
+}
+
+static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
+                                         obd_lock_cancel_cb cb)
+{
+        if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
+                CERROR("Unregistering cancel cb %p, while only %p was "
+                       "registered\n", cb,
+                       exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
+                RETURN(-EINVAL);
+        }
+
+        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
+        return 0;
+}
+
 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
 {
         struct lustre_cfg *lcfg = buf;
@@ -3745,6 +3856,10 @@ struct obd_ops osc_obd_ops = {
         .o_llog_init            = osc_llog_init,
         .o_llog_finish          = osc_llog_finish,
         .o_process_config       = osc_process_config,
+        .o_register_page_removal_cb = osc_register_page_removal_cb,
+        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
+        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
+        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
 };
 int __init osc_init(void)
 {
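
The new o_register_*/o_unregister_* methods above form a small registration
protocol: an upper layer supplies one page-removal callback plus a pin callback,
and at most one lock-cancel callback per client obd, which cache.c and
osc_extent_blocking_cb() then invoke on lock cancellation. A hedged sketch of a
registering caller follows; the obd_class.h wrapper names and the exact callback
typedefs are outside this excerpt, so the prototypes below are assumptions
inferred from the call sites (cb(page, discard), pin(page), and
cb(lock, desc, data, flag)), and the calls go through the export's obd_ops
directly, which is roughly what such wrappers reduce to.

/* Illustrative sketch only, not part of the patch.  The callback
 * prototypes are assumptions inferred from how this patch invokes them. */
#include <obd.h>
#include <lustre_dlm.h>

static int my_page_removal_cb(void *page, int discard)
{
        /* The upper layer would flush or discard the VFS page here. */
        return 0;
}

static void my_pin_cb(void *page)
{
        /* Take a page reference so it cannot vanish once unlocked. */
}

static int my_lock_cancel_cb(struct ldlm_lock *lock,
                             struct ldlm_lock_desc *desc, void *data, int flag)
{
        /* Upper-layer bookkeeping when an extent lock goes away. */
        return 0;
}

static int example_register_cbs(struct obd_export *exp)
{
        struct obd_ops *ops = exp->exp_obd->obd_type->typ_ops;
        int rc;

        rc = ops->o_register_page_removal_cb(exp, my_page_removal_cb,
                                             my_pin_cb);
        if (rc)
                return rc;

        rc = ops->o_register_lock_cancel_cb(exp, my_lock_cancel_cb);
        if (rc)
                ops->o_unregister_page_removal_cb(exp, my_page_removal_cb);
        return rc;
}

The matching LOV methods wired up earlier in this patch
(lov_register_page_removal_cb and friends) sit in the same obd_ops slots, so
the same registration can presumably be made against the LOV export as well.
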
diff --git a/lustre/tests/conf-sanity.sh b/lustre/tests/conf-sanity.sh
index c86e7862bb..1d4d4a915c 100644
--- a/lustre/tests/conf-sanity.sh
+++ b/lustre/tests/conf-sanity.sh
@@ -1240,9 +1240,21 @@ test_33() { # bug 12333
 }
 run_test 33 "Mount ost with a large index number"
 
-umount_client $MOUNT	
+umount_client $MOUNT
 cleanup_nocli
 
+test_23() {
+        start_ost
+        start_mds
+        # Simulate -EINTR during mount OBD_FAIL_LDLM_CLOSE_THREAD
+        sysctl -w lustre.fail_loc=0x80000313
+        mount_client $MOUNT
+        cleanup
+}
+run_test 23 "Simulate -EINTR during mount"
+
+equals_msg "Done"
+echo "$0: completed"
 test_33a() {
         setup
 
diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh
index c3cdf26c1c..35c29def2f 100644
--- a/lustre/tests/sanity.sh
+++ b/lustre/tests/sanity.sh
@@ -2589,7 +2589,9 @@ test_62() {
         cat $f && error "cat succeeded, expect -EIO"
         sysctl -w lustre.fail_loc=0
 }
-run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)"
+# This test is now irrelevant (as of the bug 10718 inclusion); we no longer
+# match every page all of the time.
+#run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)"
 
 # bug 2319 - oig_wait() interrupted causes crash because of invalid waitq.
 test_63() {
@@ -3323,6 +3325,20 @@ test_79() { # bug 12743
 }
 run_test 79 "df report consistency check ======================="
 
+test_80() { # bug 10718
+        dd if=/dev/zero of=$DIR/$tfile bs=1M count=1 seek=1M
+        sync; sleep 1; sync
+        BEFORE=`date +%s`
+        cancel_lru_locks OSC
+        AFTER=`date +%s`
+        DIFF=$((AFTER-BEFORE))
+        if [ $DIFF -gt 1 ] ; then
+                error "elapsed for 1M@1T = $DIFF"
+        fi
+        true
+}
+run_test 80 "Page eviction is equally fast at high offsets too  ===="
+
 # on the LLNL clusters, runas will still pick up root's $TMP settings,
 # which will not be writable for the runas user, and then you get a CVS
 # error message with a corrupt path string (CVS bug) and panic.
-- 
GitLab