From 2b5ed846625fb0cd0c1241afd82ddcd18df52d8e Mon Sep 17 00:00:00 2001
From: nikita <nikita>
Date: Sat, 18 Oct 2008 17:16:32 +0000
Subject: [PATCH] Introduce new lu_context functions that are needed on the
 client side, where some system threads (ptlrpcd) are shared by multiple
 modules, and so cannot be stopped during module shutdown. b=16450

---
 lustre/ChangeLog            |   7 +
 lustre/include/lu_object.h  | 245 +++++++++++++++++--------------
 lustre/obdclass/lu_object.c | 277 ++++++++++++++++++++++++++++++------
 lustre/ptlrpc/service.c     |  20 ---
 4 files changed, 378 insertions(+), 171 deletions(-)

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index e6046ad643..3a5d5bce11 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -1601,6 +1601,13 @@ Details    : On a server, a file system object is uniquely identified
 	     friends to take additional `lu_conf' argument describing object.
 	     Typically this includes layout information.
 
+Severity   : normal
+Bugzilla   : 16450
+Description: lu_context fixes.
+Details    : Introduce new lu_context functions that are needed on the client
+	     side, where some system threads (ptlrpcd) are shared by multiple
+	     modules, and so cannot be stopped during module shutdown.
+
 --------------------------------------------------------------------------------
 
 2007-08-10         Cluster File Systems, Inc. <info@clusterfs.com>
diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h
index 26ab5093e8..c1ab0d3ce3 100644
--- a/lustre/include/lu_object.h
+++ b/lustre/include/lu_object.h
@@ -912,12 +912,15 @@ enum lu_context_state {
  * that allows each layer to associate arbitrary pieces of data with each
  * context (see pthread_key_create(3) for similar interface).
  *
+ * On a client, lu_context is bound to a thread, see cl_env_get().
+ *
+ * \see lu_context_key
  */
 struct lu_context {
-        /*
-         * Theoretically we'd want to use lu_objects and lu_contexts on the
-         * client side too. On the other hand, we don't want to allocate
-         * values of server-side keys for the client contexts and vice versa.
+        /**
+         * lu_context is used on the client side too. Yet we don't want to
+         * allocate values of server-side keys for the client contexts and
+         * vice versa.
          *
          * To achieve this, set of tags in introduced. Contexts and keys are
          * marked with tags. Key value are created only for context whose set
@@ -925,97 +928,161 @@ struct lu_context {
          * from enum lu_context_tag.
          */
         __u32                  lc_tags;
-        /*
+        /**
          * Pointer to the home service thread. NULL for other execution
          * contexts.
          */
         struct ptlrpc_thread  *lc_thread;
-        /*
+        /**
          * Pointer to an array with key values. Internal implementation
          * detail.
          */
         void                 **lc_value;
         enum lu_context_state  lc_state;
+        /**
+         * Linkage into a list of all remembered contexts. Only
+         * `non-transient' contexts, i.e., ones created for service threads
+         * are placed here.
+         */
+        struct list_head       lc_remember;
+        /**
+         * Version counter used to skip calls to lu_context_refill() when no
+         * keys were registered.
+         */
+        unsigned               lc_version;
 };
 
-/*
+/**
  * lu_context_key interface. Similar to pthread_key.
  */
 
 enum lu_context_tag {
-        /*
+        /**
          * Thread on md server
          */
         LCT_MD_THREAD = 1 << 0,
-        /*
+        /**
          * Thread on dt server
          */
         LCT_DT_THREAD = 1 << 1,
-        /*
+        /**
          * Context for transaction handle
          */
         LCT_TX_HANDLE = 1 << 2,
-        /*
+        /**
          * Thread on client
          */
         LCT_CL_THREAD = 1 << 3,
-        /*
-         * Per-request session on server
+        /**
+         * A per-request session on a server, and a per-system-call session on
+         * a client.
          */
         LCT_SESSION   = 1 << 4,
-        /*
+
+        /**
+         * Set when at least one of keys, having values in this context has
+         * non-NULL lu_context_key::lct_exit() method. This is used to
+         * optimize lu_context_exit() call.
+         */
+        LCT_HAS_EXIT  = 1 << 28,
+        /**
          * Don't add references for modules creating key values in that context.
          * This is only for contexts used internally by lu_object framework.
          */
-        LCT_NOREF     = 1 << 30,
-        /*
+        LCT_NOREF     = 1 << 29,
+        /**
+         * Key is being prepared for retiring, don't create new values for it.
+         */
+        LCT_QUIESCENT = 1 << 30,
+        /**
+         * Context should be remembered.
+         */
+        LCT_REMEMBER  = 1 << 31,
+        /**
          * Contexts usable in cache shrinker thread.
          */
         LCT_SHRINKER  = LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD|LCT_NOREF
 };
 
