diff --git a/lustre/include/linux/lustre_sec.h b/lustre/include/linux/lustre_sec.h
index 442617832f304c4d5074e568ac3fe7e3ad73558b..e452316c17462e4c9011ed3a68e6f93d3eb0f9d5 100644
--- a/lustre/include/linux/lustre_sec.h
+++ b/lustre/include/linux/lustre_sec.h
@@ -138,10 +138,17 @@ struct ptlrpc_credops {
                            struct ptlrpc_request *req);
 };
 
-#define PTLRPC_CRED_UPTODATE    0x00000001 /* uptodate */
-#define PTLRPC_CRED_DEAD        0x00000002 /* mark expired gracefully */
-#define PTLRPC_CRED_ERROR       0x00000004 /* fatal error (refresh, etc.) */
-#define PTLRPC_CRED_FLAGS_MASK  0x00000007
+#define PTLRPC_CRED_UPTODATE_BIT        0 /* uptodate */
+#define PTLRPC_CRED_DEAD_BIT            1 /* mark expired gracefully */
+#define PTLRPC_CRED_ERROR_BIT           2 /* fatal error (refresh, etc.) */
+
+#define PTLRPC_CRED_UPTODATE            (1 << PTLRPC_CRED_UPTODATE_BIT)
+#define PTLRPC_CRED_DEAD                (1 << PTLRPC_CRED_DEAD_BIT)
+#define PTLRPC_CRED_ERROR               (1 << PTLRPC_CRED_ERROR_BIT)
+
+#define PTLRPC_CRED_FLAGS_MASK          (PTLRPC_CRED_UPTODATE |         \
+                                         PTLRPC_CRED_DEAD |             \
+                                         PTLRPC_CRED_ERROR)
 
 struct ptlrpc_cred {
         struct list_head        pc_hash;   /* linked into hash table */
@@ -149,7 +156,7 @@ struct ptlrpc_cred {
         struct ptlrpc_sec      *pc_sec;
         struct ptlrpc_credops  *pc_ops;
         unsigned long           pc_expire;
-        int                     pc_flags;
+        unsigned long           pc_flags;
         /* XXX maybe should not be here */
         __u64                   pc_pag;
         uid_t                   pc_uid;
@@ -227,14 +234,6 @@ static inline void ptlrpcs_cred_get(struct ptlrpc_cred *cred)
         atomic_inc(&cred->pc_refcount);
 }
 
-static inline int ptlrpcs_cred_is_uptodate(struct ptlrpc_cred *cred)
-{
-        LASSERT(cred);
-        LASSERT(atomic_read(&cred->pc_refcount));
-        return ((cred->pc_flags & PTLRPC_CRED_FLAGS_MASK) ==
-                PTLRPC_CRED_UPTODATE);
-}
-
 static inline int ptlrpcs_cred_refresh(struct ptlrpc_cred *cred)
 {
         LASSERT(cred);
@@ -244,25 +243,32 @@ static inline int ptlrpcs_cred_refresh(struct ptlrpc_cred *cred)
         return cred->pc_ops->refresh(cred);
 }
 
-/* we set the cred flags is safe since cred cache code don't
- * touch cred with refcount > 0
- */
-static inline void ptlrpcs_cred_expire(struct ptlrpc_cred *cred)
+static inline int ptlrpcs_cred_is_uptodate(struct ptlrpc_cred *cred)
 {
-        LASSERT(atomic_read(&cred->pc_refcount));
-        LASSERT(cred->pc_sec);
+        smp_mb();
+        return ((cred->pc_flags & PTLRPC_CRED_FLAGS_MASK) ==
+                PTLRPC_CRED_UPTODATE);
+}
 
-        if (cred->pc_flags & PTLRPC_CRED_DEAD)
-                return;
-        cred->pc_flags |= PTLRPC_CRED_DEAD;
-        cred->pc_flags &= ~PTLRPC_CRED_UPTODATE;
-        CWARN("cred %p: get expired\n", cred);
+static inline int ptlrpcs_cred_is_dead(struct ptlrpc_cred *cred)
+{
+        smp_mb();
+        return ((cred->pc_flags & (PTLRPC_CRED_DEAD | PTLRPC_CRED_ERROR)) != 0);
 }
 
-/* usually called upon an UPTODATE cred */
-static inline int ptlrpcs_cred_check_expire(struct ptlrpc_cred *cred)
+#define ptlrpcs_cred_expire(cred)                                       \
+        if (!test_and_set_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags)) { \
+                CWARN("cred %p: get expired\n", cred);                  \
+                clear_bit(PTLRPC_CRED_UPTODATE_BIT, &cred->pc_flags);   \
+        }
+
+static inline int ptlrpcs_cred_check_uptodate(struct ptlrpc_cred *cred)
 {
         LASSERT(atomic_read(&cred->pc_refcount));
+
+        if (!ptlrpcs_cred_is_uptodate(cred))
+                return 1;
+
         if (cred->pc_expire == 0)
                 return 0;
         if (time_after(cred->pc_expire, get_seconds()))
@@ -346,6 +352,7 @@ void ptlrpcs_cli_free_repbuf(struct ptlrpc_request *req);
 /* higher interface */
 int  ptlrpcs_import_get_sec(struct obd_import *imp);
 void ptlrpcs_import_drop_sec(struct obd_import *imp);
+void ptlrpcs_import_flush_creds(struct obd_import *imp, uid_t uid);
 int  ptlrpcs_req_get_cred(struct ptlrpc_request *req);
 void ptlrpcs_req_drop_cred(struct ptlrpc_request *req);
 int  ptlrpcs_req_replace_dead_cred(struct ptlrpc_request *req);
diff --git a/lustre/include/lustre/lustre_user.h b/lustre/include/lustre/lustre_user.h
index c3de92a093344f2bf2c008270a907271bdaa166d..0a9fb94f08beceb8dc0e16c6c5a863999bef3fd4 100644
--- a/lustre/include/lustre/lustre_user.h
+++ b/lustre/include/lustre/lustre_user.h
@@ -64,6 +64,8 @@
 #define LL_IOC_GROUP_LOCK               _IOW ('f', 161, long)
 #define LL_IOC_GROUP_UNLOCK             _IOW ('f', 162, long)
 
+#define LL_IOC_FLUSH_CRED               _IOW ('f', 170, long)
+
 #define O_LOV_DELAY_CREATE    0100000000  /* hopefully this does not conflict */
 
 #define LL_FILE_IGNORE_LOCK   0x00000001
diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c
index b5b96e3507adbdbeaa765fabfe4e0fc1247c353c..d712635e8f4d693e227142f1d37ef92411e9a4e5 100644
--- a/lustre/llite/dir.c
+++ b/lustre/llite/dir.c
@@ -728,6 +728,8 @@ static int ll_dir_ioctl(struct inode *inode, struct file *file,
                 obd_ioctl_freedata(buf, len);
                 RETURN(rc);
         }
+        case LL_IOC_FLUSH_CRED:
+                RETURN(ll_flush_cred(inode));
         default:
                 return obd_iocontrol(cmd, sbi->ll_dt_exp, 0,
                                      NULL, (void *)arg);
diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index 7505f9da464e09449e28495360170175fb3d7520..ede5dcdb7903ece2a6f1a95d58d0d3118f6726be 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c
@@ -1462,6 +1462,8 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
         case EXT3_IOC_SETVERSION_OLD:
         case EXT3_IOC_SETVERSION:
         */
+        case LL_IOC_FLUSH_CRED:
+                RETURN(ll_flush_cred(inode));
         default:
                 RETURN( obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
                                       (void *)arg) );
diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h
index 25ae7e6b341f14a888e7cf2c3f8e7d90aaf1992c..b2103caa4a3e776a398be1c41dd47a429c21897f 100644
--- a/lustre/llite/llite_internal.h
+++ b/lustre/llite/llite_internal.h
@@ -331,6 +331,7 @@ struct dentry *ll_fh_to_dentry(struct super_block *sb, __u32 *data, int len,
 int ll_dentry_to_fh(struct dentry *, __u32 *datap, int *lenp, int need_parent);
 int null_if_equal(struct ldlm_lock *lock, void *data);
 int ll_process_config_update(struct ll_sb_info *sbi, int clean);
+int ll_flush_cred(struct inode *inode);
 
 /* llite/special.c */
 extern struct inode_operations ll_special_inode_operations;
diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c
index 85508c79dd4d47e67d1270b4ad82a3b2abe6050c..52f4c7fc897aa7849cbe6b4c75696dfd44ea11ac 100644
--- a/lustre/llite/llite_lib.c
+++ b/lustre/llite/llite_lib.c
@@ -1795,3 +1795,31 @@ int ll_get_fid(struct obd_export *exp, struct lustre_id *idp,
 
         return rc;
 }
+
+int ll_flush_cred(struct inode *inode)
+{
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        uid_t   uid = current->fsuid;
+        int     rc = 0;
+
+        /* XXX to avoid adding api, we simply use set_info() interface
+         * to notify underlying obds. set_info() is more like a ioctl() now...
+         */
+        if (sbi->ll_md_exp) {
+                rc = obd_set_info(sbi->ll_md_exp,
+                                  strlen("flush_cred"), "flush_cred",
+                                  sizeof(uid), &uid);
+                if (rc)
+                        return rc;
+        }
+
+        if (sbi->ll_dt_exp) {
+                rc = obd_set_info(sbi->ll_dt_exp,
+                                  strlen("flush_cred"), "flush_cred",
+                                  sizeof(uid), &uid);
+                if (rc)
+                        return rc;
+        }
+
+        return rc;
+}
diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c
index ac36396cd73ed4729931e93551b19ff70419ee35..bd74795355f3015da6e973741491f6e1a7248ec6 100644
--- a/lustre/lmv/lmv_obd.c
+++ b/lustre/lmv/lmv_obd.c
@@ -1904,8 +1904,9 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
 int lmv_set_info(struct obd_export *exp, obd_count keylen,
                  void *key, obd_count vallen, void *val)
 {
-        struct obd_device *obd;
-        struct lmv_obd *lmv;
+        struct lmv_tgt_desc    *tgt;
+        struct obd_device      *obd;
+        struct lmv_obd         *lmv;
         ENTRY;
 
         obd = class_exp2obd(exp);
@@ -1925,7 +1926,6 @@ int lmv_set_info(struct obd_export *exp, obd_count keylen,
         /* maybe this could be default */
         if ((keylen == strlen("sec") && strcmp(key, "sec") == 0) ||
             (keylen == strlen("nllu") && strcmp(key, "nllu") == 0)) {
-                struct lmv_tgt_desc *tgt;
                 struct obd_export *exp;
                 int rc = 0, err, i;
 
@@ -1961,6 +1961,23 @@ int lmv_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(rc);
         }
 
+        if ((keylen == strlen("flush_cred") &&
+             strcmp(key, "flush_cred") == 0)) {
+                int rc = 0, i;
+
+                for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count;
+                     i++, tgt++) {
+                        if (!tgt->ltd_exp)
+                                continue;
+                        rc = obd_set_info(tgt->ltd_exp,
+                                          keylen, key, vallen, val);
+                        if (rc)
+                                RETURN(rc);
+                }
+
+                RETURN(0);
+        }
+
         RETURN(-EINVAL);
 }
 
diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c
index 9baefffa22cf781254f4bec6756d4de8fcade115..152d5d7cb24e06ad529215cdd369ace61ae6b87c 100644
--- a/lustre/lov/lov_obd.c
+++ b/lustre/lov/lov_obd.c
@@ -2137,6 +2137,21 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
                 spin_unlock(&lov->lov_lock);
 
                 RETURN(rc);
+        } else if (KEY_IS("flush_cred")) {
+                struct lov_tgt_desc *tgt;
+                int rc = 0, i;
+
+                for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count;
+                     i++, tgt++) {
+                        if (!tgt->ltd_exp)
+                                continue;
+                        rc = obd_set_info(tgt->ltd_exp,
+                                          keylen, key, vallen, val);
+                        if (rc)
+                                RETURN(rc);
+                }
+
+                RETURN(0);
         } else {
                 RETURN(-EINVAL);
         }
diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c
index f62cd998be32e0accc92f792c0a422268e2ab9e4..4144ef7ef6ee2ac298af321e910e36dd1b9fe1c0 100644
--- a/lustre/mdc/mdc_request.c
+++ b/lustre/mdc/mdc_request.c
@@ -877,6 +877,14 @@ int mdc_set_info(struct obd_export *exp, obd_count keylen,
                 }
                 CERROR("unrecognized security type %s\n", (char*) val);
                 rc = -EINVAL;
+        } else if (keylen == strlen("flush_cred") &&
+                   memcmp(key, "flush_cred", keylen) == 0) {
+                struct client_obd *cli = &exp->exp_obd->u.cli;
+
+                if (cli->cl_import)
+                        ptlrpcs_import_flush_creds(cli->cl_import,
+                                                   *((uid_t *) val));
+                rc = 0;
         } else if (keylen == strlen("async") && memcmp(key, "async", keylen) == 0) {
                 struct client_obd *cl = &exp->exp_obd->u.cli;
                 if (vallen != sizeof(int))
diff --git a/lustre/mds/mds_lsd.c b/lustre/mds/mds_lsd.c
index 9a4066ef99683d26914d987fb0818ac5eea83607..3e7bd1240145ffc918ef481a64d9c1ff46fc48ed 100644
--- a/lustre/mds/mds_lsd.c
+++ b/lustre/mds/mds_lsd.c
@@ -136,8 +136,7 @@ static int lsd_make_upcall(struct upcall_cache *cache,
                        "/proc/fs/lustre/mds/lsd_upcall\n",
                        argv[0], argv[1], rc);
         } else {
-                CWARN("Invoked upcall %s %s\n",
-                        argv[0], argv[1]);
+                CDEBUG(D_SEC, "Invoked upcall %s %s\n", argv[0], argv[1]);
         }
         RETURN(rc);
 }
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index bd0924c990db3d953eb89fea2ba2802d8834a84e..12a42e52edfa1691c0c8d8b841960c9e09b8491d 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -2935,6 +2935,16 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(-EINVAL);
         }
 
+        if (keylen == strlen("flush_cred") &&
+            memcmp(key, "flush_cred", keylen) == 0) {
+                struct client_obd *cli = &exp->exp_obd->u.cli;
+
+                if (cli->cl_import)
+                        ptlrpcs_import_flush_creds(cli->cl_import,
+                                                   *((uid_t *) val));
+                RETURN(0);
+        }
+
         if (keylen < strlen("mds_conn") ||
             memcmp(key, "mds_conn", keylen) != 0)
                 RETURN(-EINVAL);
diff --git a/lustre/sec/gss/sec_gss.c b/lustre/sec/gss/sec_gss.c
index ad7bfe8a57fbcef8d1f02cd6b1b12299a1f34c60..d5f83c160afad8bb82ecf7be0e8df8016924f618 100644
--- a/lustre/sec/gss/sec_gss.c
+++ b/lustre/sec/gss/sec_gss.c
@@ -88,7 +88,7 @@ struct rpc_clnt;
  **********************************************/
 
 #define SECINIT_RPC_TIMEOUT     (30)
-#define SECFINI_RPC_TIMEOUT     (30)
+#define SECFINI_RPC_TIMEOUT     (10)
 
 static int secinit_compose_request(struct obd_import *imp,
                                    char *buf, int bufsize,
@@ -577,7 +577,7 @@ void gss_cred_set_ctx(struct ptlrpc_cred *cred, struct gss_cl_ctx *ctx)
         write_lock(&gss_ctx_lock);
         old = gcred->gc_ctx;
         gcred->gc_ctx = ctx;
-        cred->pc_flags |= PTLRPC_CRED_UPTODATE;
+        set_bit(PTLRPC_CRED_UPTODATE_BIT, &cred->pc_flags);
         write_unlock(&gss_ctx_lock);
         if (old)
                 gss_put_ctx(old);
@@ -715,6 +715,8 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
         int                         res;
         ENTRY;
 
+        might_sleep();
+
         /* any flags means it has been handled, do nothing */
         if (cred->pc_flags & PTLRPC_CRED_FLAGS_MASK)
                 RETURN(0);
@@ -780,7 +782,7 @@ again:
          * administrator via lctl etc.
          */
         if (cred->pc_flags & PTLRPC_CRED_FLAGS_MASK) {
-                CWARN("cred %p("LPU64"/%u) was set flags %x unexpectedly\n",
+                CWARN("cred %p("LPU64"/%u) was set flags %lx unexpectedly\n",
                       cred, cred->pc_pag, cred->pc_uid, cred->pc_flags);
                 cred->pc_flags |= PTLRPC_CRED_DEAD | PTLRPC_CRED_ERROR;
                 gss_unhash_msg_nolock(gss_new);
@@ -839,7 +841,7 @@ waiting:
                 res = -EINTR;
         } else if (res == 0) {
                 CERROR("cred %p: upcall timedout\n", cred);
-                cred->pc_flags |= PTLRPC_CRED_DEAD;
+                set_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags);
                 res = -ETIMEDOUT;
         } else
                 res = 0;
@@ -906,7 +908,7 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
         if (rc || gss_err) {
                 CERROR("parse init downcall: rpc %d, gss 0x%x\n", rc, gss_err);
                 if (rc != -ERESTART || gss_err != 0)
-                        cred->pc_flags |= PTLRPC_CRED_ERROR;
+                        set_bit(PTLRPC_CRED_ERROR_BIT, &cred->pc_flags);
                 if (rc == 0)
                         rc = -EPERM;
                 goto err_out;
@@ -918,7 +920,7 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
 
         return 0;
 err_out:
-        cred->pc_flags |= PTLRPC_CRED_DEAD;
+        set_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags);
         return rc;
 }
 #endif
