diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index 0b7e5a87ba83cf5a005b88279dc33af3c1f759fb..5edee768940d10533108c320147eb7066572aa58 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -133,6 +133,13 @@ Details    : Implementation of NFS-like root squash capability. Specifically,
              don't allow someone with root access on a client node to be able
              to manipulate files owned by root on a server node.
 
+Severity   : normal
+Bugzilla   : 10718
+Description: Slow truncate/writes to huge files at high offsets.
+Details    : Directly associate cached pages with the locks that protect them.
+             This lets us quickly find which pages to write out and remove
+             once a lock cancellation callback is received.
+
 --------------------------------------------------------------------------------
 
 2007-10-26         Cluster File Systems, Inc. <info@clusterfs.com>
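
The lustre_cache API introduced further down in this patch
(lustre/include/lustre_cache.h) is what lets the cancel path visit only the
pages attached to the cancelled lock.  A minimal sketch, assuming a granted
lock handle and a per-client cache; example_flush_cb and example_lock_cancelled
are invented names, not part of this patch, and the authoritative code lives in
lustre/osc/cache.c and osc_request.c:

    #include <lustre_cache.h>

    /* Hypothetical per-extent visitor: each osc_async_page attached to the
     * cancelled lock is visited once, so no inode-wide page walk is needed. */
    static int example_flush_cb(struct lustre_cache *cache,
                                struct lustre_handle *lockh,
                                struct osc_async_page *oap, void *data)
    {
            int discard = *(int *)data;

            /* write the page back unless @discard is set, then drop it */
            (void)discard;
            return 0;
    }

    /* Hypothetical cancel-side helper: walk only the pages associated with
     * @lockh, then forget the lock itself. */
    static int example_lock_cancelled(struct lustre_cache *cache,
                                      struct lustre_handle *lockh, int discard)
    {
            int rc;

            rc = cache_iterate_extents(cache, lockh, example_flush_cb, &discard);
            if (rc == 0)
                    rc = cache_remove_lock(cache, lockh);
            return rc;
    }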
diff --git a/lustre/include/Makefile.am b/lustre/include/Makefile.am
index d87d606bf9111830ff3634df4d2751b34d4d8c9e..2608a4c9fad7840a69457b4cd423bc200d23a52a 100644
--- a/lustre/include/Makefile.am
+++ b/lustre/include/Makefile.am
@@ -13,5 +13,5 @@ EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h 	\
 	     lustre_lite.h lustre_log.h lustre_mds.h lustre_net.h \
              lustre_param.h lustre_quota.h lustre_ucache.h lvfs.h \
 	     obd_cache.h obd_class.h obd_echo.h obd.h obd_lov.h \
-	     obd_ost.h obd_support.h lustre_ver.h
+	     obd_ost.h obd_support.h lustre_cache.h lustre_ver.h
 
diff --git a/lustre/include/lustre_cache.h b/lustre/include/lustre_cache.h
new file mode 100644
index 0000000000000000000000000000000000000000..291d88293675b7ccefe54c20424aad5fbf8c4f29
--- /dev/null
+++ b/lustre/include/lustre_cache.h
@@ -0,0 +1,53 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+
+#ifndef LUSTRE_CACHE_H
+#define LUSTRE_CACHE_H
+#include <obd.h>
+#include <lustre/lustre_idl.h>
+#include <lustre_dlm.h>
+
+struct lustre_cache;
+struct osc_async_page;
+struct page_removal_cb_element {
+        struct list_head        prce_list;
+        obd_page_removal_cb_t   prce_callback;
+};
+
+typedef int (*cache_iterate_extents_cb_t)(struct lustre_cache *,
+                                          struct lustre_handle *,
+                                          struct osc_async_page *,
+                                          void *);
+typedef int (*cache_iterate_locks_cb_t)(struct lustre_cache *,
+                                        struct ldlm_res_id *,
+                                        struct lustre_handle *, void *);
+
+struct lustre_cache {
+        struct list_head         lc_locks_list;
+        spinlock_t               lc_locks_list_lock;
+        struct list_head         lc_page_removal_callback_list;
+        struct obd_device       *lc_obd;
+        obd_pin_extent_cb        lc_pin_extent_cb;
+};
+
+int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh);
+int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
+                     struct osc_async_page *extent,
+                     struct lustre_handle *lockh);
+void cache_remove_extent(struct lustre_cache *, struct osc_async_page *);
+int cache_add_extent_removal_cb(struct lustre_cache *cache,
+                                obd_page_removal_cb_t func_cb,
+                                obd_pin_extent_cb pin_cb);
+int cache_del_extent_removal_cb(struct lustre_cache *cache,
+                                obd_page_removal_cb_t func_cb);
+int cache_iterate_extents(struct lustre_cache *cache, struct lustre_handle *lockh,
+                          cache_iterate_extents_cb_t cb_func, void *data);
+int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh);
+int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
+                        cache_iterate_locks_cb_t cb_fun, void *data);
+struct lustre_cache *cache_create(struct obd_device *obd);
+int cache_destroy(struct lustre_cache *cache);
+
+
+#endif /* LUSTRE_CACHE_H */
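
A rough usage sketch of the declarations above, assuming the caller already has
an obd_device, a granted lock handle, and a prepared osc_async_page;
example_track_page is an invented name and the error handling is illustrative
only:

    static int example_track_page(struct obd_device *obd,
                                  struct ldlm_res_id *res,
                                  struct lustre_handle *lockh,
                                  struct osc_async_page *oap)
    {
            struct lustre_cache *cache;
            int rc;

            cache = cache_create(obd);          /* one cache per client obd */
            if (cache == NULL)
                    return -ENOMEM;

            rc = cache_add_lock(cache, lockh);  /* start tracking the lock */
            if (rc == 0) {
                    /* attach the page to that lock (or match one by extent) */
                    rc = cache_add_extent(cache, res, oap, lockh);
                    if (rc == 0)
                            cache_remove_extent(cache, oap);
                    cache_remove_lock(cache, lockh);
            }
            cache_destroy(cache);
            return rc;
    }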
diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index e965106cb053268bb097d294edd437aced6405ed..7938c855dce16a125b7b6f8ba54984d6d81debe9 100644
--- a/lustre/include/lustre_dlm.h
+++ b/lustre/include/lustre_dlm.h
@@ -446,6 +446,10 @@ struct ldlm_lock {
         void                 *l_lvb_data;       /* an LVB received during */
         void                 *l_lvb_swabber;    /* an enqueue */
         void                 *l_ast_data;
+        spinlock_t            l_extents_list_lock;
+        struct list_head      l_extents_list;
+
+        struct list_head      l_cache_locks_list;
 
         /* Server-side-only members */
 
diff --git a/lustre/include/obd.h b/lustre/include/obd.h
index 9f2f444402f3fe51a532a905c81d9b9623722da2..8c708819f8b61557315da8f082e95264d39666c2 100644
--- a/lustre/include/obd.h
+++ b/lustre/include/obd.h
@@ -240,6 +240,11 @@ struct obd_device_target {
         struct lustre_quota_ctxt  obt_qctxt;
 };
 
+typedef void (*obd_pin_extent_cb)(void *data);
+typedef int (*obd_page_removal_cb_t)(void *data, int discard);
+typedef int (*obd_lock_cancel_cb)(struct ldlm_lock *, struct ldlm_lock_desc *,
+                                   void *, int);
+
 #define FILTER_GROUP_LLOG 1
 #define FILTER_GROUP_ECHO 2
 
@@ -324,6 +329,7 @@ struct filter_obd {
 
 struct mdc_rpc_lock;
 struct obd_import;
+struct lustre_cache;
 struct client_obd {
         struct semaphore         cl_sem;
         struct obd_uuid          cl_target_uuid;
@@ -401,8 +407,10 @@ struct client_obd {
 
         /* used by quotacheck */
         int                      cl_qchk_stat; /* quotacheck stat of the peer */
-
         atomic_t                 cl_resends; /* resend count */
+        /* Cache of (object, lock, extent) triples */
+        struct lustre_cache     *cl_cache;
+        obd_lock_cancel_cb       cl_ext_lock_cancel_cb;
 };
 #define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
 
@@ -553,6 +561,9 @@ struct lov_obd {
         __u32                   lov_offset_idx; /* aliasing for start_idx  */
         int                     lov_start_count;/* reseed counter */
         int                     lov_connects;
+        obd_page_removal_cb_t   lov_page_removal_cb;
+        obd_pin_extent_cb       lov_page_pin_cb;
+        obd_lock_cancel_cb      lov_lock_cancel_cb;
 };
 
 struct niobuf_local {
@@ -903,7 +914,8 @@ struct obd_ops {
                                  struct lov_oinfo *loi,
                                  cfs_page_t *page, obd_off offset, 
                                  struct obd_async_page_ops *ops, void *data,
-                                 void **res);
+                                 void **res, int nocache,
+                                 struct lustre_handle *lockh);
         int (*o_queue_async_io)(struct obd_export *exp,
                                 struct lov_stripe_md *lsm,
                                 struct lov_oinfo *loi, void *cookie,
@@ -995,6 +1007,17 @@ struct obd_ops {
         int (*o_quotactl)(struct obd_export *, struct obd_quotactl *);
 
         int (*o_ping)(struct obd_export *exp);
+
+        int (*o_register_page_removal_cb)(struct obd_export *exp,
+                                          obd_page_removal_cb_t cb,
+                                          obd_pin_extent_cb pin_cb);
+        int (*o_unregister_page_removal_cb)(struct obd_export *exp,
+                                            obd_page_removal_cb_t cb);
+        int (*o_register_lock_cancel_cb)(struct obd_export *exp,
+                                       obd_lock_cancel_cb cb);
+        int (*o_unregister_lock_cancel_cb)(struct obd_export *exp,
+                                         obd_lock_cancel_cb cb);
+
         /*
          * NOTE: If adding ops, add another LPROCFS_OBD_OP_INIT() line
          * to lprocfs_alloc_obd_stats() in obdclass/lprocfs_status.c.
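
The callback types and obd_ops entries added above are wired up by llite later
in this patch (ll_pin_extent_cb, ll_page_removal_cb and ll_extent_lock_cancel_cb
in llite_lib.c).  A hedged sketch of a conforming caller, with invented
example_* names, using the obd_register_*_cb wrappers added to obd_class.h
below:

    /* Pin callback: take an extra reference on the page passed as @data. */
    static void example_pin_cb(void *data)
    {
            /* e.g. page_cache_get((struct page *)data); */
    }

    /* Page removal callback: write back or drop the page passed as @data,
     * skipping the write-out when @discard is set. */
    static int example_removal_cb(void *data, int discard)
    {
            return 0;
    }

    /* Lock cancel callback matching obd_lock_cancel_cb. */
    static int example_cancel_cb(struct ldlm_lock *lock,
                                 struct ldlm_lock_desc *desc, void *data,
                                 int flag)
    {
            return 0;
    }

    static int example_register(struct obd_export *exp)
    {
            int rc;

            rc = obd_register_page_removal_cb(exp, example_removal_cb,
                                              example_pin_cb);
            if (rc == 0)
                    rc = obd_register_lock_cancel_cb(exp, example_cancel_cb);
            return rc;
    }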
diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h
index 06b21e33e838cc521a1b7f7bb7b91d9f4dadd30f..57edb95484fa7939ffb0903aab1eadfc8a35fecc 100644
--- a/lustre/include/obd_class.h
+++ b/lustre/include/obd_class.h
@@ -952,7 +952,8 @@ static inline  int obd_prep_async_page(struct obd_export *exp,
                                        struct lov_oinfo *loi,
                                        cfs_page_t *page, obd_off offset,
                                        struct obd_async_page_ops *ops,
-                                       void *data, void **res)
+                                       void *data, void **res, int nocache,
+                                       struct lustre_handle *lockh)
 {
         int ret;
         ENTRY;
@@ -961,7 +962,8 @@ static inline  int obd_prep_async_page(struct obd_export *exp,
         EXP_COUNTER_INCREMENT(exp, prep_async_page);
 
         ret = OBP(exp->exp_obd, prep_async_page)(exp, lsm, loi, page, offset,
-                                                 ops, data, res);
+                                                 ops, data, res, nocache,
+                                                 lockh);
         RETURN(ret);
 }
 
@@ -1384,6 +1386,59 @@ static inline int obd_register_observer(struct obd_device *obd,
         RETURN(0);
 }
 
+static inline int obd_register_page_removal_cb(struct obd_export *exp,
+                                               obd_page_removal_cb_t cb,
+                                               obd_pin_extent_cb pin_cb)
+{
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_OP(exp->exp_obd, register_page_removal_cb, 0);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, register_page_removal_cb);
+
+        rc = OBP(exp->exp_obd, register_page_removal_cb)(exp, cb, pin_cb);
+        RETURN(rc);
+}
+
+static inline int obd_unregister_page_removal_cb(struct obd_export *exp,
+                                                 obd_page_removal_cb_t cb)
+{
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_OP(exp->exp_obd, unregister_page_removal_cb, 0);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_page_removal_cb);
+
+        rc = OBP(exp->exp_obd, unregister_page_removal_cb)(exp, cb);
+        RETURN(rc);
+}
+
+static inline int obd_register_lock_cancel_cb(struct obd_export *exp,
+                                              obd_lock_cancel_cb cb)
+{
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_OP(exp->exp_obd, register_lock_cancel_cb, 0);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, register_lock_cancel_cb);
+
+        rc = OBP(exp->exp_obd, register_lock_cancel_cb)(exp, cb);
+        RETURN(rc);
+}
+
+static inline int obd_unregister_lock_cancel_cb(struct obd_export *exp,
+                                                 obd_lock_cancel_cb cb)
+{
+        int rc;
+        ENTRY;
+
+        OBD_CHECK_OP(exp->exp_obd, unregister_lock_cancel_cb, 0);
+        OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_lock_cancel_cb);
+
+        rc = OBP(exp->exp_obd, unregister_lock_cancel_cb)(exp, cb);
+        RETURN(rc);
+}
+
 /* OBD Metadata Support */
 
 extern int obd_init_caches(void);
diff --git a/lustre/include/obd_ost.h b/lustre/include/obd_ost.h
index 8dcc9c283fbaa004b48ec1f31aed9ad1c156efd8..d2b399e0cec3c11341399eb7bda4dbe4cf7fc680 100644
--- a/lustre/include/obd_ost.h
+++ b/lustre/include/obd_ost.h
@@ -34,4 +34,8 @@ struct osc_enqueue_args {
         struct ldlm_enqueue_info*oa_ei;
 };
 
+int osc_extent_blocking_cb(struct ldlm_lock *lock,
+                           struct ldlm_lock_desc *new, void *data,
+                           int flag);
+
 #endif
diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h
index 50738b15f634d1c8ca9e2b982e3ce927808b1acd..963e0fdf955b93a1322138925f333972f30f7479 100644
--- a/lustre/include/obd_support.h
+++ b/lustre/include/obd_support.h
@@ -201,6 +201,7 @@ extern unsigned int obd_alloc_fail_rate;
 #define OBD_FAIL_LDLM_CANCEL_RACE        0x310
 #define OBD_FAIL_LDLM_CANCEL_EVICT_RACE  0x311
 #define OBD_FAIL_LDLM_PAUSE_CANCEL       0x312
+#define OBD_FAIL_LDLM_CLOSE_THREAD       0x313
 
 #define OBD_FAIL_OSC                     0x400
 #define OBD_FAIL_OSC_BRW_READ_BULK       0x401
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index e191e4ba29e724e31ee9348e72939c2f916e6edb..62603023c1206f8f459a97a2654447e8b7b77bc7 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -348,6 +348,10 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
         CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
         class_handle_hash(&lock->l_handle, lock_handle_addref);
 
+        CFS_INIT_LIST_HEAD(&lock->l_extents_list);
+        spin_lock_init(&lock->l_extents_list_lock);
+        CFS_INIT_LIST_HEAD(&lock->l_cache_locks_list);
+
         RETURN(lock);
 }
 
diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c
index 76d6c3de35ac86df1fdf57978053525795d0319f..f164d29f6378ad4549126c4e51e97e5f55b07668 100644
--- a/lustre/liblustre/rw.c
+++ b/lustre/liblustre/rw.c
@@ -486,7 +486,9 @@ static int llu_queue_pio(int cmd, struct llu_io_group *group,
                 rc = obd_prep_async_page(exp, lsm, NULL, page,
                                          (obd_off)page->index << CFS_PAGE_SHIFT,
                                          &llu_async_page_ops,
-                                         llap, &llap->llap_cookie);
+                                         llap, &llap->llap_cookie,
+                                         1 /* no cache in liblustre at all */,
+                                         NULL);
                 if (rc) {
                         LASSERT(rc < 0);
                         llap->llap_cookie = NULL;
@@ -557,7 +559,8 @@ struct llu_io_group * get_io_group(struct inode *inode, int maxpages,
         if (!llap_cookie_size)
                 llap_cookie_size = obd_prep_async_page(llu_i2obdexp(inode),
                                                        NULL, NULL, NULL, 0,
-                                                       NULL, NULL, NULL);
+                                                       NULL, NULL, NULL, 0,
+                                                       NULL);
 
         OBD_ALLOC(group, LLU_IO_GROUP_SIZE(maxpages));
         if (!group)
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index c0d6f51192f44b87db976333b5b8c7477a52d65a..a5613aa760e1d6cd77f8acbd158b2c0ede5d0fe0 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c
@@ -631,164 +631,95 @@ check:
         RETURN(stripe);
 }
 
-/* Flush the page cache for an extent as its canceled.  When we're on an LOV,
- * we get a lock cancellation for each stripe, so we have to map the obd's
- * region back onto the stripes in the file that it held.
+/* Take an extra page reference to ensure the page does not go away */
+void ll_pin_extent_cb(void *data)
+{
+        struct page *page = data;
+
+        page_cache_get(page);
+
+        return;
+}
+/* Flush the page from the page cache for an extent as it is canceled.
+ * Page to remove is delivered as @data.
  *
- * No one can dirty the extent until we've finished our work and they can
+ * No one can dirty the extent until we've finished our work and they cannot
  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
  * but other kernel actors could have pages locked.
  *
+ * If @discard is set, the page is not written back even if it is dirty.
+ *
  * Called with the DLM lock held. */
-void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
-                              struct ldlm_lock *lock, __u32 stripe)
+int ll_page_removal_cb(void *data, int discard)
 {
-        ldlm_policy_data_t tmpex;
-        unsigned long start, end, count, skip, i, j;
-        struct page *page;
-        int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
-        struct lustre_handle lockh;
-        struct address_space *mapping = inode->i_mapping;
+        int rc;
+        struct page *page = data;
+        struct address_space *mapping;
 
         ENTRY;
-        tmpex = lock->l_policy_data;
-        CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
-               inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
-               i_size_read(inode));
-
-        /* our locks are page granular thanks to osc_enqueue, we invalidate the
-         * whole page. */
-        if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 ||
-            ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0)
-                LDLM_ERROR(lock, "lock not aligned on CFS_PAGE_SIZE %lu", CFS_PAGE_SIZE);
-        LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0);
-        LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0);
-
-        count = ~0;
-        skip = 0;
-        start = tmpex.l_extent.start >> CFS_PAGE_SHIFT;
-        end = tmpex.l_extent.end >> CFS_PAGE_SHIFT;
-        if (lsm->lsm_stripe_count > 1) {
-                count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT;
-                skip = (lsm->lsm_stripe_count - 1) * count;
-                start += start/count * skip + stripe * count;
-                if (end != ~0)
-                        end += end/count * skip + stripe * count;
-        }
-        if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT)
-                end = ~0;
-
-        i = i_size_read(inode) ? (i_size_read(inode) - 1) >> CFS_PAGE_SHIFT : 0;
-        if (i < end)
-                end = i;
-
-        CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
-               "count: %lu skip: %lu end: %lu%s\n", start, start % count,
-               count, skip, end, discard ? " (DISCARDING)" : "");
-
-        /* walk through the vmas on the inode and tear down mmaped pages that
-         * intersect with the lock.  this stops immediately if there are no
-         * mmap()ed regions of the file.  This is not efficient at all and
-         * should be short lived. We'll associate mmap()ed pages with the lock
-         * and will be able to find them directly */
-        for (i = start; i <= end; i += (j + skip)) {
-                j = min(count - (i % count), end - i + 1);
-                LASSERT(j > 0);
-                LASSERT(mapping);
-                if (ll_teardown_mmaps(mapping,
-                                      (__u64)i << CFS_PAGE_SHIFT,
-                                      ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
-                        break;
-        }
 
-        /* this is the simplistic implementation of page eviction at
-         * cancelation.  It is careful to get races with other page
-         * lockers handled correctly.  fixes from bug 20 will make it
-         * more efficient by associating locks with pages and with
-         * batching writeback under the lock explicitly. */
-        for (i = start, j = start % count; i <= end;
-             j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) {
-                if (j == count) {
-                        CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip);
-                        i += skip;
-                        j = 0;
-                        if (i > end)
-                                break;
-                }
-                LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
-                         LPU64" >= "LPU64" start %lu i %lu end %lu\n",
-                         tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
-                         start, i, end);
+        /* We already hold a page reference from ll_pin_extent_cb */
+        lock_page(page);
 
-                if (!mapping_has_pages(mapping)) {
-                        CDEBUG(D_INODE|D_PAGE, "nothing left\n");
-                        break;
-                }
+        /* Already truncated by somebody */
+        if (!page->mapping)
+                GOTO(out, rc = 0);
 
-                cond_resched();
+        mapping = page->mapping;
 
-                page = find_get_page(mapping, i);
-                if (page == NULL)
-                        continue;
-                LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
-                               i, tmpex.l_extent.start);
-                lock_page(page);
+        ll_teardown_mmaps(mapping,
+                          (__u64)page->index << PAGE_CACHE_SHIFT,
+                          ((__u64)page->index << PAGE_CACHE_SHIFT) |
+                          ~PAGE_CACHE_MASK);
+        LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n");
 
-                /* page->mapping to check with racing against teardown */
-                if (!discard && clear_page_dirty_for_io(page)) {
-                        rc = ll_call_writepage(inode, page);
-                        /* either waiting for io to complete or reacquiring
-                         * the lock that the failed writepage released */
-                        lock_page(page);
-                        wait_on_page_writeback(page);
-                        if (rc != 0) {
-                                CERROR("writepage inode %lu(%p) of page %p "
-                                       "failed: %d\n", inode->i_ino, inode,
-                                       page, rc);
+        if (!discard && clear_page_dirty_for_io(page)) {
+                LASSERT(page->mapping);
+                rc = ll_call_writepage(page->mapping->host, page);
+                /* either waiting for io to complete or reacquiring
+                 * the lock that the failed writepage released */
+                lock_page(page);
+                wait_on_page_writeback(page);
+                if (rc != 0) {
+                        CERROR("writepage inode %lu(%p) of page %p "
+                               "failed: %d\n", mapping->host->i_ino,
+                               mapping->host, page, rc);
 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-                                if (rc == -ENOSPC)
-                                        set_bit(AS_ENOSPC, &mapping->flags);
-                                else
-                                        set_bit(AS_EIO, &mapping->flags);
+                        if (rc == -ENOSPC)
+                                set_bit(AS_ENOSPC, &mapping->flags);
+                        else
+                                set_bit(AS_EIO, &mapping->flags);
 #else
-                                mapping->gfp_mask |= AS_EIO_MASK;
+                        mapping->gfp_mask |= AS_EIO_MASK;
 #endif
-                        }
-                }
-
-                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
-                /* check to see if another DLM lock covers this page  b=2765 */
-                rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
-                                      LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
-                                      LDLM_FL_TEST_LOCK,
-                                      &lock->l_resource->lr_name, LDLM_EXTENT,
-                                      &tmpex, LCK_PR | LCK_PW, &lockh);
-                if (rc2 == 0 && page->mapping != NULL) {
-                        struct ll_async_page *llap = llap_cast_private(page);
-                        // checking again to account for writeback's lock_page()
-                        LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
-                        if (llap)
-                                ll_ra_accounting(llap, mapping);
-                        ll_truncate_complete_page(page);
                 }
-                unlock_page(page);
-                page_cache_release(page);
-        }
-        LASSERTF(tmpex.l_extent.start <=
-                 (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL :
-                  lock->l_policy_data.l_extent.end + 1),
-                 "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n",
-                 tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
-                 start, i, end);
+        }
+        if (page->mapping != NULL) {
+                struct ll_async_page *llap = llap_cast_private(page);
+                // checking again to account for writeback's lock_page()
+                LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
+                if (llap)
+                        ll_ra_accounting(llap, page->mapping);
+                ll_truncate_complete_page(page);
+        }
         EXIT;
+out:
+        LASSERT(!PageWriteback(page));
+        unlock_page(page);
+        page_cache_release(page);
+
+        return 0;
 }
 
-static int ll_extent_lock_callback(struct ldlm_lock *lock,
-                                   struct ldlm_lock_desc *new, void *data,
-                                   int flag)
+int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
+                             void *data, int flag)
 {
-        struct lustre_handle lockh = { 0 };
-        int rc;
+        struct inode *inode;
+        struct ll_inode_info *lli;
+        struct lov_stripe_md *lsm;
+        int stripe;
+        __u64 kms;
+
         ENTRY;
 
         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
@@ -796,61 +727,37 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                 LBUG();
         }
 
-        switch (flag) {
-        case LDLM_CB_BLOCKING:
-                ldlm_lock2handle(lock, &lockh);
-                rc = ldlm_cli_cancel(&lockh);
-                if (rc != ELDLM_OK)
-                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
-                break;
-        case LDLM_CB_CANCELING: {
-                struct inode *inode;
-                struct ll_inode_info *lli;
-                struct lov_stripe_md *lsm;
-                int stripe;
-                __u64 kms;
-
-                /* This lock wasn't granted, don't try to evict pages */
-                if (lock->l_req_mode != lock->l_granted_mode)
-                        RETURN(0);
-
-                inode = ll_inode_from_lock(lock);
-                if (inode == NULL)
-                        RETURN(0);
-                lli = ll_i2info(inode);
-                if (lli == NULL)
-                        goto iput;
-                if (lli->lli_smd == NULL)
-                        goto iput;
-                lsm = lli->lli_smd;
-
-                stripe = ll_lock_to_stripe_offset(inode, lock);
-                if (stripe < 0)
-                        goto iput;
-
-                ll_pgcache_remove_extent(inode, lsm, lock, stripe);
+        inode = ll_inode_from_lock(lock);
+        if (inode == NULL)
+                RETURN(0);
+        lli = ll_i2info(inode);
+        if (lli == NULL)
+                GOTO(iput, 0);
+        if (lli->lli_smd == NULL)
+                GOTO(iput, 0);
+        lsm = lli->lli_smd;
 
-                lov_stripe_lock(lsm);
-                lock_res_and_lock(lock);
-                kms = ldlm_extent_shift_kms(lock,
-                                            lsm->lsm_oinfo[stripe]->loi_kms);
+        stripe = ll_lock_to_stripe_offset(inode, lock);
+        if (stripe < 0)
+                GOTO(iput, 0);
 
-                if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
-                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
-                                   lsm->lsm_oinfo[stripe]->loi_kms, kms);
-                lsm->lsm_oinfo[stripe]->loi_kms = kms;
-                unlock_res_and_lock(lock);
-                lov_stripe_unlock(lsm);
-                ll_try_done_writing(inode);
-        iput:
-                iput(inode);
-                break;
-        }
-        default:
-                LBUG();
-        }
+        lov_stripe_lock(lsm);
+        lock_res_and_lock(lock);
+        kms = ldlm_extent_shift_kms(lock,
+                                    lsm->lsm_oinfo[stripe]->loi_kms);
+
+        if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
+                LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
+                           lsm->lsm_oinfo[stripe]->loi_kms, kms);
+        lsm->lsm_oinfo[stripe]->loi_kms = kms;
+        unlock_res_and_lock(lock);
+        lov_stripe_unlock(lsm);
+        ll_try_done_writing(inode);
+        EXIT;
+iput:
+        iput(inode);
 
-        RETURN(0);
+        return 0;
 }
 
 #if 0
@@ -973,7 +880,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
         
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = NULL;
@@ -1034,7 +941,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
          *       acquired only if there were no conflicting locks. */
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = inode;
@@ -1097,7 +1004,7 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = mode;
-        einfo.ei_cb_bl = ll_extent_lock_callback;
+        einfo.ei_cb_bl = osc_extent_blocking_cb;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
         einfo.ei_cbdata = inode;
diff --git a/lustre/llite/llite_close.c b/lustre/llite/llite_close.c
index 10a98fab8c00c2de71da4f8445390e350fcb76d0..d36f2d6f8e2377f7c42d88cf50a57fe39f3c2f3d 100644
--- a/lustre/llite/llite_close.c
+++ b/lustre/llite/llite_close.c
@@ -236,6 +236,7 @@ int ll_close_thread_start(struct ll_close_queue **lcq_ret)
         struct ll_close_queue *lcq;
         pid_t pid;
 
+        OBD_FAIL_RETURN(OBD_FAIL_LDLM_CLOSE_THREAD, -EINTR);
         OBD_ALLOC(lcq, sizeof(*lcq));
         if (lcq == NULL)
                 return -ENOMEM;
diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h
index 9f9c436a4eaa22c7a7fbb9b299eb9578db4f76dd..88e93333d183748ad3ac0cbd2308ca40461ae7d6 100644
--- a/lustre/llite/llite_internal.h
+++ b/lustre/llite/llite_internal.h
@@ -406,6 +406,7 @@ struct ll_async_page {
                          llap_origin:3,
                          llap_ra_used:1,
                          llap_ignore_quota:1,
+                         llap_nocache:1,
                          llap_lockless_io_page:1;
         void            *llap_cookie;
         struct page     *llap_page;
@@ -508,6 +509,10 @@ int ll_prepare_mdc_op_data(struct mdc_op_data *,
 struct lookup_intent *ll_convert_intent(struct open_intent *oit,
                                         int lookup_flags);
 #endif
+void ll_pin_extent_cb(void *data);
+int ll_page_removal_cb(void *data, int discard);
+int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
+                             void *data, int flag);
 int lookup_it_finish(struct ptlrpc_request *request, int offset,
                      struct lookup_intent *it, void *data);
 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry);
diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c
index fdaf0113321d0b80e237fb2c9fe28994bc6de15b..66dc80f3f5e870bc7df14458742dd721482debe0 100644
--- a/lustre/llite/llite_lib.c
+++ b/lustre/llite/llite_lib.c
@@ -34,6 +34,7 @@
 #include <lprocfs_status.h>
 #include <lustre_disk.h>
 #include <lustre_param.h>
+#include <lustre_cache.h>
 #include "llite_internal.h"
 
 cfs_mem_cache_t *ll_file_data_slab;
@@ -287,7 +288,6 @@ static int client_common_fill_super(struct super_block *sb,
         obd->obd_upcall.onu_upcall = ll_ocd_update;
         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
 
-
         err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, data, NULL);
         if (err == -EBUSY) {
                 LCONSOLE_ERROR_MSG(0x150, "An OST (osc %s) is performing "
@@ -304,15 +304,33 @@ static int client_common_fill_super(struct super_block *sb,
         sbi->ll_lco.lco_flags = data->ocd_connect_flags;
         spin_unlock(&sbi->ll_lco.lco_lock);
 
-        mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
+        err = obd_register_page_removal_cb(sbi->ll_osc_exp,
+                                           ll_page_removal_cb, 
+                                           ll_pin_extent_cb);
+        if (err) {
+                CERROR("cannot register page removal callback: rc = %d\n",err);
+                GOTO(out_osc, err);
+        }
+        err = obd_register_lock_cancel_cb(sbi->ll_osc_exp,
+                                          ll_extent_lock_cancel_cb);
+        if (err) {
+                CERROR("cannot register lock cancel callback: rc = %d\n", err);
+                GOTO(out_page_rm_cb, err);
+        }
+
+        err = mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
+        if (err) {
+                CERROR("cannot set max EA and cookie sizes: rc = %d\n", err);
+                GOTO(out_lock_cn_cb, err);
+        }
 
         err = obd_prep_async_page(sbi->ll_osc_exp, NULL, NULL, NULL,
-                                  0, NULL, NULL, NULL);
+                                  0, NULL, NULL, NULL, 0, NULL);
         if (err < 0) {
                 LCONSOLE_ERROR_MSG(0x151, "There are no OST's in this "
                                    "filesystem. There must be at least one "
                                    "active OST for a client to start.\n");
-                GOTO(out_osc, err);
+                GOTO(out_lock_cn_cb, err);
         }
 
         if (!ll_async_page_slab) {
@@ -322,13 +340,13 @@ static int client_common_fill_super(struct super_block *sb,
                                                           ll_async_page_slab_size,
                                                           0, 0);
                 if (!ll_async_page_slab)
-                        GOTO(out_osc, -ENOMEM);
+                        GOTO(out_lock_cn_cb, -ENOMEM);
         }
 
         err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid);
         if (err) {
                 CERROR("cannot mds_connect: rc = %d\n", err);
-                GOTO(out_osc, err);
+                GOTO(out_lock_cn_cb, err);
         }
         CDEBUG(D_SUPER, "rootfid "LPU64"\n", rootfid.id);
         sbi->ll_rootino = rootfid.id;
@@ -346,14 +364,14 @@ static int client_common_fill_super(struct super_block *sb,
                           0, &request);
         if (err) {
                 CERROR("mdc_getattr failed for root: rc = %d\n", err);
-                GOTO(out_osc, err);
+                GOTO(out_lock_cn_cb, err);
         }
 
         err = mdc_req2lustre_md(request, REPLY_REC_OFF, sbi->ll_osc_exp, &md);
         if (err) {
                 CERROR("failed to understand root inode md: rc = %d\n",err);
                 ptlrpc_req_finished (request);
-                GOTO(out_osc, err);
+                GOTO(out_lock_cn_cb, err);
         }
 
         LASSERT(sbi->ll_rootino != 0);
@@ -396,6 +414,12 @@ static int client_common_fill_super(struct super_block *sb,
 out_root:
         if (root)
                 iput(root);
+out_lock_cn_cb:
+        obd_unregister_lock_cancel_cb(sbi->ll_osc_exp,
+                                      ll_extent_lock_cancel_cb);
+out_page_rm_cb:
+        obd_unregister_page_removal_cb(sbi->ll_osc_exp,
+                                       ll_page_removal_cb);
 out_osc:
         obd_disconnect(sbi->ll_osc_exp);
         sbi->ll_osc_exp = NULL;
@@ -595,6 +619,11 @@ void client_common_put_super(struct super_block *sb)
         prune_deathrow(sbi, 0);
 
         list_del(&sbi->ll_conn_chain);
+
+        obd_unregister_page_removal_cb(sbi->ll_osc_exp,
+                                       ll_page_removal_cb);
+        obd_unregister_lock_cancel_cb(sbi->ll_osc_exp,ll_extent_lock_cancel_cb);
+
         obd_disconnect(sbi->ll_osc_exp);
         sbi->ll_osc_exp = NULL;
 
diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c
index 22eafb86c24bb4d317d18611ed21119295323aa0..0511a321c194f6e56ec1d9ca7b05c438ea6c054b 100644
--- a/lustre/llite/rw.c
+++ b/lustre/llite/rw.c
@@ -566,7 +566,9 @@ int llap_shrink_cache(struct ll_sb_info *sbi, int shrink_fraction)
         return count;
 }
 
-static struct ll_async_page *llap_from_page(struct page *page, unsigned origin)
+static struct ll_async_page *llap_from_page_with_lockh(struct page *page,
+                                                       unsigned origin,
+                                                       struct lustre_handle *lockh)
 {
         struct ll_async_page *llap;
         struct obd_export *exp;
@@ -619,9 +621,14 @@ static struct ll_async_page *llap_from_page(struct page *page, unsigned origin)
         llap->llap_magic = LLAP_MAGIC;
         llap->llap_cookie = (void *)llap + size_round(sizeof(*llap));
 
+        /* XXX: for bug 11270 - check for lockless origin here! */
+        if (origin == LLAP_ORIGIN_LOCKLESS_IO)
+                llap->llap_nocache = 1;
+
         rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page,
                                  (obd_off)page->index << CFS_PAGE_SHIFT,
-                                 &ll_async_page_ops, llap, &llap->llap_cookie);
+                                 &ll_async_page_ops, llap, &llap->llap_cookie,
+                                 llap->llap_nocache, lockh);
         if (rc) {
                 OBD_SLAB_FREE(llap, ll_async_page_slab,
                               ll_async_page_slab_size);
@@ -669,6 +676,12 @@ static struct ll_async_page *llap_from_page(struct page *page, unsigned origin)
         RETURN(llap);
 }
 
+static inline struct ll_async_page *llap_from_page(struct page *page,
+                                                   unsigned origin)
+{
+        return llap_from_page_with_lockh(page, origin, NULL);
+}
+
 static int queue_or_sync_write(struct obd_export *exp, struct inode *inode,
                                struct ll_async_page *llap,
                                unsigned to, obd_flag async_flags)
@@ -764,12 +777,14 @@ out:
 int ll_commit_write(struct file *file, struct page *page, unsigned from,
                     unsigned to)
 {
+        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
         struct inode *inode = page->mapping->host;
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm = lli->lli_smd;
         struct obd_export *exp;
         struct ll_async_page *llap;
         loff_t size;
+        struct lustre_handle *lockh = NULL;
         int rc = 0;
         ENTRY;
 
@@ -780,7 +795,10 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from,
         CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
                inode, page, from, to, page->index);
 
-        llap = llap_from_page(page, LLAP_ORIGIN_COMMIT_WRITE);
+        if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
+                lockh = &fd->fd_cwlockh;
+
+        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_COMMIT_WRITE, lockh);
         if (IS_ERR(llap))
                 RETURN(PTR_ERR(llap));
 
@@ -959,6 +977,7 @@ static void __ll_put_llap(struct page *page)
  * here. */
 void ll_removepage(struct page *page)
 {
+        struct ll_async_page *llap = llap_cast_private(page);
         ENTRY;
 
         LASSERT(!in_interrupt());
@@ -970,37 +989,15 @@ void ll_removepage(struct page *page)
                 return;
         }
 
-        LASSERT(!llap_cast_private(page)->llap_lockless_io_page);
+        LASSERT(!llap->llap_lockless_io_page);
+        LASSERT(!llap->llap_nocache);
+
         LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
         __ll_put_llap(page);
 
         EXIT;
 }
 
-static int ll_page_matches(struct page *page, int fd_flags)
-{
-        struct lustre_handle match_lockh = {0};
-        struct inode *inode = page->mapping->host;
-        ldlm_policy_data_t page_extent;
-        int flags, matches;
-        ENTRY;
-
-        if (unlikely(fd_flags & LL_FILE_GROUP_LOCKED))
-                RETURN(1);
-
-        page_extent.l_extent.start = (__u64)page->index << CFS_PAGE_SHIFT;
-        page_extent.l_extent.end =
-                page_extent.l_extent.start + CFS_PAGE_SIZE - 1;
-        flags = LDLM_FL_TEST_LOCK | LDLM_FL_BLOCK_GRANTED;
-        if (!(fd_flags & LL_FILE_READAHEAD))
-                flags |= LDLM_FL_CBPENDING;
-        matches = obd_match(ll_i2sbi(inode)->ll_osc_exp,
-                            ll_i2info(inode)->lli_smd, LDLM_EXTENT,
-                            &page_extent, LCK_PR | LCK_PW, &flags, inode,
-                            &match_lockh);
-        RETURN(matches);
-}
-
 static int ll_issue_page_read(struct obd_export *exp,
                               struct ll_async_page *llap,
                               struct obd_io_group *oig, int defer)
@@ -1209,22 +1206,22 @@ static int ll_readahead(struct ll_readahead_state *ras,
                 /* we do this first so that we can see the page in the /proc
                  * accounting */
                 llap = llap_from_page(page, LLAP_ORIGIN_READAHEAD);
-                if (IS_ERR(llap) || llap->llap_defer_uptodate)
+                if (IS_ERR(llap) || llap->llap_defer_uptodate) {
+                        if (PTR_ERR(llap) == -ENOLCK) {
+                                ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
+                                match_failed = 1;
+                                CDEBUG(D_READA | D_PAGE,
+                                       "Adding page to cache failed index "
+                                       "%lu\n", i);
+                        }
                         goto next_page;
+                }
 
                 /* skip completed pages */
                 if (Page_Uptodate(page))
                         goto next_page;
 
                 /* bail when we hit the end of the lock. */
-                if ((rc = ll_page_matches(page, flags|LL_FILE_READAHEAD)) <= 0){
-                        LL_CDEBUG_PAGE(D_READA | D_PAGE, page,
-                                       "lock match failed: rc %d\n", rc);
-                        ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH);
-                        match_failed = 1;
-                        goto next_page;
-                }
-
                 rc = ll_issue_page_read(exp, llap, oig, 1);
                 if (rc == 0) {
                         reserved--;
@@ -1403,6 +1400,7 @@ int ll_writepage(struct page *page)
         if (IS_ERR(llap))
                 GOTO(out, rc = PTR_ERR(llap));
 
+        LASSERT(!llap->llap_nocache);
         LASSERT(!PageWriteback(page));
         set_page_writeback(page);
 
@@ -1450,6 +1448,7 @@ int ll_readpage(struct file *filp, struct page *page)
         struct obd_export *exp;
         struct ll_async_page *llap;
         struct obd_io_group *oig = NULL;
+        struct lustre_handle *lockh = NULL;
         int rc;
         ENTRY;
 
@@ -1481,9 +1480,19 @@ int ll_readpage(struct file *filp, struct page *page)
         if (exp == NULL)
                 GOTO(out, rc = -EINVAL);
 
-        llap = llap_from_page(page, LLAP_ORIGIN_READPAGE);
-        if (IS_ERR(llap))
+        if (fd->fd_flags & LL_FILE_GROUP_LOCKED)
+                lockh = &fd->fd_cwlockh;
+
+        llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READPAGE, lockh);
+        if (IS_ERR(llap)) {
+                if (PTR_ERR(llap) == -ENOLCK) {
+                        CWARN("ino %lu page %lu (%llu) not covered by "
+                              "a lock (mmap?).  check debug logs.\n",
+                              inode->i_ino, page->index,
+                              (long long)page->index << PAGE_CACHE_SHIFT);
+                }
                 GOTO(out, rc = PTR_ERR(llap));
+        }
 
         if (ll_i2sbi(inode)->ll_ra_info.ra_max_pages)
                 ras_update(ll_i2sbi(inode), inode, &fd->fd_ras, page->index,
@@ -1504,22 +1513,6 @@ int ll_readpage(struct file *filp, struct page *page)
                 GOTO(out_oig, rc = 0);
         }
 
-        if (likely((fd->fd_flags & LL_FILE_IGNORE_LOCK) == 0)) {
-                rc = ll_page_matches(page, fd->fd_flags);
-                if (rc < 0) {
-                        LL_CDEBUG_PAGE(D_ERROR, page,
-                                       "lock match failed: rc %d\n", rc);
-                        GOTO(out, rc);
-                }
-
-                if (rc == 0) {
-                        CWARN("ino %lu page %lu (%llu) not covered by "
-                              "a lock (mmap?).  check debug logs.\n",
-                              inode->i_ino, page->index,
-                              (long long)page->index << CFS_PAGE_SHIFT);
-                }
-        }
-
         rc = ll_issue_page_read(exp, llap, oig, 0);
         if (rc)
                 GOTO(out, rc);
diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c
index 99e0ff70e1952795992e8e2859183f0cab77612e..0f614284e2d4583743b49fca22ac2e5153bd7069 100644
--- a/lustre/lov/lov_obd.c
+++ b/lustre/lov/lov_obd.c
@@ -48,6 +48,7 @@
 #include <obd_ost.h>
 #include <lprocfs_status.h>
 #include <lustre_param.h>
+#include <lustre_cache.h>
 
 #include "lov_internal.h"
 
@@ -87,6 +88,94 @@ void lov_putref(struct obd_device *obd)
         mutex_up(&lov->lov_lock);
 }
 
+static int lov_register_page_removal_cb(struct obd_export *exp,
+                                        obd_page_removal_cb_t func,
+                                        obd_pin_extent_cb pin_cb)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int i, rc = 0;
+
+        if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
+                return -EBUSY;
+
+        if (lov->lov_page_pin_cb && lov->lov_page_pin_cb != pin_cb)
+                return -EBUSY;
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+                        continue;
+                rc |= obd_register_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
+                                                   func, pin_cb);
+        }
+
+        lov->lov_page_removal_cb = func;
+        lov->lov_page_pin_cb = pin_cb;
+
+        return rc;
+}
+
+static int lov_unregister_page_removal_cb(struct obd_export *exp,
+                                        obd_page_removal_cb_t func)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int i, rc = 0;
+
+        if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
+                return -EINVAL;
+
+        lov->lov_page_removal_cb = NULL;
+        lov->lov_page_pin_cb = NULL;
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+                        continue;
+                rc |= obd_unregister_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
+                                                     func);
+        }
+
+        return rc;
+}
+
+static int lov_register_lock_cancel_cb(struct obd_export *exp,
+                                         obd_lock_cancel_cb func)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int i, rc = 0;
+
+        if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
+                return -EBUSY;
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+                        continue;
+                rc |= obd_register_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
+                                                  func);
+        }
+
+        lov->lov_lock_cancel_cb = func;
+
+        return rc;
+}
+
+static int lov_unregister_lock_cancel_cb(struct obd_export *exp,
+                                         obd_lock_cancel_cb func)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        int i, rc = 0;
+
+        if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
+                return -EINVAL;
+
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
+                        continue;
+                rc |= obd_unregister_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
+                                                    func);
+        }
+        lov->lov_lock_cancel_cb = NULL;
+        return rc;
+}
+
 #define MAX_STRING_SIZE 128
 static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, 
                            struct obd_connect_data *data)
@@ -160,10 +249,33 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
                 RETURN(-ENODEV);
         }
 
+        rc = obd_register_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+                                          lov->lov_page_removal_cb,
+                                          lov->lov_page_pin_cb);
+        if (rc) {
+                obd_disconnect(lov->lov_tgts[index]->ltd_exp);
+                lov->lov_tgts[index]->ltd_exp = NULL;
+                RETURN(rc);
+        }
+
+        rc = obd_register_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+                                         lov->lov_lock_cancel_cb);
+        if (rc) {
+                obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+                                               lov->lov_page_removal_cb);
+                obd_disconnect(lov->lov_tgts[index]->ltd_exp);
+                lov->lov_tgts[index]->ltd_exp = NULL;
+                RETURN(rc);
+        }
+
         rc = obd_register_observer(tgt_obd, obd);
         if (rc) {
                 CERROR("Target %s register_observer error %d\n",
                        obd_uuid2str(&tgt_uuid), rc);
+                obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+                                              lov->lov_lock_cancel_cb);
+                obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+                                               lov->lov_page_removal_cb);
                 obd_disconnect(lov->lov_tgts[index]->ltd_exp);
                 lov->lov_tgts[index]->ltd_exp = NULL;
                 RETURN(rc);
@@ -265,6 +377,10 @@ static int lov_disconnect_obd(struct obd_device *obd, __u32 index)
         CDEBUG(D_CONFIG, "%s: disconnecting target %s\n", 
                obd->obd_name, osc_obd->obd_name);
 
+        obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
+                                      lov->lov_lock_cancel_cb);
+        obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
+                                       lov->lov_page_removal_cb);
         if (lov->lov_tgts[index]->ltd_active) {
                 lov->lov_tgts[index]->ltd_active = 0;
                 lov->desc.ld_active_tgt_count--;
@@ -1658,10 +1774,12 @@ static struct obd_async_page_ops lov_async_page_ops = {
 int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                            struct lov_oinfo *loi, cfs_page_t *page,
                            obd_off offset, struct obd_async_page_ops *ops,
-                           void *data, void **res)
+                           void *data, void **res, int nocache,
+                           struct lustre_handle *lockh)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
         struct lov_async_page *lap;
+        struct lov_lock_handles *lov_lockh = NULL;
         int rc = 0;
         ENTRY;
 
@@ -1678,7 +1796,8 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                 }
                 rc = size_round(sizeof(*lap)) +
                         obd_prep_async_page(lov->lov_tgts[i]->ltd_exp, NULL,
-                                            NULL, NULL, 0, NULL, NULL, NULL);
+                                            NULL, NULL, 0, NULL, NULL, NULL, 0,
+                                            NULL);
                 RETURN(rc);
         }
         ASSERT_LSM_MAGIC(lsm);
@@ -1699,10 +1818,19 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap));
 
+        if (lockh) {
+                lov_lockh = lov_handle2llh(lockh);
+                if (lov_lockh) {
+                        lockh = lov_lockh->llh_handles + lap->lap_stripe;
+                }
+        }
+
         rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
                                  lsm, loi, page, lap->lap_sub_offset,
                                  &lov_async_page_ops, lap,
-                                 &lap->lap_sub_cookie);
+                                 &lap->lap_sub_cookie, nocache, lockh);
+        if (lov_lockh)
+                lov_llh_put(lov_lockh);
         if (rc)
                 RETURN(rc);
         CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page,
@@ -2664,6 +2792,10 @@ struct obd_ops lov_obd_ops = {
         .o_llog_init           = lov_llog_init,
         .o_llog_finish         = lov_llog_finish,
         .o_notify              = lov_notify,
+        .o_register_page_removal_cb = lov_register_page_removal_cb,
+        .o_unregister_page_removal_cb = lov_unregister_page_removal_cb,
+        .o_register_lock_cancel_cb = lov_register_lock_cancel_cb,
+        .o_unregister_lock_cancel_cb = lov_unregister_lock_cancel_cb,
 };
 
 static quota_interface_t *quota_interface;
diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c
index 7cb6e99586904bf32388f6c5ae5ebd481c921907..ff469548367824c7d5a71d526329f93e269cac8c 100644
--- a/lustre/obdclass/lprocfs_status.c
+++ b/lustre/obdclass/lprocfs_status.c
@@ -1120,6 +1120,10 @@ void lprocfs_init_ops_stats(int num_private_stats, struct lprocfs_stats *stats)
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, quotacheck);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, quotactl);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, ping);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, register_page_removal_cb);
+        LPROCFS_OBD_OP_INIT(num_private_stats,stats,unregister_page_removal_cb);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, register_lock_cancel_cb);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats,unregister_lock_cancel_cb);
 }
 
 int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats)
diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c
index 51099bb2ec3ab9bed41498eb74f6473a7aa7a388..d7f54a9100612abb931f1f6555c203b24416a3f8 100644
--- a/lustre/obdecho/echo_client.c
+++ b/lustre/obdecho/echo_client.c
@@ -858,7 +858,7 @@ static int echo_client_async_page(struct obd_export *exp, int rw,
 
                 rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,
                                          eap->eap_off, &ec_async_page_ops,
-                                         eap, &eap->eap_cookie);
+                                         eap, &eap->eap_cookie, 1, NULL);
                 if (rc) {
                         spin_lock(&eas.eas_lock);
                         eas.eas_rc = rc;
diff --git a/lustre/osc/Makefile.in b/lustre/osc/Makefile.in
index ce9107fc6409b56bc58b2cc40f52eb7a1b60ab60..2eb2eea34dac9a74e521c1d3903e3c67662d4763 100644
--- a/lustre/osc/Makefile.in
+++ b/lustre/osc/Makefile.in
@@ -1,4 +1,4 @@
 MODULES := osc
-osc-objs := osc_request.o lproc_osc.o osc_create.o
+osc-objs := osc_request.o lproc_osc.o osc_create.o cache.o
 
 @INCLUDE_RULES@
diff --git a/lustre/osc/autoMakefile.am b/lustre/osc/autoMakefile.am
index 2b00785c30c5fe4e29f3fdf258bff06dfc6414e3..985e4739d1e302c383731c3a751353492f1c3ddd 100644
--- a/lustre/osc/autoMakefile.am
+++ b/lustre/osc/autoMakefile.am
@@ -5,7 +5,7 @@
 
 if LIBLUSTRE
 noinst_LIBRARIES = libosc.a
-libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h
+libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h cache.c
 libosc_a_CPPFLAGS = $(LLCPPFLAGS)
 libosc_a_CFLAGS = $(LLCFLAGS)
 endif
diff --git a/lustre/osc/cache.c b/lustre/osc/cache.c
new file mode 100644
index 0000000000000000000000000000000000000000..c144d96e887125f896c48e81e4a7d39ce3b5877a
--- /dev/null
+++ b/lustre/osc/cache.c
@@ -0,0 +1,398 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
+ *   Author Oleg Drokin <green@clusterfs.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ *
+ * Cache of triples - object, lock, extent
+ */
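+
+/* Each cached page (osc_async_page) is linked to the ldlm lock that covers it
+   through oap_page_list / l_extents_list, and every lock added to the cache
+   sits on the per-cache lc_locks_list.  When a lock is cancelled,
+   cache_remove_lock() walks that lock's extent list directly instead of
+   scanning all cached pages. */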
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_OSC
+
+#ifdef __KERNEL__
+# include <linux/version.h>
+# include <linux/module.h>
+# include <linux/list.h>
+#else                           /* __KERNEL__ */
+# include <liblustre.h>
+#endif
+
+#include <lustre_dlm.h>
+#include <lustre_cache.h>
+#include <obd.h>
+#include <lustre_debug.h>
+
+#include "osc_internal.h"
+
+/* Adding @lock to the @cache */
+int cache_add_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
+{
+        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+
+        if (!lock)      // Lock disappeared under us.
+                return 0;
+
+        spin_lock(&cache->lc_locks_list_lock);
+        list_add_tail(&lock->l_cache_locks_list, &cache->lc_locks_list);
+        spin_unlock(&cache->lc_locks_list_lock);
+
+        LDLM_LOCK_PUT(lock);
+
+        return 0;
+}
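+
+/* cache_add_lock() is driven from osc_enqueue_fini(), so extent locks
+   obtained through osc_enqueue() are added to the cache as soon as they are
+   granted (see osc_request.c). */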
+
+/* Tries to add @extent to the lock represented by @lockh if it is non-NULL;
+   otherwise tries to match a suitable lock using the resource and the data
+   contained in @extent. */
+/* Should be called with oap->lock held (except on the initial addition, see
+   the comment in osc_request.c). */
+int cache_add_extent(struct lustre_cache *cache, struct ldlm_res_id *res,
+                     struct osc_async_page *extent, struct lustre_handle *lockh)
+{
+        struct lustre_handle tmplockh;
+        ldlm_policy_data_t tmpex;
+        struct ldlm_lock *lock = NULL;
+        ENTRY;
+
+        /* Don't add anything second time */
+        if (!list_empty(&extent->oap_page_list)) {
+                LBUG();
+                RETURN(0);
+        }
+
+        if (lockh && lustre_handle_is_used(lockh)) {
+                lock = ldlm_handle2lock(lockh);
+                if (!lock)
+                        RETURN(-ENOLCK);
+
+                LASSERTF(lock->l_policy_data.l_extent.start <=
+                         extent->oap_obj_off &&
+                         extent->oap_obj_off + CFS_PAGE_SIZE - 1 <=
+                         lock->l_policy_data.l_extent.end,
+                         "Got wrong lock [" LPU64 "," LPU64 "] for page with "
+                         "offset " LPU64 "\n",
+                         lock->l_policy_data.l_extent.start,
+                         lock->l_policy_data.l_extent.end, extent->oap_obj_off);
+        } else {
+                int mode;
+                /* Real extent width calculation goes here once we have
+                 * real extents.
+                 */
+                tmpex.l_extent.start = extent->oap_obj_off;
+                tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
+
+                /* XXX find lock from extent or something like that */
+                /* The lock mode does not matter: if this is a dirty page,
+                 * there can be only one PW lock covering it; if the page is
+                 * clean, any PR lock is good.
+                 */
+                mode = ldlm_lock_match(cache->lc_obd->obd_namespace,
+                                       LDLM_FL_BLOCK_GRANTED |
+                                       LDLM_FL_CBPENDING, res, LDLM_EXTENT,
+                                       &tmpex, LCK_PW | LCK_PR, &tmplockh);
+
+                if (mode <= 0) {
+                        CDEBUG(D_CACHE, "No lock to attach " LPU64 "->" LPU64
+                               " extent to!\n", tmpex.l_extent.start,
+                               tmpex.l_extent.end);
+                        RETURN((mode < 0) ? mode : -ENOLCK);
+                }
+
+                lock = ldlm_handle2lock(&tmplockh);
+                if (!lock) {    // Race - lock disappeared under us (eviction?)
+                        CDEBUG(D_CACHE, "Newly matched lock just disappeared "
+                               "under us\n");
+                        RETURN(-ENOLCK);
+                }
+                ldlm_lock_decref(&tmplockh, mode);
+        }
+
+        spin_lock(&lock->l_extents_list_lock);
+        list_add_tail(&extent->oap_page_list, &lock->l_extents_list);
+        spin_unlock(&lock->l_extents_list_lock);
+        extent->oap_ldlm_lock = lock;
+        LDLM_LOCK_PUT(lock);
+
+        RETURN(0);
+}
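+
+/* cache_add_extent() is called from osc_prep_async_page() for the initial
+   addition (with the page locked and the caller's lock handle passed in) and
+   from cache_remove_extents_from_lock() below, which tries to re-attach
+   clean pages to another overlapping lock when a PR lock is cancelled. */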
+
+static int cache_extent_removal_event(struct lustre_cache *cache,
+                                      void *data, int discard)
+{
+        struct page *page = data;
+        struct page_removal_cb_element *element;
+
+        list_for_each_entry(element, &cache->lc_page_removal_callback_list,
+                            prce_list) {
+                element->prce_callback(page, discard);
+        }
+        return 0;
+}
+
+/* Registers a set of pin/remove callbacks for extents. The current limitation
+   is that there can be only one pin_cb per cache.
+   @pin_cb is called while we have the page locked, to pin it in memory so
+   that it does not disappear after we release the page lock (which we need
+   to do to avoid deadlocks).
+   @func_cb is the removal callback; it is called after the page and all
+   spinlocks are released, and is supposed to clean the page and remove it
+   from all (vfs) caches it might be in. */
+int cache_add_extent_removal_cb(struct lustre_cache *cache,
+                                obd_page_removal_cb_t func_cb,
+                                obd_pin_extent_cb pin_cb)
+{
+        struct page_removal_cb_element *element;
+
+        if (!func_cb)
+                return 0;
+        OBD_ALLOC(element, sizeof(*element));
+        if (!element)
+                return -ENOMEM;
+        element->prce_callback = func_cb;
+        list_add_tail(&element->prce_list,
+                      &cache->lc_page_removal_callback_list);
+
+        cache->lc_pin_extent_cb = pin_cb;
+        return 0;
+}
+EXPORT_SYMBOL(cache_add_extent_removal_cb);
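+
+/* A minimal usage sketch (callback names are illustrative only, they are not
+   part of this patch): a page user registers a pair of callbacks once per
+   cache with
+
+        cache_add_extent_removal_cb(cache, my_page_removal_cb, my_pin_cb);
+
+   and drops them again with
+
+        cache_del_extent_removal_cb(cache, my_page_removal_cb);
+
+   my_pin_cb(page) runs with the page locked and must pin it in memory;
+   my_page_removal_cb(page, discard) later runs with no spinlocks held and
+   must evict the page from any upper-layer (vfs) caches. */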
+
+/* Unregister an extent removal callback registered earlier. If the list of
+   registered removal callbacks becomes empty, we also clear the pin callback,
+   since there can be only one. */
+int cache_del_extent_removal_cb(struct lustre_cache *cache,
+                                obd_page_removal_cb_t func_cb)
+{
+        int found = 0;
+        struct page_removal_cb_element *element, *t;
+
+        list_for_each_entry_safe(element, t,
+                                 &cache->lc_page_removal_callback_list,
+                                 prce_list) {
+                if (element->prce_callback == func_cb) {
+                        list_del(&element->prce_list);
+                        OBD_FREE(element, sizeof(*element));
+                        found = 1;
+                        /* We continue iterating the list in case this callback
+                           was registered more than once. */
+                }
+        }
+
+        if (list_empty(&cache->lc_page_removal_callback_list))
+                cache->lc_pin_extent_cb = NULL;
+
+        return !found;
+}
+EXPORT_SYMBOL(cache_del_extent_removal_cb);
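+
+/* Note: cache_del_extent_removal_cb() returns 0 if at least one matching
+   callback was found and removed, and 1 (!found) if the callback was never
+   registered. */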
+
+static int cache_remove_extent_nolock(struct lustre_cache *cache,
+                                      struct osc_async_page *extent)
+{
+        int have_lock = !!extent->oap_ldlm_lock;
+        /* We used to check oap_ldlm_lock for non-NULL here, but it may in
+           fact be NULL, because parallel page eviction can clear it while
+           waiting on the lock's page list lock. */
+        extent->oap_ldlm_lock = NULL;
+
+        if (!list_empty(&extent->oap_page_list))
+                list_del_init(&extent->oap_page_list);
+
+        return have_lock;
+}
+
+/* Request that @extent be removed from the cache and from the lock it
+   belongs to. */
+void cache_remove_extent(struct lustre_cache *cache,
+                         struct osc_async_page *extent)
+{
+        struct ldlm_lock *lock;
+
+        spin_lock(&extent->oap_lock);
+        lock = extent->oap_ldlm_lock;
+
+        extent->oap_ldlm_lock = NULL;
+        spin_unlock(&extent->oap_lock);
+
+        /* No lock - means this extent is not in any list */
+        if (!lock)
+                return;
+
+        spin_lock(&lock->l_extents_list_lock);
+        if (!list_empty(&extent->oap_page_list))
+                list_del_init(&extent->oap_page_list);
+        spin_unlock(&lock->l_extents_list_lock);
+}
+
+/* Iterate through the list of extents in the lock identified by @lockh,
+   calling @cb_func for every such extent and passing @data to every call.
+   Stops iterating prematurely if @cb_func returns nonzero. */
+int cache_iterate_extents(struct lustre_cache *cache,
+                          struct lustre_handle *lockh,
+                          cache_iterate_extents_cb_t cb_func, void *data)
+{
+        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+        struct osc_async_page *extent, *t;
+
+        if (!lock)      // Lock disappeared
+                return 0;
+        /* Parallel page removal from mem pressure can race with us */
+        spin_lock(&lock->l_extents_list_lock);
+        list_for_each_entry_safe(extent, t, &lock->l_extents_list,
+                                 oap_page_list) {
+                if (cb_func(cache, lockh, extent, data))
+                        break;
+        }
+        spin_unlock(&lock->l_extents_list_lock);
+        LDLM_LOCK_PUT(lock);
+
+        return 0;
+}
+
+static int cache_remove_extents_from_lock(struct lustre_cache *cache,
+                                          struct ldlm_lock *lock, void *data)
+{
+        struct osc_async_page *extent;
+        void *ext_data;
+
+        LASSERT(lock);
+
+        spin_lock(&lock->l_extents_list_lock);
+        while (!list_empty(&lock->l_extents_list)) {
+                extent = list_entry(lock->l_extents_list.next,
+                                    struct osc_async_page, oap_page_list);
+
+                spin_lock(&extent->oap_lock);
+                /* If there is no lock referenced from this oap, it means a
+                   parallel page-removal process is waiting on
+                   l_extents_list_lock to free that page, and it holds the
+                   page lock.  We need this page to go away completely, so we
+                   just try to truncate it here too; serialisation on the
+                   page lock achieves that for us. */
+                /* Try to add the extent back to the cache first, but only if
+                 * we are cancelling a read lock - write locks cannot have
+                 * other overlapping locks.  If re-adding is not possible, or
+                 * we are cancelling a PW lock, remove the extent from cache */
+                if (!cache_remove_extent_nolock(cache, extent) ||
+                    (lock->l_granted_mode == LCK_PW) ||
+                    cache_add_extent(cache, &lock->l_resource->lr_name, extent,
+                                     NULL)) {
+                        /* Remember the oap_page value now: once we release
+                           the spinlocks, the extent struct might be freed and
+                           we would end up passing a page with the poisoned
+                           address 0x5a5a5a5a to
+                           cache_extent_removal_event(). */
+                        ext_data = extent->oap_page;
+                        cache->lc_pin_extent_cb(extent->oap_page);
+                        spin_unlock(&extent->oap_lock);
+                        spin_unlock(&lock->l_extents_list_lock);
+                        cache_extent_removal_event(cache, ext_data,
+                                                   lock->l_flags &
+                                                   LDLM_FL_DISCARD_DATA);
+                        spin_lock(&lock->l_extents_list_lock);
+                } else {
+                        spin_unlock(&extent->oap_lock);
+                }
+        }
+        spin_unlock(&lock->l_extents_list_lock);
+
+        return 0;
+}
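+
+/* Lock ordering above is l_extents_list_lock, then oap_lock; both are dropped
+   (after pinning the page through lc_pin_extent_cb) before the removal
+   callbacks run, and l_extents_list_lock is re-taken to continue walking the
+   list. */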
+
+/* Removes @lock from the cache after the necessary checks. */
+int cache_remove_lock(struct lustre_cache *cache, struct lustre_handle *lockh)
+{
+        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+
+        if (!lock)  // The lock was removed by somebody just now, nothing to do
+                return 0;
+
+        cache_remove_extents_from_lock(cache, lock, NULL /* data */);
+
+        spin_lock(&cache->lc_locks_list_lock);
+        list_del_init(&lock->l_cache_locks_list);
+        spin_unlock(&cache->lc_locks_list_lock);
+
+        LDLM_LOCK_PUT(lock);
+
+        return 0;
+}
+
+/* Supposed to iterate through all locks in the cache for the given resource.
+   Not implemented at the moment. */
+int cache_iterate_locks(struct lustre_cache *cache, struct ldlm_res_id *res,
+                        cache_iterate_locks_cb_t cb_fun, void *data)
+{
+        return -ENOTSUPP;
+}
+
+/* Create lustre cache and attach it to @obd */
+struct lustre_cache *cache_create(struct obd_device *obd)
+{
+        struct lustre_cache *cache;
+
+        OBD_ALLOC(cache, sizeof(*cache));
+        if (!cache)
+                GOTO(out, NULL);
+        spin_lock_init(&cache->lc_locks_list_lock);
+        CFS_INIT_LIST_HEAD(&cache->lc_locks_list);
+        CFS_INIT_LIST_HEAD(&cache->lc_page_removal_callback_list);
+        cache->lc_obd = obd;
+
+      out:
+        return cache;
+}
+
+/* Destroy @cache and free its memory */
+int cache_destroy(struct lustre_cache *cache)
+{
+        if (cache) {
+                spin_lock(&cache->lc_locks_list_lock);
+                if (!list_empty(&cache->lc_locks_list)) {
+                        struct ldlm_lock *lock, *tmp;
+                        CERROR("still have locks in the list on cleanup:\n");
+
+                        list_for_each_entry_safe(lock, tmp,
+                                                 &cache->lc_locks_list,
+                                                 l_cache_locks_list) {
+                                list_del_init(&lock->l_cache_locks_list);
+                                /* XXX: the natural idea would be to print the
+                                   offending locks here, but if we use
+                                   e.g. LDLM_ERROR, we will likely crash,
+                                   because it tries to access e.g. a namespace
+                                   that no longer exists.  Normally this case
+                                   can only happen when somebody failed to
+                                   release a lock reference, and we have other
+                                   ways to detect that. */
+                                /* Make sure there are no pages left under the
+                                   lock */
+                                LASSERT(list_empty(&lock->l_extents_list));
+                        }
+                }
+                spin_unlock(&cache->lc_locks_list_lock);
+                LASSERT(list_empty(&cache->lc_page_removal_callback_list));
+                OBD_FREE(cache, sizeof(*cache));
+        }
+
+        return 0;
+}
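+
+/* Cache lifetime: osc_setup() creates the per-device cache with
+   cache_create() and osc_cleanup() destroys it with cache_destroy() (see
+   osc_request.c), so the cache exists for the whole life of the osc device. */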
diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h
index 72ce3ec4a3f6d178d372d3a1b23228fe54dc6286..451a79b24709b117b7168ec8047efbf2a5b28258 100644
--- a/lustre/osc/osc_internal.h
+++ b/lustre/osc/osc_internal.h
@@ -28,8 +28,11 @@ struct osc_async_page {
         struct client_obd       *oap_cli;
         struct lov_oinfo        *oap_loi;
 
-	struct obd_async_page_ops *oap_caller_ops;
+        struct obd_async_page_ops *oap_caller_ops;
         void                    *oap_caller_data;
+        struct list_head         oap_page_list;
+        struct ldlm_lock        *oap_ldlm_lock;
+        spinlock_t               oap_lock;
 };
 
 #define oap_page        oap_brw_page.pg
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 42b877d7e225723023fcc4027fe87c3c71a9ed90..47843db5d74a233a4fed2b3228e3a686e076d53b 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -56,12 +56,14 @@
 #include <lustre_log.h>
 #include <lustre_debug.h>
 #include <lustre_param.h>
+#include <lustre_cache.h>
 #include "osc_internal.h"
 
 static quota_interface_t *quota_interface = NULL;
 extern quota_interface_t osc_quota_interface;
 
 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
+int osc_cleanup(struct obd_device *obd);
 
 static quota_interface_t *quota_interface;
 extern quota_interface_t osc_quota_interface;
@@ -2394,9 +2396,13 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                         struct lov_oinfo *loi, cfs_page_t *page,
                         obd_off offset, struct obd_async_page_ops *ops,
-                        void *data, void **res)
+                        void *data, void **res, int nocache,
+                        struct lustre_handle *lockh)
 {
         struct osc_async_page *oap;
+        struct ldlm_res_id oid = {{0}};
+        int rc = 0;
+
         ENTRY;
 
         if (!page)
@@ -2416,9 +2422,24 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
+        INIT_LIST_HEAD(&oap->oap_page_list);
 
         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
 
+        spin_lock_init(&oap->oap_lock);
+
+        /* If the page was marked as not cacheable, don't add it to any locks. */
+        if (!nocache) {
+                oid.name[0] = loi->loi_id;
+                /* This is the only place where we can call cache_add_extent
+                   without oap_lock, because the page is locked now and the
+                   lock we are adding it to is referenced, so it cannot lose
+                   any pages either. */
+                rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
+                if (rc)
+                        RETURN(rc);
+        }
+
         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
         RETURN(0);
 }
@@ -2703,6 +2724,7 @@ static int osc_teardown_async_page(struct obd_export *exp,
                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
         }
         loi_list_maint(cli, loi);
+        cache_remove_extent(cli->cl_cache, oap);
 
         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
 out:
@@ -2710,6 +2732,49 @@ out:
         RETURN(rc);
 }
 
+int osc_extent_blocking_cb(struct ldlm_lock *lock,
+                           struct ldlm_lock_desc *new, void *data,
+                           int flag)
+{
+        struct lustre_handle lockh = { 0 };
+        int rc;
+        ENTRY;
+
+        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
+                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
+                LBUG();
+        }
+
+        switch (flag) {
+        case LDLM_CB_BLOCKING:
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc != ELDLM_OK)
+                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
+                break;
+        case LDLM_CB_CANCELING: {
+
+                ldlm_lock2handle(lock, &lockh);
+                /* This lock wasn't granted, don't try to do anything */
+                if (lock->l_req_mode != lock->l_granted_mode)
+                        RETURN(0);
+
+                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
+                                  &lockh);
+
+                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
+                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
+                                                          lock, new, data, flag);
+                break;
+        }
+        default:
+                LBUG();
+        }
+
+        RETURN(0);
+}
+EXPORT_SYMBOL(osc_extent_blocking_cb);
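+
+/* On LDLM_CB_CANCELING, osc_extent_blocking_cb() above first detaches all
+   pages still attached to the lock via cache_remove_lock() and then, if an
+   upper layer registered a cancel callback through
+   osc_register_lock_cancel_cb() below, chains to that cl_ext_lock_cancel_cb
+   as well. */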
+
 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                     int flags)
 {
@@ -2751,8 +2816,8 @@ static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
         return 0;
 }
 
-static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
-                            int intent, int rc)
+static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
+                            struct obd_info *oinfo, int intent, int rc)
 {
         ENTRY;
 
@@ -2778,6 +2843,9 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
         }
 
+        if (!rc)
+                cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
+
         /* Call the update callback. */
         rc = oinfo->oi_cb_up(oinfo, rc);
         RETURN(rc);
@@ -2804,7 +2872,7 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                    aa->oa_oi->oi_lockh, rc);
 
         /* Complete osc stuff. */
-        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
+        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
 
         /* Release the lock for async request. */
         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
@@ -2932,7 +3000,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(rc);
         }
 
-        rc = osc_enqueue_fini(req, oinfo, intent, rc);
+        rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
         if (intent)
                 ptlrpc_req_finished(req);
 
@@ -3621,6 +3689,11 @@ int osc_setup(struct obd_device *obd, obd_count len, void *buf)
                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                             OST_MAXREQSIZE,
                                             ptlrpc_add_rqs_to_pool);
+                cli->cl_cache = cache_create(obd);
+                if (!cli->cl_cache) {
+                        osc_cleanup(obd);
+                        rc = -ENOMEM;
+                }
         }
 
         RETURN(rc);
@@ -3683,12 +3756,50 @@ int osc_cleanup(struct obd_device *obd)
         /* free memory of osc quota cache */
         lquota_cleanup(quota_interface, obd);
 
+        cache_destroy(obd->u.cli.cl_cache);
         rc = client_obd_cleanup(obd);
 
         ptlrpcd_decref();
         RETURN(rc);
 }
 
+static int osc_register_page_removal_cb(struct obd_export *exp,
+                                        obd_page_removal_cb_t func,
+                                        obd_pin_extent_cb pin_cb)
+{
+        return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
+                                           pin_cb);
+}
+
+static int osc_unregister_page_removal_cb(struct obd_export *exp,
+                                          obd_page_removal_cb_t func)
+{
+        return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
+}
+
+static int osc_register_lock_cancel_cb(struct obd_export *exp,
+                                       obd_lock_cancel_cb cb)
+{
+        LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
+
+        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
+        return 0;
+}
+
+static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
+                                         obd_lock_cancel_cb cb)
+{
+        if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
+                CERROR("Unregistering cancel cb %p, while only %p was "
+                       "registered\n", cb,
+                       exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
+                return -EINVAL;
+        }
+
+        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
+        return 0;
+}
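+
+/* These four helpers back the new register/unregister entries added to
+   osc_obd_ops below; lov exposes matching methods (see lov_obd.c above), so
+   upper layers can reach the osc cache's page-removal and lock-cancel
+   hooks. */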
+
 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
 {
         struct lustre_cfg *lcfg = buf;
@@ -3745,6 +3856,10 @@ struct obd_ops osc_obd_ops = {
         .o_llog_init            = osc_llog_init,
         .o_llog_finish          = osc_llog_finish,
         .o_process_config       = osc_process_config,
+        .o_register_page_removal_cb = osc_register_page_removal_cb,
+        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
+        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
+        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
 };
 int __init osc_init(void)
 {
diff --git a/lustre/tests/conf-sanity.sh b/lustre/tests/conf-sanity.sh
index c86e7862bb949b59e182392fe1612aa0b612ddb3..1d4d4a915cb0501b89286ea5cbb77312e0dc1c80 100644
--- a/lustre/tests/conf-sanity.sh
+++ b/lustre/tests/conf-sanity.sh
@@ -1240,9 +1240,21 @@ test_33() { # bug 12333
 }
 run_test 33 "Mount ost with a large index number"
 
-umount_client $MOUNT	
+umount_client $MOUNT
 cleanup_nocli
 
+test_23() {
+        start_ost
+        start_mds
+        # Simulate -EINTR during mount OBD_FAIL_LDLM_CLOSE_THREAD
+        sysctl -w lustre.fail_loc=0x80000313
+        mount_client $MOUNT
+        cleanup
+}
+run_test 23 "Simulate -EINTR during mount"
+
+equals_msg "Done"
+echo "$0: completed"
 test_33a() {
         setup
 
diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh
index c3cdf26c1c97efe517cb078e1e05b49d7701abbc..35c29def2f258c70696872539dcaa27aeb0c9074 100644
--- a/lustre/tests/sanity.sh
+++ b/lustre/tests/sanity.sh
@@ -2589,7 +2589,9 @@ test_62() {
         cat $f && error "cat succeeded, expect -EIO"
         sysctl -w lustre.fail_loc=0
 }
-run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)"
+# This test is now irrelevant (as of the bug 10718 fix): we no longer
+# match every page all of the time.
+#run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)"
 
 # bug 2319 - oig_wait() interrupted causes crash because of invalid waitq.
 test_63() {
@@ -3323,6 +3325,20 @@ test_79() { # bug 12743
 }
 run_test 79 "df report consistency check ======================="
 
+test_80() { # bug 10718
+        dd if=/dev/zero of=$DIR/$tfile bs=1M count=1 seek=1M
+        sync; sleep 1; sync
+        BEFORE=`date +%s`
+        cancel_lru_locks OSC
+        AFTER=`date +%s`
+        DIFF=$((AFTER-BEFORE))
+        if [ $DIFF -gt 1 ] ; then
+                error "elapsed for 1M@1T = $DIFF"
+        fi
+        true
+}
+run_test 80 "Page eviction is equally fast at high offsets too  ===="
+
 # on the LLNL clusters, runas will still pick up root's $TMP settings,
 # which will not be writable for the runas user, and then you get a CVS
 # error message with a corrupt path string (CVS bug) and panic.