-/*
+/**
  * Key. Represents per-context value slot.
+ *
+ * Keys are usually registered when module owning the key is initialized, and
+ * de-registered when module is unloaded. Once key is registered, all new
+ * contexts with matching tags, will get key value. "Old" contexts, already
+ * initialized at the time of key registration, can be forced to get key value
+ * by calling lu_context_refill().
+ *
+ * Every key value is counted in lu_context_key::lct_used and acquires a
+ * reference on an owning module. This means, that all key values have to be
+ * destroyed before module can be unloaded. This is usually achieved by
+ * stopping threads started by the module, that created contexts in their
+ * entry functions. Situation is complicated by the threads shared by multiple
+ * modules, like ptlrpcd daemon on a client. To work around this problem,
+ * contexts, created in such threads, are `remembered' (see
+ * LCT_REMEMBER)---i.e., added into a global list. When module is preparing
+ * for unloading it does the following:
+ *
+ *     - marks its keys as `quiescent' (lu_context_tag::LCT_QUIESCENT)
+ *       preventing new key values from being allocated in the new contexts,
+ *       and
+ *
+ *     - scans a list of remembered contexts, destroying values of module
+ *       keys, thus releasing references to the module.
+ *
+ * This is done by lu_context_key_quiesce(). If module is re-activated
+ * before key has been de-registered, lu_context_key_revive() call clears
+ * `quiescent' marker.
+ *
+ * lu_context code doesn't provide any internal synchronization for these
+ * activities---it's assumed that startup (including threads start-up) and
+ * shutdown are serialized by some external means.
+ *
+ * \see lu_context
  */
 struct lu_context_key {
-        /*
+        /**
          * Set of tags for which values of this key are to be instantiated.
          */
         __u32 lct_tags;
-        /*
+        /**
          * Value constructor. This is called when new value is created for a
          * context. Returns pointer to new value of error pointer.
          */
         void  *(*lct_init)(const struct lu_context *ctx,
                            struct lu_context_key *key);
-        /*
+        /**
          * Value destructor. Called when context with previously allocated
-         * value of this slot is destroyed. @data is a value that was returned
-         * by a matching call to ->lct_init().
+         * value of this slot is destroyed. \a data is a value that was returned
+         * by a matching call to lu_context_key::lct_init().
          */
         void   (*lct_fini)(const struct lu_context *ctx,
                            struct lu_context_key *key, void *data);
-        /*
+        /**
          * Optional method called on lu_context_exit() for all allocated
          * keys. Can be used by debugging code checking that locks are
          * released, etc.
          */
         void   (*lct_exit)(const struct lu_context *ctx,
                            struct lu_context_key *key, void *data);
-        /*
-         * Internal implementation detail: index within ->lc_value[] reserved
-         * for this key.
+        /**
+         * Internal implementation detail: index within lu_context::lc_value[]
+         * reserved for this key.
          */
         int      lct_index;
-        /*
+        /**
          * Internal implementation detail: number of values created for this
          * key.
          */
         atomic_t lct_used;
-        /*
+        /**
          * Internal implementation detail: module for this key.
          */
         struct module *lct_owner;
+        /**
+         * References to this key. For debugging.
+         */
+        struct lu_ref  lct_reference;
 };
 
 #define LU_KEY_INIT(mod, type)                                    \
@@ -1060,121 +1127,83 @@ do {                                                    \
         (key)->lct_owner = THIS_MODULE;                 \
 } while (0)
 
-
-/*
- * Register new key.
- */
 int   lu_context_key_register(struct lu_context_key *key);
+void  lu_context_key_degister(struct lu_context_key *key);
+void *lu_context_key_get     (const struct lu_context *ctx,
+                               const struct lu_context_key *key);
+void  lu_context_key_quiesce (struct lu_context_key *key);
+void  lu_context_key_revive  (struct lu_context_key *key);
+
+
 /*
- * Deregister key.
+ * LU_KEY_INIT_GENERIC() has to be a macro to correctly determine an
+ * owning module.
  */