@@ -1327,21 +1329,24 @@ static void destroy_gss_context(struct ptlrpc_cred *cred)
         int                      replen, rc;
         ENTRY;
 
-        /* cred's refcount is 0, steal one */
-        atomic_inc(&cred->pc_refcount);
+        imp = cred->pc_sec->ps_import;
+        LASSERT(imp);
 
-        if (!(cred->pc_flags & PTLRPC_CRED_UPTODATE)) {
+        if (test_bit(PTLRPC_CRED_ERROR_BIT, &cred->pc_flags) ||
+            !test_bit(PTLRPC_CRED_UPTODATE_BIT, &cred->pc_flags)) {
                 CDEBUG(D_SEC, "Destroy dead cred %p(%u@%s)\n",
                        cred, cred->pc_uid, imp->imp_target_uuid.uuid);
-                atomic_dec(&cred->pc_refcount);
                 EXIT;
                 return;
         }
 
+        might_sleep();
+
+        /* cred's refcount is 0, steal one */
+        atomic_inc(&cred->pc_refcount);
+
         gcred = container_of(cred, struct gss_cred, gc_base);
         gcred->gc_ctx->gc_proc = PTLRPC_GSS_PROC_DESTROY;
-        imp = cred->pc_sec->ps_import;
-        LASSERT(imp);
 
         CDEBUG(D_SEC, "client destroy gss cred %p(%u@%s)\n",
                gcred, cred->pc_uid, imp->imp_target_uuid.uuid);
@@ -1528,12 +1533,12 @@ gss_pipe_downcall(struct file *filp, const char *src, size_t mlen)
         }
 
         if (err || gss_err) {
-                cred->pc_flags |= PTLRPC_CRED_DEAD;
+                set_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags);
                 if (err != -ERESTART || gss_err != 0)
-                        cred->pc_flags |= PTLRPC_CRED_ERROR;
+                        set_bit(PTLRPC_CRED_ERROR_BIT, &cred->pc_flags);
                 CERROR("cred %p: rpc err %d, gss err 0x%x, fatal %d\n",
-                        cred, err, gss_err,
-                        ((cred->pc_flags & PTLRPC_CRED_ERROR) != 0));
+                       cred, err, gss_err,
+                       (test_bit(PTLRPC_CRED_ERROR_BIT, &cred->pc_flags) != 0));
         } else {
                 CDEBUG(D_SEC, "get initial ctx:\n");
                 gss_cred_set_ctx(cred, ctx);
@@ -1768,7 +1773,7 @@ struct ptlrpc_cred * gss_create_cred(struct ptlrpc_sec *sec,
         atomic_set(&cred->pc_refcount, 0);
         cred->pc_sec = sec;
         cred->pc_ops = &gss_credops;
-        cred->pc_expire = get_seconds() + GSS_CRED_EXPIRE;
+        cred->pc_expire = 0;
         cred->pc_flags = 0;
         cred->pc_pag = vcred->vc_pag;
         cred->pc_uid = vcred->vc_uid;
diff --git a/lustre/sec/sec.c b/lustre/sec/sec.c
index 070724ccd2f6b0e13b8c722b0d1a826add6ab5be..5701a7d37bbe6bb24bc2434e38a7e08ba3678007 100644
--- a/lustre/sec/sec.c
+++ b/lustre/sec/sec.c
@@ -135,16 +135,29 @@ void ptlrpcs_init_credcache(struct ptlrpc_sec *sec)
         sec->ps_nextgc = get_seconds() + (sec->ps_expire >> 1);
 }
 
-static void ptlrpcs_cred_destroy(struct ptlrpc_cred *cred)
+/*
+ * return 1 means we should also destroy the sec structure.
+ * normally return 0
+ */
+static int ptlrpcs_cred_destroy(struct ptlrpc_cred *cred)
 {
         struct ptlrpc_sec *sec = cred->pc_sec;
+        int rc = 0;
 
         LASSERT(cred->pc_sec);
         LASSERT(atomic_read(&cred->pc_refcount) == 0);
         LASSERT(list_empty(&cred->pc_hash));
 
         cred->pc_ops->destroy(cred);
-        atomic_dec(&sec->ps_credcount);
+
+        /* spinlock to protect against ptlrpcs_sec_put() */
+        LASSERT(atomic_read(&sec->ps_credcount));
+        spin_lock(&sec->ps_lock);
+        if (atomic_dec_and_test(&sec->ps_credcount) &&
+            !atomic_read(&sec->ps_refcount))
+                rc = 1;
+        spin_unlock(&sec->ps_lock);
+        return rc;
 }
 
 static void ptlrpcs_destroy_credlist(struct list_head *head)
@@ -159,29 +172,55 @@ static void ptlrpcs_destroy_credlist(struct list_head *head)
 }
 
 static