-void  lu_context_key_degister(struct lu_context_key *key);
 
-#define LU_KEY_REGISTER_GENERIC(mod)                                             \
-        static int mod##_key_register_generic(struct lu_context_key *k, ...)     \
+#define LU_KEY_INIT_GENERIC(mod)                                        \
+        static void mod##_key_init_generic(struct lu_context_key *k, ...) \
         {                                                                        \
-                struct lu_context_key* key = k;                                  \
+                struct lu_context_key *key = k;                         \
                 va_list args;                                                    \
-                int result;                                                      \
                                                                                  \
                 va_start(args, k);                                               \
-                                                                                 \
                 do {                                                             \
                         LU_CONTEXT_KEY_INIT(key);                                \
-                        result = lu_context_key_register(key);                   \
-                        if (result)                                              \
-                                break;                                           \
-                        key = va_arg(args, struct lu_context_key*);              \
+                        key = va_arg(args, struct lu_context_key *);    \
                 } while (key != NULL);                                           \
-                                                                                 \
-                va_end(args);                                                    \
-                                                                                 \
-                if (result) {                                                    \
-                        va_start(args, k);                                       \
-                        while (k != key) {                                       \
-                                lu_context_key_degister(k);                      \
-                                k = va_arg(args, struct lu_context_key*);        \
-                        }                                                        \
-                        va_end(args);                                            \
-                }                                                                \
-                                                                                 \
-                return result;                                                   \
-        }
-
-#define LU_KEY_DEGISTER_GENERIC(mod)                                             \
-        static void mod##_key_degister_generic(struct lu_context_key *k, ...)    \
-        {                                                                        \
-                va_list args;                                                    \
-                                                                                 \
-                va_start(args, k);                                               \
-                                                                                 \
-                do {                                                             \
-                        lu_context_key_degister(k);                              \
-                        k = va_arg(args, struct lu_context_key*);                \
-                } while (k != NULL);                                             \
-                                                                                 \
                 va_end(args);                                                    \
         }
 
 #define LU_TYPE_INIT(mod, ...)                                         \
-        LU_KEY_REGISTER_GENERIC(mod)                                   \
+        LU_KEY_INIT_GENERIC(mod)                                        \
         static int mod##_type_init(struct lu_device_type *t)           \
         {                                                              \
-                return mod##_key_register_generic(__VA_ARGS__, NULL);  \
+                mod##_key_init_generic(__VA_ARGS__, NULL);              \
+                return lu_context_key_register_many(__VA_ARGS__, NULL); \
         }                                                              \
         struct __##mod##_dummy_type_init {;}
 
 #define LU_TYPE_FINI(mod, ...)                                         \
-        LU_KEY_DEGISTER_GENERIC(mod)                                   \
         static void mod##_type_fini(struct lu_device_type *t)          \
         {                                                              \
-                mod##_key_degister_generic(__VA_ARGS__, NULL);         \
+                lu_context_key_degister_many(__VA_ARGS__, NULL);        \
         }                                                              \
         struct __##mod##_dummy_type_fini {;}
 
+
+
+
 #define LU_TYPE_INIT_FINI(mod, ...)                                 \
         LU_TYPE_INIT(mod, __VA_ARGS__);                             \
-        LU_TYPE_FINI(mod, __VA_ARGS__)
+        LU_TYPE_FINI(mod, __VA_ARGS__);         \
+        LU_TYPE_START(mod, __VA_ARGS__);        \
+        LU_TYPE_STOP(mod, __VA_ARGS__)
 
-/*
- * Return value associated with key @key in context @ctx.
- */
-void *lu_context_key_get(const struct lu_context *ctx,
-                         struct lu_context_key *key);
+int   lu_context_init  (struct lu_context *ctx, __u32 tags);
+void  lu_context_fini  (struct lu_context *ctx);
+void  lu_context_enter (struct lu_context *ctx);
+void  lu_context_exit  (struct lu_context *ctx);
+int   lu_context_refill(struct lu_context *ctx);
 
 /*
- * Initialize context data-structure. Create values for all keys.
- */
-int  lu_context_init(struct lu_context *ctx, __u32 tags);
-/*
- * Finalize context data-structure. Destroy key values.
+ * Helper functions to operate on multiple keys. These are used by the default
+ * device type operations, defined by LU_TYPE_INIT_FINI().
  */
-void lu_context_fini(struct lu_context *ctx);
 
-/*
- * Called before entering context.
- */
-void lu_context_enter(struct lu_context *ctx);
-/*
- * Called after exiting from @ctx
- */
-void lu_context_exit(struct lu_context *ctx);
-
-/*
- * Allocate for context all missing keys that were registered after context
- * creation.
- */
-int lu_context_refill(const struct lu_context *ctx);
+int  lu_context_key_register_many(struct lu_context_key *k, ...);
+void lu_context_key_degister_many(struct lu_context_key *k, ...);
+void lu_context_key_revive_many  (struct lu_context_key *k, ...);
+void lu_context_key_quiesce_many (struct lu_context_key *k, ...);
 
-/*
+/**
  * Environment.
  */
 struct lu_env {
-        /*
+        /**
          * "Local" context, used to store data instead of stack.
          */
         struct lu_context  le_ctx;
-        /*
+        /**
          * "Session" context for per-request data.
          */
         struct lu_context *le_ses;
@@ -1183,7 +1212,15 @@ struct lu_env {
 int  lu_env_init(struct lu_env *env, struct lu_context *ses, __u32 tags);
 void lu_env_fini(struct lu_env *env);
 
-/*
+/** @} lu_context */
+
+/**
+ * Output site statistical counters into a buffer. Suitable for
+ * ll_rd_*()-style functions.
+ */
+int lu_site_stats_print(const struct lu_site *s, char *page, int count);
+
+/**
  * Common name structure to be passed around for various name related methods.
  */
 struct lu_name {
diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c
index f8b7ea7639..30c4fe28f1 100644
--- a/lustre/obdclass/lu_object.c
+++ b/lustre/obdclass/lu_object.c
@@ -852,24 +852,34 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
 
                 next = ldt->ldt_ops->ldto_device_free(env, scan);
                 type = ldt->ldt_obd_type;
+                if (type != NULL) {
                 type->typ_refcnt--;
                 class_put_type(type);
         }
+        }
 }
 EXPORT_SYMBOL(lu_stack_fini);
 
 enum {
-        /*
+        /**
          * Maximal number of tld slots.
          */
-        LU_CONTEXT_KEY_NR = 16
+        LU_CONTEXT_KEY_NR = 32
 };
 
 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
 
 static spinlock_t lu_keys_guard = SPIN_LOCK_UNLOCKED;
 
-/*
+/**
+ * Global counter incremented whenever key is registered, unregistered,
+ * revived or quiesced. This is used to void unnecessary calls to
+ * lu_context_refill(). No locking is provided, as initialization and shutdown
+ * are supposed to be externally serialized.
+ */
+static unsigned key_set_version = 0;
+
+/**
  * Register new key.
  */
 int lu_context_key_register(struct lu_context_key *key)
@@ -889,7 +899,9 @@ int lu_context_key_register(struct lu_context_key *key)
                         key->lct_index = i;
                         atomic_set(&key->lct_used, 1);
                         lu_keys[i] = key;
+                        lu_ref_init(&key->lct_reference);
                         result = 0;
+                        ++key_set_version;
                         break;
                 }
         }
@@ -909,6 +921,7 @@ static void key_fini(struct lu_context *ctx, int index)
                 LASSERT(atomic_read(&key->lct_used) > 1);
 
                 key->lct_fini(ctx, key, ctx->lc_value[index]);
+                lu_ref_del(&key->lct_reference, "ctx", ctx);
                 atomic_dec(&key->lct_used);
                 LASSERT(key->lct_owner != NULL);
                 if (!(ctx->lc_tags & LCT_NOREF)) {
@@ -919,14 +932,15 @@ static void key_fini(struct lu_context *ctx, int index)
         }
 }
 
-/*
+/**
  * Deregister key.
  */
 void lu_context_key_degister(struct lu_context_key *key)
 {
         LASSERT(atomic_read(&key->lct_used) >= 1);
-        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
+        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
 
+        ++key_set_version;
         key_fini(&lu_shrink_env.le_ctx, key->lct_index);
 
         if (atomic_read(&key->lct_used) > 1)
@@ -937,18 +951,134 @@ void lu_context_key_degister(struct lu_context_key *key)
 }
 EXPORT_SYMBOL(lu_context_key_degister);
 