-int ptlrpcs_cred_unlink_expired(struct ptlrpc_cred *cred,
-                                struct list_head *freelist)
+int cred_check_dead(struct ptlrpc_cred *cred,
+                    struct list_head *freelist, int removal)
 {
-        LASSERT(cred->pc_sec);
+        /* here we do the exact thing as asked. but an alternative
+         * way is remove dead entries immediately without be asked
+         * remove, since dead entry will not lead to further rpcs.
+         */
+        if (unlikely(ptlrpcs_cred_is_dead(cred))) {
+                /* don't try to destroy a busy entry */
+                if (atomic_read(&cred->pc_refcount))
+                        return 1;
+                goto out;
+        }
 
-        /* only unlink non-busy entries */
+        /* a busy non-dead entry is considered as "good" one.
+         * Note in a very busy client where cred always busy, we
+         * will not be able to find the expire here, but some other
+         * part will, e.g. checking during refresh, or got error
+         * notification from server, etc. We don't touch busy cred
+         * here is because a busy cred's flag might be changed at
+         * anytime by the owner, we don't want to compete with them.
+         */
         if (atomic_read(&cred->pc_refcount) != 0)
                 return 0;
+
         /* expire is 0 means never expire. a newly created gss cred
          * which during upcall also has 0 expiration
          */
         if (cred->pc_expire == 0)
                 return 0;
+
         /* check real expiration */
         if (time_after(cred->pc_expire, get_seconds()))
                 return 0;
 
-        LASSERT((cred->pc_flags & PTLRPC_CRED_FLAGS_MASK) ==
-                PTLRPC_CRED_UPTODATE);
-        CWARN("cred %p: get expired, unlink\n", cred);
+        /* although we'v checked the bit right above, there's still
+         * possibility that somebody else set the bit elsewhere.
+         */
+        ptlrpcs_cred_expire(cred);
+
+out:
+        if (removal) {
+                LASSERT(atomic_read(&cred->pc_refcount) >= 0);
+                LASSERT(cred->pc_sec);
+                LASSERT(spin_is_locked(&cred->pc_sec->ps_lock));
+                LASSERT(freelist);
 
-        list_del(&cred->pc_hash);
-        list_add(&cred->pc_hash, freelist);
+                list_move(&cred->pc_hash, freelist);
+        }
         return 1;
 }
 
@@ -196,37 +235,38 @@ void ptlrpcs_credcache_gc(struct ptlrpc_sec *sec,
         CDEBUG(D_SEC, "do gc on sec %s\n", sec->ps_type->pst_name);
         for (i = 0; i < PTLRPC_CREDCACHE_NR; i++) {
                 list_for_each_entry_safe(cred, n, &sec->ps_credcache[i],
-                                         pc_hash) {
-                        if (cred->pc_flags & (PTLRPC_CRED_DEAD |
-                                              PTLRPC_CRED_ERROR)) {
-                                LASSERT(atomic_read(&cred->pc_refcount));
-                                continue;
-                        }
-                        ptlrpcs_cred_unlink_expired(cred, freelist);
-                }
+                                         pc_hash)
+                        cred_check_dead(cred, freelist, 1);
         }
         sec->ps_nextgc = get_seconds() + sec->ps_expire;
         EXIT;
 }
 
 /*
- * grace: mark cred DEAD, allow graceful destroy like notify
- *        server side, etc.
- * force: flush all entries, otherwise only free ones be flushed.
+ * @uid: which user. "-1" means flush all.
+ * @grace: mark cred DEAD, allow graceful destroy like notify
+ *         server side, etc.
+ * @force: flush all entries, otherwise only free ones be flushed.
  */
 static
-int ptlrpcs_flush_credcache(struct ptlrpc_sec *sec, int grace, int force)
+int flush_credcache(struct ptlrpc_sec *sec, uid_t uid,
+                    int grace, int force)
 {
         struct ptlrpc_cred *cred, *n;
         LIST_HEAD(freelist);
         int i, busy = 0;
         ENTRY;
 
+        might_sleep_if(grace);
+
         spin_lock(&sec->ps_lock);
         for (i = 0; i < PTLRPC_CREDCACHE_NR; i++) {
                 list_for_each_entry_safe(cred, n, &sec->ps_credcache[i],
                                          pc_hash) {
                         LASSERT(atomic_read(&cred->pc_refcount) >= 0);
+
+                        if (uid != -1 && uid != cred->pc_uid)
+                                continue;
                         if (atomic_read(&cred->pc_refcount)) {
                                 busy = 1;
                                 if (!force)
@@ -235,12 +275,14 @@ int ptlrpcs_flush_credcache(struct ptlrpc_sec *sec, int grace, int force)
                         } else
                                 list_move(&cred->pc_hash, &freelist);
 
-                        cred->pc_flags |= PTLRPC_CRED_DEAD;
+                        set_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags);
                         if (!grace)
-                                cred->pc_flags &= ~PTLRPC_CRED_UPTODATE;
+                                clear_bit(PTLRPC_CRED_UPTODATE_BIT,
+                                          &cred->pc_flags);
                 }
         }
         spin_unlock(&sec->ps_lock);
+
         ptlrpcs_destroy_credlist(&freelist);
         RETURN(busy);
 }
@@ -256,33 +298,32 @@ int ptlrpcs_cred_get_hash(__u64 pag)
         return (pag & PTLRPC_CREDCACHE_MASK);
 }
 