-/*
- * Return value associated with key @key in context @ctx.
+/**
+ * Register a number of keys. This has to be called after all keys have been
+ * initialized by a call to LU_CONTEXT_KEY_INIT().
+ */
+int lu_context_key_register_many(struct lu_context_key *k, ...)
+{
+        struct lu_context_key *key = k;
+        va_list args;
+        int result;
+
+        va_start(args, k);
+        do {
+                result = lu_context_key_register(key);
+                if (result)
+                        break;
+                key = va_arg(args, struct lu_context_key *);
+        } while (key != NULL);
+        va_end(args);
+
+        if (result != 0) {
+                va_start(args, k);
+                while (k != key) {
+                        lu_context_key_degister(k);
+                        k = va_arg(args, struct lu_context_key *);
+                }
+                va_end(args);
+        }
+
+        return result;
+}
+EXPORT_SYMBOL(lu_context_key_register_many);
+
+/**
+ * De-register a number of keys. This is a dual to
+ * lu_context_key_register_many().
+ */
+void lu_context_key_degister_many(struct lu_context_key *k, ...)
+{
+        va_list args;
+
+        va_start(args, k);
+        do {
+                lu_context_key_degister(k);
+                k = va_arg(args, struct lu_context_key*);
+        } while (k != NULL);
+        va_end(args);
+}
+EXPORT_SYMBOL(lu_context_key_degister_many);
+
+/**
+ * Revive a number of keys.
+ */
+void lu_context_key_revive_many(struct lu_context_key *k, ...)
+{
+        va_list args;
+
+        va_start(args, k);
+        do {
+                lu_context_key_revive(k);
+                k = va_arg(args, struct lu_context_key*);
+        } while (k != NULL);
+        va_end(args);
+}
+EXPORT_SYMBOL(lu_context_key_revive_many);
+
+/**
+ * Quiescent a number of keys.
+ */
+void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
+{
+        va_list args;
+
+        va_start(args, k);
+        do {
+                lu_context_key_quiesce(k);
+                k = va_arg(args, struct lu_context_key*);
+        } while (k != NULL);
+        va_end(args);
+}
+EXPORT_SYMBOL(lu_context_key_quiesce_many);
+
+/**
+ * Return value associated with key \a key in context \a ctx.
  */
 void *lu_context_key_get(const struct lu_context *ctx,
-                         struct lu_context_key *key)
+                         const struct lu_context_key *key)
 {
-        LASSERT(ctx->lc_state == LCS_ENTERED);
-        LASSERT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
+        LINVRNT(ctx->lc_state == LCS_ENTERED);
+        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
         return ctx->lc_value[key->lct_index];
 }
 EXPORT_SYMBOL(lu_context_key_get);
 
+/**
+ * List of remembered contexts. XXX document me.
+ */
+static CFS_LIST_HEAD(lu_context_remembered);
+
+/**
+ * Destroy \a key in all remembered contexts. This is used to destroy key
+ * values in "shared" contexts (like service threads), when a module owning
+ * the key is about to be unloaded.
+ */
+void lu_context_key_quiesce(struct lu_context_key *key)
+{
+        struct lu_context *ctx;
+
+        if (!(key->lct_tags & LCT_QUIESCENT)) {
+                key->lct_tags |= LCT_QUIESCENT;
+                /*
+                 * XXX memory barrier has to go here.
+                 */
+                spin_lock(&lu_keys_guard);
+                list_for_each_entry(ctx, &lu_context_remembered, lc_remember)
+                        key_fini(ctx, key->lct_index);
+                spin_unlock(&lu_keys_guard);
+                ++key_set_version;
+        }
+}
+EXPORT_SYMBOL(lu_context_key_quiesce);
+
+void lu_context_key_revive(struct lu_context_key *key)
+{
+        key->lct_tags &= ~LCT_QUIESCENT;
+        ++key_set_version;
+}
+EXPORT_SYMBOL(lu_context_key_revive);
+
 static void keys_fini(struct lu_context *ctx)
 {
         int i;
@@ -962,7 +1092,7 @@ static void keys_fini(struct lu_context *ctx)
         }
 }
 
-static int keys_fill(const struct lu_context *ctx)
+static int keys_fill(struct lu_context *ctx)
 {
         int i;
 
@@ -970,12 +1100,17 @@ static int keys_fill(const struct lu_context *ctx)
                 struct lu_context_key *key;
 
                 key = lu_keys[i];
-                if (ctx->lc_value[i] == NULL &&
-                    key != NULL && key->lct_tags & ctx->lc_tags) {
+                if (ctx->lc_value[i] == NULL && key != NULL &&
+                    (key->lct_tags & ctx->lc_tags) &&
+                    /*
+                     * Don't create values for a LCT_QUIESCENT key, as this
+                     * will pin module owning a key.
+                     */
+                    !(key->lct_tags & LCT_QUIESCENT)) {
                         void *value;
 
-                        LASSERT(key->lct_init != NULL);
-                        LASSERT(key->lct_index == i);
+                        LINVRNT(key->lct_init != NULL);
+                        LINVRNT(key->lct_index == i);
 
                         value = key->lct_init(ctx, key);
                         if (unlikely(IS_ERR(value)))
@@ -983,9 +1118,18 @@ static int keys_fill(const struct lu_context *ctx)
                         LASSERT(key->lct_owner != NULL);
                         if (!(ctx->lc_tags & LCT_NOREF))
                                 try_module_get(key->lct_owner);
+                        lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
                         atomic_inc(&key->lct_used);
+                        /*
+                         * This is the only place in the code, where an
+                         * element of ctx->lc_value[] array is set to non-NULL
+                         * value.
+                         */
                         ctx->lc_value[i] = value;
+                        if (key->lct_exit != NULL)
+                                ctx->lc_tags |= LCT_HAS_EXIT;
                 }
+                ctx->lc_version = key_set_version;
         }
         return 0;
 }
@@ -1005,7 +1149,7 @@ static int keys_init(struct lu_context *ctx)
         return result;
 }
 
-/*
+/**
  * Initialize context data-structure. Create values for all keys.
  */
 int lu_context_init(struct lu_context *ctx, __u32 tags)
@@ -1013,41 +1157,50 @@ int lu_context_init(struct lu_context *ctx, __u32 tags)
         memset(ctx, 0, sizeof *ctx);
         ctx->lc_state = LCS_INITIALIZED;
         ctx->lc_tags = tags;
+        if (tags & LCT_REMEMBER) {
+                spin_lock(&lu_keys_guard);
+                list_add(&ctx->lc_remember, &lu_context_remembered);
+                spin_unlock(&lu_keys_guard);
+        } else
+                CFS_INIT_LIST_HEAD(&ctx->lc_remember);
         return keys_init(ctx);
 }
 EXPORT_SYMBOL(lu_context_init);
 
-/*
+/**
  * Finalize context data-structure. Destroy key values.
  */
 void lu_context_fini(struct lu_context *ctx)
 {
-        LASSERT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
+        LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
         ctx->lc_state = LCS_FINALIZED;
         keys_fini(ctx);
+        spin_lock(&lu_keys_guard);
+        list_del_init(&ctx->lc_remember);
+        spin_unlock(&lu_keys_guard);
 }
 EXPORT_SYMBOL(lu_context_fini);
 
-/*
+/**
  * Called before entering context.
  */
 void lu_context_enter(struct lu_context *ctx)
 {
-        LASSERT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
+        LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
         ctx->lc_state = LCS_ENTERED;
 }
 EXPORT_SYMBOL(lu_context_enter);
 