+/*
+ * return an uptodate or newly created cred entry.
+ */
 static
 struct ptlrpc_cred * cred_cache_lookup(struct ptlrpc_sec *sec,
                                        struct vfs_cred *vcred,
-                                       int create)
+                                       int create, int remove_dead)
 {
         struct ptlrpc_cred *cred, *new = NULL, *n;
         LIST_HEAD(freelist);
         int hash, found = 0;
         ENTRY;
 
+        might_sleep();
+
         hash = ptlrpcs_cred_get_hash(vcred->vc_pag);
 
 retry:
         spin_lock(&sec->ps_lock);
+
         /* do gc if expired */
-        if (time_after(get_seconds(), sec->ps_nextgc))
+        if (remove_dead && time_after(get_seconds(), sec->ps_nextgc))
                 ptlrpcs_credcache_gc(sec, &freelist);
 
         list_for_each_entry_safe(cred, n, &sec->ps_credcache[hash], pc_hash) {
-                /* for DEAD and ERROR entries, its final put will
-                 * release them, so we simply skip here.
-                 */
-                if (cred->pc_flags & (PTLRPC_CRED_DEAD | PTLRPC_CRED_ERROR)) {
-                        LASSERT(atomic_read(&cred->pc_refcount));
-                        continue;
-                }
-                if (ptlrpcs_cred_unlink_expired(cred, &freelist))
+                if (cred_check_dead(cred, &freelist, remove_dead))
                         continue;
                 if (cred->pc_ops->match(cred, vcred)) {
                         found = 1;
@@ -327,7 +368,7 @@ struct ptlrpc_cred * ptlrpcs_cred_lookup(struct ptlrpc_sec *sec,
         struct ptlrpc_cred *cred;
         ENTRY;
 
-        cred = cred_cache_lookup(sec, vcred, 0);
+        cred = cred_cache_lookup(sec, vcred, 0, 1);
         RETURN(cred);
 }
 
@@ -342,7 +383,7 @@ static struct ptlrpc_cred *get_cred(struct ptlrpc_sec *sec)
         vcred.vc_pag = (__u64) current->uid;
         vcred.vc_uid = current->uid;
 
-        return cred_cache_lookup(sec, &vcred, 1);
+        return cred_cache_lookup(sec, &vcred, 1, 1);
 }
 
 int ptlrpcs_req_get_cred(struct ptlrpc_request *req)
@@ -373,19 +414,19 @@ int ptlrpcs_check_cred(struct obd_import *imp)
         struct ptlrpc_cred *cred;
         ENTRY;
 
+        might_sleep();
 again:
         cred = get_cred(imp->imp_sec);
         if (!cred)
                 RETURN(0);
 
         if (ptlrpcs_cred_is_uptodate(cred)) {
-                if (!ptlrpcs_cred_check_expire(cred)) {
-                        ptlrpcs_cred_put(cred, 1);
-                        RETURN(0);
-                } else {
-                        ptlrpcs_cred_put(cred, 1);
-                        goto again;
-                }
+                /* get_cred() has done expire checking, so we don't
+                 * expect it could expire so quickly, and actually
+                 * we don't care.
+                 */
+                ptlrpcs_cred_put(cred, 1);
+                RETURN(0);
         }
 
         ptlrpcs_cred_refresh(cred);
@@ -415,27 +456,51 @@ void ptlrpcs_cred_put(struct ptlrpc_cred *cred, int sync)
 {
         struct ptlrpc_sec *sec = cred->pc_sec;
 
-        LASSERT(cred);
         LASSERT(sec);
         LASSERT(atomic_read(&cred->pc_refcount));
 
         spin_lock(&sec->ps_lock);
-        if (atomic_dec_and_test(&cred->pc_refcount) && sync &&
-            cred->pc_flags & (PTLRPC_CRED_DEAD | PTLRPC_CRED_ERROR)) {
+
+        /* this has to be protected by ps_lock, because cred cache
+         * management code might increase ref against a 0-refed cred.
+         */
+        if (!atomic_dec_and_test(&cred->pc_refcount)) {
+                spin_unlock(&sec->ps_lock);
+                return;
+        }
+
+        /* if sec already unused, we have to destroy the cred (prevent it
+         * hanging there for ever)
+         */
+        if (atomic_read(&sec->ps_refcount) == 0) {
+                if (!test_and_set_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags))
+                        CWARN("cred %p: force expire on a unused sec\n", cred);
+                list_del_init(&cred->pc_hash);
+        } else if (unlikely(sync && ptlrpcs_cred_is_dead(cred)))
                 list_del_init(&cred->pc_hash);
-                ptlrpcs_cred_destroy(cred);
-                if (!atomic_read(&sec->ps_credcount) &&
-                    !atomic_read(&sec->ps_refcount)) {
-                        CWARN("put last cred on a dead sec %p(%s), "
-                              "also destroy the sec\n", sec,
-                               sec->ps_type->pst_name);
-                        spin_unlock(&sec->ps_lock);
 
-                        ptlrpcs_sec_destroy(sec);
-                        return;
-                }
+        if (!list_empty(&cred->pc_hash)) {
+                spin_unlock(&sec->ps_lock);
+                return;
         }
+
+        /* if required async, and we reached here, we have to clear
+         * the UPTODATE bit, thus no rpc is needed in destroy procedure.
+         */
+        if (!sync)
+                clear_bit(PTLRPC_CRED_UPTODATE_BIT, &cred->pc_flags);
+
         spin_unlock(&sec->ps_lock);
+
+        /* destroy this cred */
+        if (!ptlrpcs_cred_destroy(cred))
+                return;
+
+        LASSERT(!atomic_read(&sec->ps_credcount));
+        LASSERT(!atomic_read(&sec->ps_refcount));
+
+        CWARN("sec %p(%s), put last cred, also destroy the sec\n",
+              sec, sec->ps_type->pst_name);
 }
 
 void ptlrpcs_req_drop_cred(struct ptlrpc_request *req)
@@ -446,10 +511,8 @@ void ptlrpcs_req_drop_cred(struct ptlrpc_request *req)
         LASSERT(req->rq_cred);
 
         if (req->rq_cred) {
-                /* We'd like to not use 'sync' mode, but might cause
-                 * some cred leak. Need more thinking here. FIXME
-                 */
-                ptlrpcs_cred_put(req->rq_cred, 1);
+                /* this could be called with spinlock hold, use async mode */
+                ptlrpcs_cred_put(req->rq_cred, 0);
                 req->rq_cred = NULL;
         } else
                 CDEBUG(D_SEC, "req %p have no cred\n", req);
@@ -467,7 +530,7 @@ int ptlrpcs_req_replace_dead_cred(struct ptlrpc_request *req)
         ENTRY;
 
         LASSERT(cred);
-        LASSERT(cred->pc_flags & PTLRPC_CRED_DEAD);
+        LASSERT(test_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags));
 
         ptlrpcs_cred_get(cred);
         ptlrpcs_req_drop_cred(req);
@@ -486,7 +549,10 @@ int ptlrpcs_req_replace_dead_cred(struct ptlrpc_request *req)
 
 /*
  * since there's no lock on the cred, its status could be changed
- * by other threads at any time, we allow this race.
+ * by other threads at any time, we allow this race. If an uptodate
+ * cred turn to dead quickly under us, we don't know and continue
+ * using it, that's fine. if necessary the later error handling code
+ * will catch it.
  */
 int ptlrpcs_req_refresh_cred(struct ptlrpc_request *req)
 {
@@ -495,17 +561,15 @@ int ptlrpcs_req_refresh_cred(struct ptlrpc_request *req)
 
         LASSERT(cred);
 
-        if (ptlrpcs_cred_is_uptodate(cred)) {
-                if (!ptlrpcs_cred_check_expire(cred))
-                        RETURN(0);
-        }
+        if (!ptlrpcs_cred_check_uptodate(cred))
+                RETURN(0);
 
-        if (cred->pc_flags & PTLRPC_CRED_ERROR) {
+        if (test_bit(PTLRPC_CRED_ERROR_BIT, &cred->pc_flags)) {
                 req->rq_ptlrpcs_err = 1;
                 RETURN(-EPERM);
         }
 
-        if (cred->pc_flags & PTLRPC_CRED_DEAD) {
+        if (test_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags)) {
                 if (ptlrpcs_req_replace_dead_cred(req) == 0) {
                         LASSERT(cred != req->rq_cred);
                         CDEBUG(D_SEC, "req %p: replace cred %p => %p\n",
@@ -521,8 +585,9 @@ int ptlrpcs_req_refresh_cred(struct ptlrpc_request *req)
         }
 
         ptlrpcs_cred_refresh(cred);
+
         if (!ptlrpcs_cred_is_uptodate(cred)) {
-                if (cred->pc_flags & PTLRPC_CRED_ERROR)
+                if (test_bit(PTLRPC_CRED_ERROR_BIT, &cred->pc_flags))
                         req->rq_ptlrpcs_err = 1;
 
                 CERROR("req %p: failed to refresh cred %p, fatal %d\n",
@@ -712,10 +777,17 @@ static void ptlrpcs_sec_destroy(struct ptlrpc_sec *sec)
 
 void ptlrpcs_sec_put(struct ptlrpc_sec *sec)
 {
+        int ncred;
+
         if (atomic_dec_and_test(&sec->ps_refcount)) {
-                ptlrpcs_flush_credcache(sec, 1, 1);
+                flush_credcache(sec, -1, 1, 1);
 
-                if (atomic_read(&sec->ps_credcount) == 0) {
+                /* this spinlock is protect against ptlrpcs_cred_destroy() */
+                spin_lock(&sec->ps_lock);
+                ncred = atomic_read(&sec->ps_credcount);
+                spin_unlock(&sec->ps_lock);
+
+                if (ncred == 0) {
                         ptlrpcs_sec_destroy(sec);
                 } else {
                         CWARN("sec %p(%s) is no usage while %d cred still "
@@ -728,7 +800,7 @@ void ptlrpcs_sec_put(struct ptlrpc_sec *sec)
 
 void ptlrpcs_sec_invalidate_cache(struct ptlrpc_sec *sec)
 {
-        ptlrpcs_flush_credcache(sec, 0, 1);
+        flush_credcache(sec, -1, 0, 1);
 }
 
 int sec_alloc_reqbuf(struct ptlrpc_sec *sec,
@@ -952,6 +1024,16 @@ void ptlrpcs_import_drop_sec(struct obd_import *imp)
         EXIT;
 }
 
+void ptlrpcs_import_flush_creds(struct obd_import *imp, uid_t uid)
+{
+        LASSERT(imp);
+
+        class_import_get(imp);
+        if (imp->imp_sec)
+                flush_credcache(imp->imp_sec, uid, 1, 1);
+        class_import_put(imp);
+}
+
 int __init ptlrpc_sec_init(void)
 {
         int rc;
@@ -986,6 +1068,7 @@ EXPORT_SYMBOL(ptlrpcs_sec_put);
 EXPORT_SYMBOL(ptlrpcs_sec_invalidate_cache);
 EXPORT_SYMBOL(ptlrpcs_import_get_sec);
 EXPORT_SYMBOL(ptlrpcs_import_drop_sec);
+EXPORT_SYMBOL(ptlrpcs_import_flush_creds);
 EXPORT_SYMBOL(ptlrpcs_cred_lookup);
 EXPORT_SYMBOL(ptlrpcs_cred_put);
 EXPORT_SYMBOL(ptlrpcs_req_get_cred);
diff --git a/lustre/sec/sec_null.c b/lustre/sec/sec_null.c
index 3761be50ffd9faf8755e7c322b95979fe5fa3129..a05485c9a78ad240306c9790dec9590dd1b1378a 100644
--- a/lustre/sec/sec_null.c
+++ b/lustre/sec/sec_null.c
@@ -38,27 +38,23 @@
 
 static int null_cred_refresh(struct ptlrpc_cred *cred)
 {
-        ENTRY;
-        LASSERT(cred->pc_flags & PTLRPC_CRED_UPTODATE);
-        RETURN(0);
+        LASSERT(test_bit(PTLRPC_CRED_UPTODATE_BIT, &cred->pc_flags));
+        return 0;
 }
 
 static int null_cred_match(struct ptlrpc_cred *cred,
                            struct vfs_cred *vcred)
 {
-        ENTRY;
-        RETURN(1);
+        return 1;
 }
 
 static int null_cred_sign(struct ptlrpc_cred *cred,
                           struct ptlrpc_request *req)
 {
         struct ptlrpcs_wire_hdr *hdr = buf_to_sec_hdr(req->rq_reqbuf);
-        ENTRY;
 
         hdr->sec_len = cpu_to_le32(0);
-
-        RETURN(0);
+        return 0;
 }
 
 static int null_cred_verify(struct ptlrpc_cred *cred,
@@ -68,7 +64,7 @@ static int null_cred_verify(struct ptlrpc_cred *cred,
 
         if (hdr->sec_len != 0) {
                 CERROR("security payload %u not zero\n", hdr->sec_len);
-                RETURN(-EPROTO);
+                return -EPROTO;
         }
 
         req->rq_repmsg = (struct lustre_msg *)(hdr + 1);
@@ -76,7 +72,7 @@ static int null_cred_verify(struct ptlrpc_cred *cred,
         CDEBUG(D_SEC, "set repmsg at %p, len %d\n",
                req->rq_repmsg, req->rq_replen);
 
-        RETURN(0);
+        return 0;
 }
 
 static void null_cred_destroy(struct ptlrpc_cred *cred)
@@ -145,7 +141,7 @@ struct ptlrpc_cred* null_create_cred(struct ptlrpc_sec *sec,
         atomic_set(&cred->pc_refcount, 0);
         cred->pc_sec = sec;
         cred->pc_ops = &null_credops;
-        cred->pc_expire = (-1UL >> 1); /* never expire */
+        cred->pc_expire = 0;
         cred->pc_flags = PTLRPC_CRED_UPTODATE;
         cred->pc_pag = vcred->vc_pag;
         cred->pc_uid = vcred->vc_uid;
diff --git a/lustre/utils/lctl.c b/lustre/utils/lctl.c
index e2e9e7375984cde88c087d90600f6734f73e8c5f..8ff3dcc27ce65b120078e9ea442a7e1eae1ab621 100644
--- a/lustre/utils/lctl.c
+++ b/lustre/utils/lctl.c
@@ -25,8 +25,12 @@
 
 
 
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
 #include <stdlib.h>
 #include <stdio.h>
+#include <errno.h>
 #include <portals/ptlctl.h>
 #include "obdctl.h"
 #include "parser.h"
@@ -45,6 +49,75 @@ static int jt_opt_ignore_errors(int argc, char **argv) {
         return 0;
 }
 
+/*
+ * XXX Should not belong to here
+ */
+static int flush_cred_ioctl(char *mp)
+{
+        int fd, rc;
+
+        fd = open(mp, O_RDONLY);
+        if (fd == -1) {
+                fprintf(stderr, "flush_cred_ioctl: error open %s: %s\n",
+                        mp, strerror(errno));
+                return -1;
+        }
+
+        rc = ioctl(fd, LL_IOC_FLUSH_CRED);
+        if (rc == -1) {
+                fprintf(stderr, "flush_cred_ioctl: error ioctl %s: %s\n",
+                        mp, strerror(errno));
+        }
+
+        close(fd);
+        return rc;
+}
+
+static int jt_flush_cred(int argc, char **argv)
+{
+        FILE   *proc;
+        char    procline[PATH_MAX], *line;
+        int     i, rc = 0;
+
+        /* no args means search all lustre mountpoints */
+        if (argc < 2) {
+                proc = fopen("/proc/mounts", "r");
+                if (!proc) {
+                        fprintf(stderr, "%s: can't open /proc/mounts\n",
+                                jt_cmdname(argv[0]));
+                        return -1;
+                }
+
+                while ((line = fgets(procline, PATH_MAX, proc)) != NULL) {
+                        char dev[PATH_MAX];
+                        char mp[PATH_MAX];
+                        char fs[PATH_MAX];
+
+                        if (sscanf(line, "%s %s %s", dev, mp, fs) != 3) {
+                                fprintf(stderr, "%s: unexpected format in "
+                                                "/proc/mounts\n",
+                                        jt_cmdname(argv[0]));
+                                return -1;
+                        }
+
+                        if (strcmp(fs, "lustre") &&
+                            strcmp(fs, "lustre_lite"))
+                                continue;
+
+                        if (flush_cred_ioctl(mp))
+                                rc = -1;
+                }
+        } else {
+                /* follow the exact flush sequence as supplied */
+                for (i = 1; i < argc; i++) {
+                        if (flush_cred_ioctl(argv[i]))
+                                rc = -1;
+                }
+        }
+
+        return rc;
+}
+
 command_t cmdlist[] = {
         /* Metacommands */
         {"--device", jt_opt_device, 0,
@@ -261,6 +334,10 @@ command_t cmdlist[] = {
          "usage: lsync\n"},  
         {"cache_off", jt_obd_cache_off, 0,
          "usage: lsync\n"},  
+/*
+        {"obd_flush_cred", jt_obd_flush_cred, 0,
+         "usage: obd_flush_cred [all]\n"},
+*/
         /*snap operations*/
         {"snap_add", jt_obd_snap_add, 0, 
          "usage: snap_add <dev_name> <snap_name>\n"}, 
@@ -289,6 +366,11 @@ command_t cmdlist[] = {
          "remove one log from catalog, erase it from disk.\n"
          "usage: llog_remove <catalog id|catalog name> <log id>"},
 
+        /* Misc commands */
+        {"flush_cred", jt_flush_cred, 0,
+         "flush the client side credential.\n"
+         "usage: flush_cred [mountpoint]..."},
+
         /* Debug commands */
         {"======== debug =========", jt_noop, 0, "debug"},
         {"debug_daemon", jt_dbg_debug_daemon, 0,