-/*
- * Called after exiting from @ctx
+/**
+ * Called after exiting from \a ctx
  */
 void lu_context_exit(struct lu_context *ctx)
 {
         int i;
 
-        LASSERT(ctx->lc_state == LCS_ENTERED);
+        LINVRNT(ctx->lc_state == LCS_ENTERED);
         ctx->lc_state = LCS_LEFT;
-        if (ctx->lc_value != NULL) {
+        if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                         if (ctx->lc_value[i] != NULL) {
                                 struct lu_context_key *key;
@@ -1063,14 +1216,14 @@ void lu_context_exit(struct lu_context *ctx)
 }
 EXPORT_SYMBOL(lu_context_exit);
 
-/*
+/**
  * Allocate for context all missing keys that were registered after context
  * creation.
  */
-int lu_context_refill(const struct lu_context *ctx)
+int lu_context_refill(struct lu_context *ctx)
 {
-        LASSERT(ctx->lc_value != NULL);
-        return keys_fill(ctx);
+        LINVRNT(ctx->lc_value != NULL);
+        return ctx->lc_version == key_set_version ? 0 : keys_fill(ctx);
 }
 EXPORT_SYMBOL(lu_context_refill);
 
@@ -1079,7 +1232,7 @@ static int lu_env_setup(struct lu_env *env, struct lu_context *ses,
 {
         int result;
 
-        LASSERT(ergo(!noref, !(tags & LCT_NOREF)));
+        LINVRNT(ergo(!noref, !(tags & LCT_NOREF)));
 
         env->le_ses = ses;
         result = lu_context_init(&env->le_ctx, tags);
@@ -1108,6 +1261,20 @@ void lu_env_fini(struct lu_env *env)
 }
 EXPORT_SYMBOL(lu_env_fini);
 
+int lu_env_refill(struct lu_env *env)
+{
+        int result;
+
+        result = lu_context_refill(&env->le_ctx);
+        if (result == 0 && env->le_ses != NULL)
+                result = lu_context_refill(env->le_ses);
+        return result;
+}
+EXPORT_SYMBOL(lu_env_refill);
+
+static struct shrinker *lu_site_shrinker = NULL;
+
+#ifdef __KERNEL__
 static int lu_cache_shrink(int nr, unsigned int gfp_mask)
 {
         struct lu_site *s;
@@ -1140,43 +1307,57 @@ static int lu_cache_shrink(int nr, unsigned int gfp_mask)
         return cached;
 }
 
-static struct shrinker *lu_site_shrinker = NULL;
+#else  /* !__KERNEL__ */
+static int lu_cache_shrink(int nr, unsigned int gfp_mask)
+{
+        return 0;
+}
+#endif /* __KERNEL__ */
 
-/*
+int  lu_ref_global_init(void);
+void lu_ref_global_fini(void);
+
+/**
  * Initialization of global lu_* data.
  */
 int lu_global_init(void)
 {
         int result;
 
+        CDEBUG(D_CONSOLE, "Lustre LU module (%p).\n", &lu_keys);
+
         LU_CONTEXT_KEY_INIT(&lu_global_key);
         result = lu_context_key_register(&lu_global_key);
-        if (result == 0) {
+        if (result != 0)
+                return result;
                 /*
-                 * At this level, we don't know what tags are needed, so
-                 * allocate them conservatively. This should not be too bad,
-                 * because this environment is global.
+         * At this level, we don't know what tags are needed, so allocate them
+         * conservatively. This should not be too bad, because this
+         * environment is global.
                  */
                 down(&lu_sites_guard);
                 result = lu_env_init_noref(&lu_shrink_env, NULL, LCT_SHRINKER);
                 up(&lu_sites_guard);
-                if (result == 0) {
+        if (result != 0)
+                return result;
+
+        result = lu_ref_global_init();
+        if (result != 0)
+                return result;
                         /*
-                         * seeks estimation: 3 seeks to read a record from oi,
-                         * one to read inode, one for ea. Unfortunately
-                         * setting this high value results in lu_object/inode
-                         * cache consuming all the memory.
-                         */
-                        lu_site_shrinker = set_shrinker(DEFAULT_SEEKS,
-                                                        lu_cache_shrink);
-                        if (result == 0)
+         * seeks estimation: 3 seeks to read a record from oi, one to read
+         * inode, one for ea. Unfortunately setting this high value results in
+         * lu_object/inode cache consuming all the memory.
+         */
+        lu_site_shrinker = set_shrinker(DEFAULT_SEEKS, lu_cache_shrink);
+        if (lu_site_shrinker == NULL)
+                return -ENOMEM;
+
                                 result = lu_time_global_init();
-                }
-        }
         return result;
 }
 
-/*
+/**
  * Dual to lu_global_init().
  */
 void lu_global_fini(void)
@@ -1196,6 +1377,8 @@ void lu_global_fini(void)
         down(&lu_sites_guard);
         lu_env_fini(&lu_shrink_env);
         up(&lu_sites_guard);
+
+        lu_ref_global_fini();
 }
 
 struct lu_buf LU_BUF_NULL = {
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index 1f08a77c14..54b56c1859 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -603,26 +603,6 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
         EXIT;
 }
 
-#ifndef __KERNEL__
-int lu_context_init(struct lu_context *ctx, __u32 tags)
-{
-        return 0;
-}
-
-void lu_context_fini(struct lu_context *ctx)
-{
-}
-
-void lu_context_enter(struct lu_context *ctx)
-{
-}
-
-void lu_context_exit(struct lu_context *ctx)
-{
-}
-
-#endif
-
 static int ptlrpc_check_req(struct ptlrpc_request *req)
 {
         if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
-- 
GitLab