diff --git a/lustre/include/lustre_cfg.h b/lustre/include/lustre_cfg.h
index 89c5447543479887d8dd01201fb124b5d4ad2409..54325193e07bb51a16cd86d7e8efc1b6455169af 100644
--- a/lustre/include/lustre_cfg.h
+++ b/lustre/include/lustre_cfg.h
@@ -58,7 +58,7 @@ enum lcfg_command_type {
         LCFG_LOV_ADD_INA    = 0x00ce013,
         LCFG_ADD_MDC        = 0x00cf014,
         LCFG_DEL_MDC        = 0x00cf015,
-        LCFG_SEC_FLAVOR     = 0x00ce016,
+        LCFG_SPTLRPC_CONF   = 0x00cf016,
 };
 
 struct lustre_cfg_bufs {
@@ -256,9 +256,8 @@ static inline int lustre_cfg_sanity_check(void *buf, int len)
         RETURN(0);
 }
 
-/* default value for nllu/nllg for llite */
-#define NOBODY_UID      99
-#define NOBODY_GID      99
+#include <lustre/lustre_user.h>
+
 #define INVALID_UID     (-1)
 
 #endif // _LUSTRE_CFG_H
diff --git a/lustre/ptlrpc/gss/gss_bulk.c b/lustre/ptlrpc/gss/gss_bulk.c
index 7d4864c49d0a49666bfdb638726feb588eeb040a..9f829adca1180950b73900e86b184bea9c36bc6d 100644
--- a/lustre/ptlrpc/gss/gss_bulk.c
+++ b/lustre/ptlrpc/gss/gss_bulk.c
@@ -57,6 +57,7 @@ int do_bulk_privacy(struct gss_ctx *gctx,
 {
         struct crypto_tfm  *tfm;
         struct scatterlist  sg, sg2, *sgd;
+        unsigned int        blksize;
         int                 i, rc;
         __u8                local_iv[sizeof(bsd->bsd_iv)];
 
@@ -68,12 +69,6 @@ int do_bulk_privacy(struct gss_ctx *gctx,
         if (alg == BULK_PRIV_ALG_NULL)
                 return 0;
 
-        if (encrypt)
-                get_random_bytes(bsd->bsd_iv, sizeof(bsd->bsd_iv));
-
-        /* compute the secret iv */
-        lgss_plain_encrypt(gctx, sizeof(local_iv), bsd->bsd_iv, local_iv);
-
         tfm = crypto_alloc_tfm(sptlrpc_bulk_priv_alg2name(alg),
                                sptlrpc_bulk_priv_alg2flags(alg));
         if (tfm == NULL) {
@@ -82,12 +77,25 @@ int do_bulk_privacy(struct gss_ctx *gctx,
                 return -ENOMEM;
         }
 
+        blksize = crypto_tfm_alg_blocksize(tfm);
+        LASSERT(blksize <= sizeof(local_iv));
+
+        if (encrypt)
+                get_random_bytes(bsd->bsd_iv, sizeof(bsd->bsd_iv));
+
+        /* compute the secret iv */
+        rc = lgss_plain_encrypt(gctx, 0,
+                                sizeof(local_iv), bsd->bsd_iv, local_iv);
+        if (rc) {
+                CERROR("failed to compute secret iv: %d\n", rc);
+                goto out;
+        }
+
         rc = crypto_cipher_setkey(tfm, local_iv, sizeof(local_iv));
         if (rc) {
                 CERROR("Failed to set key for TFM %s: %d\n",
                        sptlrpc_bulk_priv_alg2name(alg), rc);
-                crypto_free_tfm(tfm);
-                return rc;
+                goto out;
         }
 
         for (i = 0; i < desc->bd_iov_count; i++) {
@@ -119,12 +127,12 @@ int do_bulk_privacy(struct gss_ctx *gctx,
                  */
         }
 
-        crypto_free_tfm(tfm);
-
         if (encrypt)
                 bsd->bsd_priv_alg = alg;
 
-        return 0;
+out:
+        crypto_free_tfm(tfm);
+        return rc;
 }
 
 int gss_cli_ctx_wrap_bulk(struct ptlrpc_cli_ctx *ctx,
@@ -134,14 +142,13 @@ int gss_cli_ctx_wrap_bulk(struct ptlrpc_cli_ctx *ctx,
         struct gss_cli_ctx              *gctx;
         struct lustre_msg               *msg;
         struct ptlrpc_bulk_sec_desc     *bsdr;
-        struct sec_flavor_config        *conf;
         int                              offset, rc;
         ENTRY;
 
-        LASSERT(SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor));
+        LASSERT(req->rq_pack_bulk);
         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
 
-        switch (SEC_FLAVOR_SVC(req->rq_sec_flavor)) {
+        switch (RPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
         case SPTLRPC_SVC_NULL:
                 LASSERT(req->rq_reqbuf->lm_bufcount >= 3);
                 msg = req->rq_reqbuf;
@@ -163,23 +170,22 @@ int gss_cli_ctx_wrap_bulk(struct ptlrpc_cli_ctx *ctx,
         }
 
         /* make checksum */
-        conf = &req->rq_import->imp_obd->u.cli.cl_sec_conf;
-        rc = bulk_csum_cli_request(desc, req->rq_bulk_read, conf->sfc_bulk_csum,
-                                   msg, offset);
+        rc = bulk_csum_cli_request(desc, req->rq_bulk_read,
+                                   req->rq_flvr.sf_bulk_csum, msg, offset);
         if (rc) {
                 CERROR("client bulk %s: failed to generate checksum: %d\n",
                        req->rq_bulk_read ? "read" : "write", rc);
                 RETURN(rc);
         }
 
-        if (conf->sfc_bulk_priv == BULK_PRIV_ALG_NULL)
+        if (req->rq_flvr.sf_bulk_priv == BULK_PRIV_ALG_NULL)
                 RETURN(0);
 
         /* previous bulk_csum_cli_request() has verified bsdr is good */
         bsdr = lustre_msg_buf(msg, offset, 0);
 
         if (req->rq_bulk_read) {
-                bsdr->bsd_priv_alg = conf->sfc_bulk_priv;
+                bsdr->bsd_priv_alg = req->rq_flvr.sf_bulk_priv;
                 RETURN(0);
         }
 
@@ -194,7 +200,7 @@ int gss_cli_ctx_wrap_bulk(struct ptlrpc_cli_ctx *ctx,
         LASSERT(gctx->gc_mechctx);
 
         rc = do_bulk_privacy(gctx->gc_mechctx, desc, 1,
-                             conf->sfc_bulk_priv, bsdr);
+                             req->rq_flvr.sf_bulk_priv, bsdr);
         if (rc)
                 CERROR("bulk write: client failed to encrypt pages\n");
 
@@ -211,10 +217,10 @@ int gss_cli_ctx_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
         int                              roff, voff, rc;
         ENTRY;
 
-        LASSERT(SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor));
+        LASSERT(req->rq_pack_bulk);
         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
 
-        switch (SEC_FLAVOR_SVC(req->rq_sec_flavor)) {
+        switch (RPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
         case SPTLRPC_SVC_NULL:
                 vmsg = req->rq_repbuf;
                 voff = vmsg->lm_bufcount - 1;
@@ -286,6 +292,7 @@ int gss_svc_unwrap_bulk(struct ptlrpc_request *req,
         ENTRY;
 
         LASSERT(req->rq_svc_ctx);
+        LASSERT(req->rq_pack_bulk);
         LASSERT(req->rq_bulk_write);
 
         grctx = gss_svc_ctx2reqctx(req->rq_svc_ctx);
@@ -322,6 +329,7 @@ int gss_svc_wrap_bulk(struct ptlrpc_request *req,
         ENTRY;
 
         LASSERT(req->rq_svc_ctx);
+        LASSERT(req->rq_pack_bulk);
         LASSERT(req->rq_bulk_read);
 
         grctx = gss_svc_ctx2reqctx(req->rq_svc_ctx);
diff --git a/lustre/ptlrpc/gss/gss_internal.h b/lustre/ptlrpc/gss/gss_internal.h
index 67083c124f01fa50f99c73ce208f42c945f144d4..6e98242f14cd8f927fcac335b36e364df100e3b6 100644
--- a/lustre/ptlrpc/gss/gss_internal.h
+++ b/lustre/ptlrpc/gss/gss_internal.h
@@ -36,6 +36,7 @@ typedef struct rawobj_buf_s {
         __u8           *buf;
 } rawobj_buf_t;
 
+int rawobj_empty(rawobj_t *obj);
 int rawobj_alloc(rawobj_t *obj, char *buf, int len);
 void rawobj_free(rawobj_t *obj);
 int rawobj_equal(rawobj_t *a, rawobj_t *b);
@@ -109,6 +110,11 @@ enum ptlrpc_gss_tgt {
         LUSTRE_GSS_TGT_MGS              = 2,
 };
 
+enum ptlrpc_gss_header_flags {
+        LUSTRE_GSS_PACK_BULK            = 1,
+        LUSTRE_GSS_PACK_USER            = 2,
+};
+
 static inline
 __u32 import_to_gss_svc(struct obd_import *imp)
 {
@@ -126,7 +132,9 @@ __u32 import_to_gss_svc(struct obd_import *imp)
  * following 3 header must have the same size and offset
  */
 struct gss_header {
-        __u32                   gh_version;     /* gss version */
+        __u8                    gh_version;     /* gss version */
+        __u8                    gh_sp;          /* sec part */
+        __u16                   gh_pad0;
         __u32                   gh_flags;       /* wrap flags */
         __u32                   gh_proc;        /* proc */
         __u32                   gh_seq;         /* sequence */
@@ -138,7 +146,9 @@ struct gss_header {
 };
 
 struct gss_rep_header {
-        __u32                   gh_version;
+        __u8                    gh_version;
+        __u8                    gh_sp;
+        __u16                   gh_pad0;
         __u32                   gh_flags;
         __u32                   gh_proc;
         __u32                   gh_major;
@@ -150,7 +160,9 @@ struct gss_rep_header {
 };
 
 struct gss_err_header {
-        __u32                   gh_version;
+        __u8                    gh_version;
+        __u8                    gh_sp;
+        __u16                   gh_pad0;
         __u32                   gh_flags;
         __u32                   gh_proc;
         __u32                   gh_major;
@@ -166,6 +178,7 @@ struct gss_err_header {
  * used later by server.
  */
 struct gss_wire_ctx {
+        __u32                   gw_flags;
         __u32                   gw_proc;
         __u32                   gw_seq;
         __u32                   gw_svc;
@@ -177,6 +190,13 @@ struct gss_wire_ctx {
                                          PTLRPC_GSS_MAX_HANDLE_SIZE)
 
 
+static inline __u64 gss_handle_to_u64(rawobj_t *handle)
+{
+        if (handle->len != PTLRPC_GSS_MAX_HANDLE_SIZE)
+                return -1;
+        return *((__u64 *) handle->data);
+}
+
 #define GSS_SEQ_WIN                     (2048)
 #define GSS_SEQ_WIN_MAIN                GSS_SEQ_WIN
 #define GSS_SEQ_WIN_BACK                (128)
@@ -200,15 +220,17 @@ struct gss_svc_seq_data {
 };
 
 struct gss_svc_ctx {
-        unsigned int            gsc_usr_root:1,
-                                gsc_usr_mds:1,
-                                gsc_remote:1;
+        struct gss_ctx         *gsc_mechctx;
+        struct gss_svc_seq_data gsc_seqdata;
+        rawobj_t                gsc_rvs_hdl;
+        __u32                   gsc_rvs_seq;
         uid_t                   gsc_uid;
         gid_t                   gsc_gid;
         uid_t                   gsc_mapped_uid;
-        rawobj_t                gsc_rvs_hdl;
-        struct gss_svc_seq_data gsc_seqdata;
-        struct gss_ctx         *gsc_mechctx;
+        unsigned int            gsc_usr_root:1,
+                                gsc_usr_mds:1,
+                                gsc_remote:1,
+                                gsc_reverse:1;
 };
 
 struct gss_svc_reqctx {
@@ -242,6 +264,8 @@ struct gss_cli_ctx {
         atomic_t                gc_seq;
         rawobj_t                gc_handle;
         struct gss_ctx         *gc_mechctx;
+        /* handle for the buddy svc ctx */
+        rawobj_t                gc_svc_handle;
 };
 
 struct gss_cli_ctx_keyring {
@@ -270,10 +294,6 @@ struct gss_sec_pipefs {
 
 struct gss_sec_keyring {
         struct gss_sec          gsk_base;
-        /*
-         * unique sec_id.
-         */
-        int                     gsk_id;
         /*
          * all contexts listed here. access is protected by sec spinlock.
          */
@@ -337,6 +357,13 @@ struct gss_svc_reqctx *gss_svc_ctx2reqctx(struct ptlrpc_svc_ctx *ctx)
         return container_of(ctx, struct gss_svc_reqctx, src_base);
 }
 
+static inline
+struct gss_svc_ctx *gss_svc_ctx2gssctx(struct ptlrpc_svc_ctx *ctx)
+{
+        LASSERT(ctx);
+        return gss_svc_ctx2reqctx(ctx)->src_ctx;
+}
+
 /* sec_gss.c */
 int gss_cli_ctx_match(struct ptlrpc_cli_ctx *ctx, struct vfs_cred *vcred);
 int gss_cli_ctx_display(struct ptlrpc_cli_ctx *ctx, char *buf, int bufsize);
@@ -381,9 +408,9 @@ int gss_sec_create_common(struct gss_sec *gsec,
                           struct ptlrpc_sec_policy *policy,
                           struct obd_import *imp,
                           struct ptlrpc_svc_ctx *ctx,
-                          __u32 flavor,
-                          unsigned long flags);
+                          struct sptlrpc_flavor *sf);
 void gss_sec_destroy_common(struct gss_sec *gsec);
+void gss_sec_kill(struct ptlrpc_sec *sec);
 
 int gss_cli_ctx_init_common(struct ptlrpc_sec *sec,
                             struct ptlrpc_cli_ctx *ctx,
@@ -395,7 +422,6 @@ int gss_cli_ctx_fini_common(struct ptlrpc_sec *sec,
 void gss_cli_ctx_flags2str(unsigned long flags, char *buf, int bufsize);
 
 /* gss_keyring.c */
-extern struct ptlrpc_sec_policy gss_policy_keyring;
 int  __init gss_init_keyring(void);
 void __exit gss_exit_keyring(void);
 
@@ -438,6 +464,9 @@ __u64 gss_get_next_ctx_index(void);
 int gss_svc_upcall_install_rvs_ctx(struct obd_import *imp,
                                    struct gss_sec *gsec,
                                    struct gss_cli_ctx *gctx);
+int gss_svc_upcall_expire_rvs_ctx(rawobj_t *handle);
+int gss_svc_upcall_dup_handle(rawobj_t *handle, struct gss_svc_ctx *ctx);
+int gss_svc_upcall_update_sequence(rawobj_t *handle, __u32 seq);
 int gss_svc_upcall_handle_init(struct ptlrpc_request *req,
                                struct gss_svc_reqctx *grctx,
                                struct gss_wire_ctx *gw,
diff --git a/lustre/ptlrpc/gss/gss_rawobj.c b/lustre/ptlrpc/gss/gss_rawobj.c
index 75343c96921c2b5df68e4cac9c51bc850b5e054e..5765f41a7251f40d1caa4e328b3189cc4820167c 100644
--- a/lustre/ptlrpc/gss/gss_rawobj.c
+++ b/lustre/ptlrpc/gss/gss_rawobj.c
@@ -34,6 +34,12 @@
 
 #include "gss_internal.h"
 
+int rawobj_empty(rawobj_t *obj)
+{
+        LASSERT(equi(obj->len, obj->data));
+        return (obj->len == 0);
+}
+
 int rawobj_alloc(rawobj_t *obj, char *buf, int len)
 {
         LASSERT(obj);
diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c
index e223d5f20741a6df5c7fa4f893aa4d06e70a655c..05295b89e3c6091d5554c5aaef85ced1cfa27f47 100644
--- a/lustre/ptlrpc/layout.c
+++ b/lustre/ptlrpc/layout.c
@@ -330,8 +330,7 @@ static const struct req_msg_field *mds_setxattr_client[] = {
 };
 
 static const struct req_msg_field *mds_setxattr_server[] = {
-        &RMF_PTLRPC_BODY,
-        &RMF_EADATA
+        &RMF_PTLRPC_BODY
 };
 
 static const struct req_msg_field *mds_getattr_server[] = {
diff --git a/lustre/ptlrpc/sec_gc.c b/lustre/ptlrpc/sec_gc.c
index 96ce6ef0ba50341fcd3bf8eb08eb4b67eb17df2e..930a7dccc40941ec63a41eed59c9846d92869512 100644
--- a/lustre/ptlrpc/sec_gc.c
+++ b/lustre/ptlrpc/sec_gc.c
@@ -27,6 +27,8 @@
 
 #ifndef __KERNEL__
 #include <liblustre.h>
+#else
+#include <libcfs/libcfs.h>
 #endif
 
 #include <obd_support.h>
@@ -51,11 +53,11 @@ static atomic_t sec_gc_wait_del = ATOMIC_INIT(0);
 
 void sptlrpc_gc_add_sec(struct ptlrpc_sec *sec)
 {
-        if (!list_empty(&sec->ps_gc_list)) {
-                CERROR("sec %p(%s) already in gc list\n",
-                       sec, sec->ps_policy->sp_name);
-                return;
-        }
+        LASSERT(sec->ps_policy->sp_cops->gc_ctx);
+        LASSERT(sec->ps_gc_interval > 0);
+        LASSERT(list_empty(&sec->ps_gc_list));
+
+        sec->ps_gc_next = cfs_time_current_sec() + sec->ps_gc_interval;
 
         spin_lock(&sec_gc_list_lock);
         list_add_tail(&sec_gc_list, &sec->ps_gc_list);
@@ -72,14 +74,17 @@ void sptlrpc_gc_del_sec(struct ptlrpc_sec *sec)
 
         might_sleep();
 
+        /* signal before list_del to make iteration in gc thread safe */
+        atomic_inc(&sec_gc_wait_del);
+
         spin_lock(&sec_gc_list_lock);
         list_del_init(&sec->ps_gc_list);
         spin_unlock(&sec_gc_list_lock);
 
         /* barrier */
-        atomic_inc(&sec_gc_wait_del);
         mutex_down(&sec_gc_mutex);
         mutex_up(&sec_gc_mutex);
+
         atomic_dec(&sec_gc_wait_del);
 
         CDEBUG(D_SEC, "del sec %p(%s)\n", sec, sec->ps_policy->sp_name);
@@ -127,7 +132,7 @@ static void sec_process_ctx_list(void)
 
 static void sec_do_gc(struct ptlrpc_sec *sec)
 {
-        cfs_time_t      now = cfs_time_current_sec();
+        LASSERT(sec->ps_policy->sp_cops->gc_ctx);
 
         if (unlikely(sec->ps_gc_next == 0)) {
                 CWARN("sec %p(%s) has 0 gc time\n",
@@ -135,19 +140,13 @@ static void sec_do_gc(struct ptlrpc_sec *sec)
                 return;
         }
 
-        if (unlikely(sec->ps_policy->sp_cops->gc_ctx == NULL)) {
-                CWARN("sec %p(%s) is not prepared for gc\n",
-                      sec, sec->ps_policy->sp_name);
-                return;
-        }
-
         CDEBUG(D_SEC, "check on sec %p(%s)\n", sec, sec->ps_policy->sp_name);
 
-        if (time_after(sec->ps_gc_next, now))
+        if (cfs_time_after(sec->ps_gc_next, cfs_time_current_sec()))
                 return;
 
         sec->ps_policy->sp_cops->gc_ctx(sec);
-        sec->ps_gc_next = now + sec->ps_gc_interval;
+        sec->ps_gc_next = cfs_time_current_sec() + sec->ps_gc_interval;
 }
 
 static int sec_gc_main(void *arg)
@@ -155,24 +154,30 @@ static int sec_gc_main(void *arg)
         struct ptlrpc_thread *thread = (struct ptlrpc_thread *) arg;
         struct l_wait_info    lwi;
 
-        cfs_daemonize("sptlrpc_ctx_gc");
+        cfs_daemonize("sptlrpc_gc");
 
         /* Record that the thread is running */
         thread->t_flags = SVC_RUNNING;
         cfs_waitq_signal(&thread->t_ctl_waitq);
 
         while (1) {
-                struct ptlrpc_sec *sec, *next;
+                struct ptlrpc_sec *sec;
 
                 thread->t_flags &= ~SVC_SIGNAL;
                 sec_process_ctx_list();
 again:
+                /* go through sec list do gc.
+                 * FIXME here we iterate through the whole list each time which
+                 * is not optimal. we perhaps want to use balanced binary tree
+                 * to trace each sec as order of expiry time.
+                 * another issue here is we wakeup as fixed interval instead of
+                 * according to each sec's expiry time */
                 mutex_down(&sec_gc_mutex);
-                list_for_each_entry_safe(sec, next, &sec_gc_list, ps_gc_list) {
+                list_for_each_entry(sec, &sec_gc_list, ps_gc_list) {
                         /* if someone is waiting to be deleted, let it
                          * proceed as soon as possible. */
                         if (atomic_read(&sec_gc_wait_del)) {
-                                CWARN("deletion pending, retry\n");
+                                CWARN("deletion pending, start over\n");
                                 mutex_up(&sec_gc_mutex);
                                 goto again;
                         }
@@ -181,6 +186,9 @@ again:
                 }
                 mutex_up(&sec_gc_mutex);
 
+                /* check ctx list again before sleep */
+                sec_process_ctx_list();
+
                 lwi = LWI_TIMEOUT(SEC_GC_INTERVAL * HZ, NULL, NULL);
                 l_wait_event(thread->t_ctl_waitq,
                              thread->t_flags & (SVC_STOPPING | SVC_SIGNAL),
diff --git a/lustre/ptlrpc/sec_lproc.c b/lustre/ptlrpc/sec_lproc.c
index df81d9c7341fbd0b61723e8708bdc356ab7baa8a..115eae6c75f0814e0a9922f01736dd29f6c0a212 100644
--- a/lustre/ptlrpc/sec_lproc.c
+++ b/lustre/ptlrpc/sec_lproc.c
@@ -56,6 +56,8 @@ void sec_flags2str(unsigned long flags, char *buf, int bufsize)
                 strncat(buf, "reverse,", bufsize);
         if (flags & PTLRPC_SEC_FL_ROOTONLY)
                 strncat(buf, "rootonly,", bufsize);
+        if (flags & PTLRPC_SEC_FL_UDESC)
+                strncat(buf, "udesc,", bufsize);
         if (flags & PTLRPC_SEC_FL_BULK)
                 strncat(buf, "bulk,", bufsize);
         if (buf[0] == '\0')
@@ -65,64 +67,102 @@ void sec_flags2str(unsigned long flags, char *buf, int bufsize)
 
 }
 
-int sptlrpc_lprocfs_rd(char *page, char **start, off_t off, int count,
-                       int *eof, void *data)
+static int sptlrpc_info_lprocfs_seq_show(struct seq_file *seq, void *v)
 {
-        struct obd_device        *obd = data;
-        struct sec_flavor_config *conf = &obd->u.cli.cl_sec_conf;
-        struct ptlrpc_sec        *sec = NULL;
-        char                      flags_str[32];
-        int                       written;
-
-        if (obd == NULL)
-                return 0;
-
-        LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_OSC_NAME) == 0 ||
-                strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0 ||
-                strcmp(obd->obd_type->typ_name, LUSTRE_MGC_NAME) == 0);
-        LASSERT(conf->sfc_bulk_csum < BULK_CSUM_ALG_MAX);
-        LASSERT(conf->sfc_bulk_priv < BULK_PRIV_ALG_MAX);
-
-        if (obd->u.cli.cl_import)
-                sec = obd->u.cli.cl_import->imp_sec;
-
-        if (sec == NULL) {
-                written = snprintf(page, count, "\n");
+        struct obd_device *dev = seq->private;
+        struct client_obd *cli = &dev->u.cli;
+        struct ptlrpc_sec *sec = NULL;
+        char               flags_str[32];
+
+        LASSERT(strcmp(dev->obd_type->typ_name, LUSTRE_OSC_NAME) == 0 ||
+                strcmp(dev->obd_type->typ_name, LUSTRE_MDC_NAME) == 0 ||
+                strcmp(dev->obd_type->typ_name, LUSTRE_MGC_NAME) == 0);
+
+        if (cli->cl_import)
+                sec = sptlrpc_import_sec_ref(cli->cl_import);
+        if (sec == NULL)
                 goto out;
+
+        sec_flags2str(sec->ps_flvr.sf_flags, flags_str, sizeof(flags_str));
+
+        seq_printf(seq, "rpc flavor:    %s\n",
+                   sptlrpc_rpcflavor2name(sec->ps_flvr.sf_rpc));
+        seq_printf(seq, "bulk flavor:   %s/%s\n",
+                   sptlrpc_bulk_csum_alg2name(sec->ps_flvr.sf_bulk_csum),
+                   sptlrpc_bulk_priv_alg2name(sec->ps_flvr.sf_bulk_priv));
+        seq_printf(seq, "flags:         %s\n", flags_str);
+        seq_printf(seq, "id:            %d\n", sec->ps_id);
+        seq_printf(seq, "refcount:      %d\n", atomic_read(&sec->ps_refcount));
+        seq_printf(seq, "nctx:          %d\n", atomic_read(&sec->ps_nctx));
+        seq_printf(seq, "gc internal    %ld\n", sec->ps_gc_interval);
+        seq_printf(seq, "gc next        %ld\n",
+                   sec->ps_gc_interval ?
+                   sec->ps_gc_next - cfs_time_current_sec() : 0);
+
+        sptlrpc_sec_put(sec);
+out:
+        return 0;
+}
+LPROC_SEQ_FOPS_RO(sptlrpc_info_lprocfs);
+
+static int sptlrpc_ctxs_lprocfs_seq_show(struct seq_file *seq, void *v)
+{
+        struct obd_device *dev = seq->private;
+        struct client_obd *cli = &dev->u.cli;
+        struct ptlrpc_sec *sec = NULL;
+
+        LASSERT(strcmp(dev->obd_type->typ_name, LUSTRE_OSC_NAME) == 0 ||
+                strcmp(dev->obd_type->typ_name, LUSTRE_MDC_NAME) == 0 ||
+                strcmp(dev->obd_type->typ_name, LUSTRE_MGC_NAME) == 0);
+
+        if (cli->cl_import)
+                sec = sptlrpc_import_sec_ref(cli->cl_import);
+        if (sec == NULL)
+                goto out;
+
+        if (sec->ps_policy->sp_cops->display)
+                sec->ps_policy->sp_cops->display(sec, seq);
+
+        sptlrpc_sec_put(sec);
+out:
+        return 0;
+}
+LPROC_SEQ_FOPS_RO(sptlrpc_ctxs_lprocfs);
+
+int sptlrpc_lprocfs_cliobd_attach(struct obd_device *dev)
+{
+        int     rc;
+
+        if (strcmp(dev->obd_type->typ_name, LUSTRE_OSC_NAME) != 0 &&
+            strcmp(dev->obd_type->typ_name, LUSTRE_MDC_NAME) != 0 &&
+            strcmp(dev->obd_type->typ_name, LUSTRE_MGC_NAME) != 0) {
+                CERROR("can't register lproc for obd type %s\n",
+                       dev->obd_type->typ_name);
+                return -EINVAL;
         }
 
-        sec_flags2str(sec->ps_flags, flags_str, sizeof(flags_str));
-
-        written = snprintf(page, count,
-                        "rpc msg flavor:        %s\n"
-                        "bulk checksum:         %s\n"
-                        "bulk encrypt:          %s\n"
-                        "flags:                 %s\n"
-                        "ctx cache busy         %d\n"
-                        "gc interval            %lu\n"
-                        "gc next                %ld\n",
-                        sptlrpc_flavor2name(sec->ps_flavor),
-                        sptlrpc_bulk_csum_alg2name(conf->sfc_bulk_csum),
-                        sptlrpc_bulk_priv_alg2name(conf->sfc_bulk_priv),
-                        flags_str,
-                        atomic_read(&sec->ps_busy),
-                        sec->ps_gc_interval,
-                        sec->ps_gc_interval ?
-                                sec->ps_gc_next - cfs_time_current_sec() : 0
-                          );
-
-        if (sec->ps_policy->sp_cops->display) {
-                written += sec->ps_policy->sp_cops->display(
-                                        sec, page + written, count - written);
+        rc = lprocfs_obd_seq_create(dev, "srpc.info", 0444,
+                                    &sptlrpc_info_lprocfs_fops, dev);
+        if (rc) {
+                CERROR("create proc entry srpc.info for %s: %d\n",
+                       dev->obd_name, rc);
+                return rc;
         }
 
-out:
-        return written;
+        rc = lprocfs_obd_seq_create(dev, "srpc.contexts", 0444,
+                                    &sptlrpc_ctxs_lprocfs_fops, dev);
+        if (rc) {
+                CERROR("create proc entry srpc.contexts for %s: %d\n",
+                       dev->obd_name, rc);
+                return rc;
+        }
+
+        return 0;
 }
-EXPORT_SYMBOL(sptlrpc_lprocfs_rd);
+EXPORT_SYMBOL(sptlrpc_lprocfs_cliobd_attach);
 
 static struct lprocfs_vars sptlrpc_lprocfs_vars[] = {
-        { "enc_pool", sptlrpc_proc_read_enc_pool, NULL, NULL },
+        { "encrypt_page_pools", sptlrpc_proc_read_enc_pool, NULL, NULL },
         { NULL }
 };
 
@@ -152,6 +192,11 @@ void sptlrpc_lproc_fini(void)
 
 #else /* !__KERNEL__ */
 
+int sptlrpc_lprocfs_cliobd_attach(struct obd_device *dev)
+{
+        return 0;
+}
+
 int sptlrpc_lproc_init(void)
 {
         return 0;
diff --git a/lustre/ptlrpc/sec_plain.c b/lustre/ptlrpc/sec_plain.c
index e15a69eeb32a87fb42c2d0147d092abc1722ffc1..5c75f66e1e6fde9f3ab40396020830544ac448e4 100644
--- a/lustre/ptlrpc/sec_plain.c
+++ b/lustre/ptlrpc/sec_plain.c
@@ -1,7 +1,7 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (C) 2006 Cluster File Systems, Inc.
+ * Copyright (C) 2006-2007 Cluster File Systems, Inc.
  *   Author: Eric Mei <ericm@clusterfs.com>
  *
  *   This file is part of Lustre, http://www.lustre.org.
@@ -34,12 +34,61 @@
 #include <lustre_net.h>
 #include <lustre_sec.h>
 
+struct plain_sec {
+        struct ptlrpc_sec       pls_base;
+        rwlock_t                pls_lock;
+        struct ptlrpc_cli_ctx  *pls_ctx;
+};
+
+static inline struct plain_sec *sec2plsec(struct ptlrpc_sec *sec)
+{
+        return container_of(sec, struct plain_sec, pls_base);
+}
+
 static struct ptlrpc_sec_policy plain_policy;
 static struct ptlrpc_ctx_ops    plain_ctx_ops;
-static struct ptlrpc_sec        plain_sec;
-static struct ptlrpc_cli_ctx    plain_cli_ctx;
 static struct ptlrpc_svc_ctx    plain_svc_ctx;
 
+/*
+ * flavor flags (maximum 8 flags)
+ */
+#define PLAIN_WFLVR_FLAGS_OFFSET        (12)
+#define PLAIN_WFLVR_FLAG_BULK           (1 << (0 + PLAIN_WFLVR_FLAGS_OFFSET))
+#define PLAIN_WFLVR_FLAG_USER           (1 << (1 + PLAIN_WFLVR_FLAGS_OFFSET))
+
+#define PLAIN_WFLVR_HAS_BULK(wflvr)      \
+        (((wflvr) & PLAIN_WFLVR_FLAG_BULK) != 0)
+#define PLAIN_WFLVR_HAS_USER(wflvr)      \
+        (((wflvr) & PLAIN_WFLVR_FLAG_USER) != 0)
+
+#define PLAIN_WFLVR_TO_RPC(wflvr)       \
+        ((wflvr) & ((1 << PLAIN_WFLVR_FLAGS_OFFSET) - 1))
+
+/*
+ * similar to null sec, temporarily use the third byte of lm_secflvr to identify
+ * the source sec part.
+ */
+static inline
+void plain_encode_sec_part(struct lustre_msg *msg, enum lustre_sec_part sp)
+{
+        msg->lm_secflvr |= (((__u32) sp) & 0xFF) << 16;
+}
+
+static inline
+enum lustre_sec_part plain_decode_sec_part(struct lustre_msg *msg)
+{
+        return (msg->lm_secflvr >> 16) & 0xFF;
+}
+
+/*
+ * for simplicity, plain policy rpc use fixed layout.
+ */
+#define PLAIN_PACK_SEGMENTS             (3)
+
+#define PLAIN_PACK_MSG_OFF              (0)
+#define PLAIN_PACK_USER_OFF             (1)
+#define PLAIN_PACK_BULK_OFF             (2)
+
 /****************************************
  * cli_ctx apis                         *
  ****************************************/
@@ -52,13 +101,26 @@ int plain_ctx_refresh(struct ptlrpc_cli_ctx *ctx)
         return 0;
 }
 
+static
+int plain_ctx_validate(struct ptlrpc_cli_ctx *ctx)
+{
+        return 0;
+}
+
 static
 int plain_ctx_sign(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
 {
         struct lustre_msg_v2 *msg = req->rq_reqbuf;
         ENTRY;
 
-        msg->lm_secflvr = req->rq_sec_flavor;
+        msg->lm_secflvr = req->rq_flvr.sf_rpc;
+        if (req->rq_pack_bulk)
+                msg->lm_secflvr |= PLAIN_WFLVR_FLAG_BULK;
+        if (req->rq_pack_udesc)
+                msg->lm_secflvr |= PLAIN_WFLVR_FLAG_USER;
+
+        plain_encode_sec_part(msg, ctx->cc_sec->ps_part);
+
         req->rq_reqdata_len = lustre_msg_size_v2(msg->lm_bufcount,
                                                  msg->lm_buflens);
         RETURN(0);
@@ -68,23 +130,37 @@ static
 int plain_ctx_verify(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
 {
         struct lustre_msg *msg = req->rq_repbuf;
+        __u16              wflvr;
         ENTRY;
 
-        if (SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor)) {
-                if (msg->lm_bufcount != 2) {
-                        CERROR("Protocol error: invalid buf count %d\n",
-                               msg->lm_bufcount);
-                        RETURN(-EPROTO);
-                }
+        if (msg->lm_bufcount != PLAIN_PACK_SEGMENTS) {
+                CERROR("unexpected reply buf count %u\n", msg->lm_bufcount);
+                RETURN(-EPROTO);
+        }
 
-                if (bulk_sec_desc_unpack(msg, 1)) {
-                        CERROR("Mal-formed bulk checksum reply\n");
-                        RETURN(-EINVAL);
-                }
+        wflvr = WIRE_FLVR_RPC(msg->lm_secflvr);
+
+        /* expect no user desc in reply */
+        if (PLAIN_WFLVR_HAS_USER(wflvr)) {
+                CERROR("Unexpected udesc flag in reply\n");
+                RETURN(-EPROTO);
+        }
+
+        /* whether we sent with bulk or not, we expect the same in reply */
+        if (!equi(req->rq_pack_bulk == 1, PLAIN_WFLVR_HAS_BULK(wflvr))) {
+                CERROR("%s bulk checksum in reply\n",
+                       req->rq_pack_bulk ? "Missing" : "Unexpected");
+                RETURN(-EPROTO);
         }
 
-        req->rq_repmsg = lustre_msg_buf(msg, 0, 0);
-        req->rq_replen = msg->lm_buflens[0];
+        if (req->rq_pack_bulk &&
+            bulk_sec_desc_unpack(msg, PLAIN_PACK_BULK_OFF)) {
+                CERROR("Mal-formed bulk checksum reply\n");
+                RETURN(-EINVAL);
+        }
+
+        req->rq_repmsg = lustre_msg_buf(msg, PLAIN_PACK_MSG_OFF, 0);
+        req->rq_replen = msg->lm_buflens[PLAIN_PACK_MSG_OFF];
         RETURN(0);
 }
 
@@ -93,17 +169,13 @@ int plain_cli_wrap_bulk(struct ptlrpc_cli_ctx *ctx,
                         struct ptlrpc_request *req,
                         struct ptlrpc_bulk_desc *desc)
 {
-        struct sec_flavor_config *conf;
-
-        LASSERT(req->rq_import);
-        LASSERT(SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor));
-        LASSERT(req->rq_reqbuf->lm_bufcount >= 2);
+        LASSERT(req->rq_pack_bulk);
+        LASSERT(req->rq_reqbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
 
-        conf = &req->rq_import->imp_obd->u.cli.cl_sec_conf;
         return bulk_csum_cli_request(desc, req->rq_bulk_read,
-                                     conf->sfc_bulk_csum,
+                                     req->rq_flvr.sf_bulk_csum,
                                      req->rq_reqbuf,
-                                     req->rq_reqbuf->lm_bufcount - 1);
+                                     PLAIN_PACK_BULK_OFF);
 }
 
 static
@@ -111,15 +183,13 @@ int plain_cli_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
                           struct ptlrpc_request *req,
                           struct ptlrpc_bulk_desc *desc)
 {
-        LASSERT(SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor));
-        LASSERT(req->rq_reqbuf->lm_bufcount >= 2);
-        LASSERT(req->rq_repbuf->lm_bufcount >= 2);
+        LASSERT(req->rq_pack_bulk);
+        LASSERT(req->rq_reqbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
+        LASSERT(req->rq_repbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
 
         return bulk_csum_cli_reply(desc, req->rq_bulk_read,
-                                   req->rq_reqbuf,
-                                   req->rq_reqbuf->lm_bufcount - 1,
-                                   req->rq_repbuf,
-                                   req->rq_repbuf->lm_bufcount - 1);
+                                   req->rq_reqbuf, PLAIN_PACK_BULK_OFF,
+                                   req->rq_repbuf, PLAIN_PACK_BULK_OFF);
 }
 
 /****************************************
@@ -127,40 +197,178 @@ int plain_cli_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
  ****************************************/
 
 static
-struct ptlrpc_sec* plain_create_sec(struct obd_import *imp,
-                                    struct ptlrpc_svc_ctx *ctx,
-                                    __u32 flavor,
-                                    unsigned long flags)
+struct ptlrpc_cli_ctx *plain_sec_install_ctx(struct plain_sec *plsec)
 {
-        ENTRY;
-        LASSERT(SEC_FLAVOR_POLICY(flavor) == SPTLRPC_POLICY_PLAIN);
-        RETURN(&plain_sec);
+        struct ptlrpc_cli_ctx  *ctx, *ctx_new;
+
+        OBD_ALLOC_PTR(ctx_new);
+
+        write_lock(&plsec->pls_lock);
+
+        ctx = plsec->pls_ctx;
+        if (ctx) {
+                atomic_inc(&ctx->cc_refcount);
+
+                if (ctx_new)
+                        OBD_FREE_PTR(ctx_new);
+        } else if (ctx_new) {
+                ctx = ctx_new;
+
+                atomic_set(&ctx->cc_refcount, 1); /* for cache */
+                ctx->cc_sec = &plsec->pls_base;
+                ctx->cc_ops = &plain_ctx_ops;
+                ctx->cc_expire = 0;
+                ctx->cc_flags = PTLRPC_CTX_CACHED | PTLRPC_CTX_UPTODATE;
+                ctx->cc_vcred.vc_uid = 0;
+                spin_lock_init(&ctx->cc_lock);
+                INIT_LIST_HEAD(&ctx->cc_req_list);
+                INIT_LIST_HEAD(&ctx->cc_gc_chain);
+
+                plsec->pls_ctx = ctx;
+                atomic_inc(&plsec->pls_base.ps_nctx);
+                atomic_inc(&plsec->pls_base.ps_refcount);
+
+                atomic_inc(&ctx->cc_refcount); /* for caller */
+        }
+
+        write_unlock(&plsec->pls_lock);
+
+        return ctx;
 }
 
 static
 void plain_destroy_sec(struct ptlrpc_sec *sec)
 {
+        struct plain_sec       *plsec = sec2plsec(sec);
         ENTRY;
-        LASSERT(sec == &plain_sec);
+
+        LASSERT(sec->ps_policy == &plain_policy);
+        LASSERT(sec->ps_import);
+        LASSERT(atomic_read(&sec->ps_refcount) == 0);
+        LASSERT(atomic_read(&sec->ps_nctx) == 0);
+        LASSERT(plsec->pls_ctx == NULL);
+
+        class_import_put(sec->ps_import);
+
+        OBD_FREE_PTR(plsec);
         EXIT;
 }
 
+static
+void plain_kill_sec(struct ptlrpc_sec *sec)
+{
+        sec->ps_dying = 1;
+}
+
+static
+struct ptlrpc_sec *plain_create_sec(struct obd_import *imp,
+                                    struct ptlrpc_svc_ctx *svc_ctx,
+                                    struct sptlrpc_flavor *sf)
+{
+        struct plain_sec       *plsec;
+        struct ptlrpc_sec      *sec;
+        struct ptlrpc_cli_ctx  *ctx;
+        ENTRY;
+
+        LASSERT(RPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN);
+
+        if (sf->sf_bulk_priv != BULK_PRIV_ALG_NULL) {
+                CERROR("plain policy don't support bulk encryption: %u\n",
+                       sf->sf_bulk_priv);
+                RETURN(NULL);
+        }
+
+        OBD_ALLOC_PTR(plsec);
+        if (plsec == NULL)
+                RETURN(NULL);
+
+        /*
+         * initialize plain_sec
+         */
+        plsec->pls_lock = RW_LOCK_UNLOCKED;
+        plsec->pls_ctx = NULL;
+
+        sec = &plsec->pls_base;
+        sec->ps_policy = &plain_policy;
+        atomic_set(&sec->ps_refcount, 0);
+        atomic_set(&sec->ps_nctx, 0);
+        sec->ps_id = sptlrpc_get_next_secid();
+        sec->ps_import = class_import_get(imp);
+        sec->ps_flvr = *sf;
+        sec->ps_lock = SPIN_LOCK_UNLOCKED;
+        INIT_LIST_HEAD(&sec->ps_gc_list);
+        sec->ps_gc_interval = 0;
+        sec->ps_gc_next = 0;
+
+        /* install ctx immediately if this is a reverse sec */
+        if (svc_ctx) {
+                ctx = plain_sec_install_ctx(plsec);
+                if (ctx == NULL) {
+                        plain_destroy_sec(sec);
+                        RETURN(NULL);
+                }
+                sptlrpc_cli_ctx_put(ctx, 1);
+        }
+
+        RETURN(sec);
+}
+
 static
 struct ptlrpc_cli_ctx *plain_lookup_ctx(struct ptlrpc_sec *sec,
                                         struct vfs_cred *vcred,
                                         int create, int remove_dead)
 {
+        struct plain_sec       *plsec = sec2plsec(sec);
+        struct ptlrpc_cli_ctx  *ctx;
         ENTRY;
-        atomic_inc(&plain_cli_ctx.cc_refcount);
-        RETURN(&plain_cli_ctx);
+
+        read_lock(&plsec->pls_lock);
+        ctx = plsec->pls_ctx;
+        if (ctx)
+                atomic_inc(&ctx->cc_refcount);
+        read_unlock(&plsec->pls_lock);
+
+        if (unlikely(ctx == NULL))
+                ctx = plain_sec_install_ctx(plsec);
+
+        RETURN(ctx);
+}
+
+static
+void plain_release_ctx(struct ptlrpc_sec *sec,
+                       struct ptlrpc_cli_ctx *ctx, int sync)
+{
+        LASSERT(atomic_read(&sec->ps_refcount) > 0);
+        LASSERT(atomic_read(&sec->ps_nctx) > 0);
+        LASSERT(atomic_read(&ctx->cc_refcount) == 0);
+        LASSERT(ctx->cc_sec == sec);
+
+        OBD_FREE_PTR(ctx);
+
+        atomic_dec(&sec->ps_nctx);
+        sptlrpc_sec_put(sec);
 }
 
 static
 int plain_flush_ctx_cache(struct ptlrpc_sec *sec,
-                          uid_t uid,
-                          int grace, int force)
+                          uid_t uid, int grace, int force)
 {
-        return 0;
+        struct plain_sec       *plsec = sec2plsec(sec);
+        struct ptlrpc_cli_ctx  *ctx;
+        ENTRY;
+
+        /* do nothing unless caller want to flush for 'all' */
+        if (uid != -1)
+                RETURN(0);
+
+        write_lock(&plsec->pls_lock);
+        ctx = plsec->pls_ctx;
+        plsec->pls_ctx = NULL;
+        write_unlock(&plsec->pls_lock);
+
+        if (ctx)
+                sptlrpc_cli_ctx_put(ctx, 1);
+        RETURN(0);
 }
 
 static
@@ -168,24 +376,24 @@ int plain_alloc_reqbuf(struct ptlrpc_sec *sec,
                        struct ptlrpc_request *req,
                        int msgsize)
 {
-        struct sec_flavor_config *conf;
-        int bufcnt = 1, buflens[2], alloc_len;
+        int buflens[PLAIN_PACK_SEGMENTS] = { 0, };
+        int alloc_len;
         ENTRY;
 
-        buflens[0] = msgsize;
+        buflens[PLAIN_PACK_MSG_OFF] = msgsize;
 
-        if (SEC_FLAVOR_HAS_USER(req->rq_sec_flavor))
-                buflens[bufcnt++] = sptlrpc_current_user_desc_size();
+        if (req->rq_pack_udesc)
+                buflens[PLAIN_PACK_USER_OFF] = sptlrpc_current_user_desc_size();
 
-        if (SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor)) {
+        if (req->rq_pack_bulk) {
                 LASSERT(req->rq_bulk_read || req->rq_bulk_write);
 
-                conf = &req->rq_import->imp_obd->u.cli.cl_sec_conf;
-                buflens[bufcnt++] = bulk_sec_desc_size(conf->sfc_bulk_csum, 1,
-                                                       req->rq_bulk_read);
+                buflens[PLAIN_PACK_BULK_OFF] = bulk_sec_desc_size(
+                                                req->rq_flvr.sf_bulk_csum, 1,
+                                                req->rq_bulk_read);
         }
 
-        alloc_len = lustre_msg_size_v2(bufcnt, buflens);
+        alloc_len = lustre_msg_size_v2(PLAIN_PACK_SEGMENTS, buflens);
 
         if (!req->rq_reqbuf) {
                 LASSERT(!req->rq_pool);
@@ -202,11 +410,11 @@ int plain_alloc_reqbuf(struct ptlrpc_sec *sec,
                 memset(req->rq_reqbuf, 0, alloc_len);
         }
 
-        lustre_init_msg_v2(req->rq_reqbuf, bufcnt, buflens, NULL);
+        lustre_init_msg_v2(req->rq_reqbuf, PLAIN_PACK_SEGMENTS, buflens, NULL);
         req->rq_reqmsg = lustre_msg_buf_v2(req->rq_reqbuf, 0, 0);
 
-        if (SEC_FLAVOR_HAS_USER(req->rq_sec_flavor))
-                sptlrpc_pack_user_desc(req->rq_reqbuf, 1);
+        if (req->rq_pack_udesc)
+                sptlrpc_pack_user_desc(req->rq_reqbuf, PLAIN_PACK_USER_OFF);
 
         RETURN(0);
 }
@@ -221,6 +429,8 @@ void plain_free_reqbuf(struct ptlrpc_sec *sec,
                 req->rq_reqbuf = NULL;
                 req->rq_reqbuf_len = 0;
         }
+
+        req->rq_reqmsg = NULL;
         EXIT;
 }
 
@@ -229,21 +439,21 @@ int plain_alloc_repbuf(struct ptlrpc_sec *sec,
                        struct ptlrpc_request *req,
                        int msgsize)
 {
-        struct sec_flavor_config *conf;
-        int bufcnt = 1, buflens[2], alloc_len;
+        int buflens[PLAIN_PACK_SEGMENTS] = { 0, };
+        int alloc_len;
         ENTRY;
 
-        buflens[0] = msgsize;
+        buflens[PLAIN_PACK_MSG_OFF] = msgsize;
 
-        if (SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor)) {
+        if (req->rq_pack_bulk) {
                 LASSERT(req->rq_bulk_read || req->rq_bulk_write);
 
-                conf = &req->rq_import->imp_obd->u.cli.cl_sec_conf;
-                buflens[bufcnt++] = bulk_sec_desc_size(conf->sfc_bulk_csum, 0,
-                                                       req->rq_bulk_read);
+                buflens[PLAIN_PACK_BULK_OFF] = bulk_sec_desc_size(
+                                                req->rq_flvr.sf_bulk_csum, 0,
+                                                req->rq_bulk_read);
         }
 
-        alloc_len = lustre_msg_size_v2(bufcnt, buflens);
+        alloc_len = lustre_msg_size_v2(PLAIN_PACK_SEGMENTS, buflens);
         alloc_len = size_roundup_power2(alloc_len);
 
         OBD_ALLOC(req->rq_repbuf, alloc_len);
@@ -262,6 +472,8 @@ void plain_free_repbuf(struct ptlrpc_sec *sec,
         OBD_FREE(req->rq_repbuf, req->rq_repbuf_len);
         req->rq_repbuf = NULL;
         req->rq_repbuf_len = 0;
+
+        req->rq_repmsg = NULL;
         EXIT;
 }
 
@@ -275,10 +487,10 @@ int plain_enlarge_reqbuf(struct ptlrpc_sec *sec,
         int                     newmsg_size, newbuf_size;
         ENTRY;
 
-        /* embedded msg always at seg 0 */
         LASSERT(req->rq_reqbuf);
         LASSERT(req->rq_reqbuf_len >= req->rq_reqlen);
-        LASSERT(lustre_msg_buf(req->rq_reqbuf, 0, 0) == req->rq_reqmsg);
+        LASSERT(lustre_msg_buf(req->rq_reqbuf, PLAIN_PACK_MSG_OFF, 0) ==
+                req->rq_reqmsg);
 
         /* compute new embedded msg size.  */
         oldsize = req->rq_reqmsg->lm_buflens[segment];
@@ -288,11 +500,11 @@ int plain_enlarge_reqbuf(struct ptlrpc_sec *sec,
         req->rq_reqmsg->lm_buflens[segment] = oldsize;
 
         /* compute new wrapper msg size.  */
-        oldsize = req->rq_reqbuf->lm_buflens[0];
-        req->rq_reqbuf->lm_buflens[0] = newmsg_size;
+        oldsize = req->rq_reqbuf->lm_buflens[PLAIN_PACK_MSG_OFF];
+        req->rq_reqbuf->lm_buflens[PLAIN_PACK_MSG_OFF] = newmsg_size;
         newbuf_size = lustre_msg_size_v2(req->rq_reqbuf->lm_bufcount,
                                          req->rq_reqbuf->lm_buflens);
-        req->rq_reqbuf->lm_buflens[0] = oldsize;
+        req->rq_reqbuf->lm_buflens[PLAIN_PACK_MSG_OFF] = oldsize;
 
         /* request from pool should always have enough buffer */
         LASSERT(!req->rq_pool || req->rq_reqbuf_len >= newbuf_size);
@@ -309,10 +521,12 @@ int plain_enlarge_reqbuf(struct ptlrpc_sec *sec,
                 OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
                 req->rq_reqbuf = newbuf;
                 req->rq_reqbuf_len = newbuf_size;
-                req->rq_reqmsg = lustre_msg_buf(req->rq_reqbuf, 0, 0);
+                req->rq_reqmsg = lustre_msg_buf(req->rq_reqbuf,
+                                                PLAIN_PACK_MSG_OFF, 0);
         }
 
-        _sptlrpc_enlarge_msg_inplace(req->rq_reqbuf, 0, newmsg_size);
+        _sptlrpc_enlarge_msg_inplace(req->rq_reqbuf, PLAIN_PACK_MSG_OFF,
+                                     newmsg_size);
         _sptlrpc_enlarge_msg_inplace(req->rq_reqmsg, segment, newsize);
 
         req->rq_reqlen = newmsg_size;
@@ -332,46 +546,43 @@ static
 int plain_accept(struct ptlrpc_request *req)
 {
         struct lustre_msg *msg = req->rq_reqbuf;
-        int                bufcnt = 1;
         ENTRY;
 
-        LASSERT(SEC_FLAVOR_POLICY(req->rq_sec_flavor) == SPTLRPC_POLICY_PLAIN);
+        LASSERT(RPC_FLVR_POLICY(req->rq_flvr.sf_rpc) == SPTLRPC_POLICY_PLAIN);
 
-        if (SEC_FLAVOR_RPC(req->rq_sec_flavor) != SPTLRPC_FLVR_PLAIN) {
-                CERROR("Invalid flavor 0x%x\n", req->rq_sec_flavor);
-                return SECSVC_DROP;
+        if (msg->lm_bufcount < PLAIN_PACK_SEGMENTS) {
+                CERROR("unexpected request buf count %u\n", msg->lm_bufcount);
+                RETURN(SECSVC_DROP);
         }
 
-        if (SEC_FLAVOR_HAS_USER(req->rq_sec_flavor)) {
-                if (msg->lm_bufcount < ++bufcnt) {
-                        CERROR("Protocal error: too small buf count %d\n",
-                               msg->lm_bufcount);
-                        RETURN(SECSVC_DROP);
-                }
+        if (req->rq_flvr.sf_rpc != SPTLRPC_FLVR_PLAIN) {
+                CERROR("Invalid rpc flavor %x\n", req->rq_flvr.sf_rpc);
+                RETURN(SECSVC_DROP);
+        }
+
+        req->rq_sp_from = plain_decode_sec_part(msg);
 
-                if (sptlrpc_unpack_user_desc(msg, bufcnt - 1)) {
+        if (PLAIN_WFLVR_HAS_USER(msg->lm_secflvr)) {
+                if (sptlrpc_unpack_user_desc(msg, PLAIN_PACK_USER_OFF)) {
                         CERROR("Mal-formed user descriptor\n");
                         RETURN(SECSVC_DROP);
                 }
 
-                req->rq_user_desc = lustre_msg_buf(msg, bufcnt - 1, 0);
+                req->rq_pack_udesc = 1;
+                req->rq_user_desc = lustre_msg_buf(msg, PLAIN_PACK_USER_OFF, 0);
         }
 
-        if (SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor)) {
-                if (msg->lm_bufcount != ++bufcnt) {
-                        CERROR("Protocal error: invalid buf count %d\n",
-                               msg->lm_bufcount);
-                        RETURN(SECSVC_DROP);
-                }
-
-                if (bulk_sec_desc_unpack(msg, bufcnt - 1)) {
+        if (PLAIN_WFLVR_HAS_BULK(msg->lm_secflvr)) {
+                if (bulk_sec_desc_unpack(msg, PLAIN_PACK_BULK_OFF)) {
                         CERROR("Mal-formed bulk checksum request\n");
                         RETURN(SECSVC_DROP);
                 }
+
+                req->rq_pack_bulk = 1;
         }
 
-        req->rq_reqmsg = lustre_msg_buf(msg, 0, 0);
-        req->rq_reqlen = msg->lm_buflens[0];
+        req->rq_reqmsg = lustre_msg_buf(msg, PLAIN_PACK_MSG_OFF, 0);
+        req->rq_reqlen = msg->lm_buflens[PLAIN_PACK_MSG_OFF];
 
         req->rq_svc_ctx = &plain_svc_ctx;
         atomic_inc(&req->rq_svc_ctx->sc_refcount);
@@ -382,26 +593,26 @@ int plain_accept(struct ptlrpc_request *req)
 static
 int plain_alloc_rs(struct ptlrpc_request *req, int msgsize)
 {
-        struct ptlrpc_reply_state *rs;
+        struct ptlrpc_reply_state   *rs;
         struct ptlrpc_bulk_sec_desc *bsd;
-        int bufcnt = 1, buflens[2];
-        int rs_size = sizeof(*rs);
+        int                          buflens[PLAIN_PACK_SEGMENTS] = { 0, };
+        int                          rs_size = sizeof(*rs);
         ENTRY;
 
         LASSERT(msgsize % 8 == 0);
 
-        buflens[0] = msgsize;
-        if (SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor) &&
-            (req->rq_bulk_read || req->rq_bulk_write)) {
+        buflens[PLAIN_PACK_MSG_OFF] = msgsize;
+
+        if (req->rq_pack_bulk && (req->rq_bulk_read || req->rq_bulk_write)) {
                 bsd = lustre_msg_buf(req->rq_reqbuf,
-                                     req->rq_reqbuf->lm_bufcount - 1,
-                                     sizeof(*bsd));
+                                     PLAIN_PACK_BULK_OFF, sizeof(*bsd));
                 LASSERT(bsd);
 
-                buflens[bufcnt++] = bulk_sec_desc_size(bsd->bsd_csum_alg, 0,
-                                                       req->rq_bulk_read);
+                buflens[PLAIN_PACK_BULK_OFF] = bulk_sec_desc_size(
+                                                        bsd->bsd_csum_alg, 0,
+                                                        req->rq_bulk_read);
         }
-        rs_size += lustre_msg_size_v2(bufcnt, buflens);
+        rs_size += lustre_msg_size_v2(PLAIN_PACK_SEGMENTS, buflens);
 
         rs = req->rq_reply_state;
 
@@ -421,8 +632,8 @@ int plain_alloc_rs(struct ptlrpc_request *req, int msgsize)
         rs->rs_repbuf = (struct lustre_msg *) (rs + 1);
         rs->rs_repbuf_len = rs_size - sizeof(*rs);
 
-        lustre_init_msg_v2(rs->rs_repbuf, bufcnt, buflens, NULL);
-        rs->rs_msg = lustre_msg_buf_v2(rs->rs_repbuf, 0, 0);
+        lustre_init_msg_v2(rs->rs_repbuf, PLAIN_PACK_SEGMENTS, buflens, NULL);
+        rs->rs_msg = lustre_msg_buf_v2(rs->rs_repbuf, PLAIN_PACK_MSG_OFF, 0);
 
         req->rq_reply_state = rs;
         RETURN(0);
@@ -452,12 +663,16 @@ int plain_authorize(struct ptlrpc_request *req)
         LASSERT(rs);
         LASSERT(msg);
 
-        if (req->rq_replen != msg->lm_buflens[0])
-                len = lustre_shrink_msg(msg, 0, req->rq_replen, 1);
+        if (req->rq_replen != msg->lm_buflens[PLAIN_PACK_MSG_OFF])
+                len = lustre_shrink_msg(msg, PLAIN_PACK_MSG_OFF,
+                                        req->rq_replen, 1);
         else
                 len = lustre_msg_size_v2(msg->lm_bufcount, msg->lm_buflens);
 
-        msg->lm_secflvr = req->rq_sec_flavor;
+        msg->lm_secflvr = req->rq_flvr.sf_rpc;
+        if (req->rq_pack_bulk)
+                msg->lm_secflvr |= PLAIN_WFLVR_FLAG_BULK;
+
         rs->rs_repdata_len = len;
         RETURN(0);
 }
@@ -467,18 +682,21 @@ int plain_svc_unwrap_bulk(struct ptlrpc_request *req,
                           struct ptlrpc_bulk_desc *desc)
 {
         struct ptlrpc_reply_state      *rs = req->rq_reply_state;
-        int                             voff, roff;
 
         LASSERT(rs);
-
-        voff = req->rq_reqbuf->lm_bufcount - 1;
-        roff = rs->rs_repbuf->lm_bufcount - 1;
+        LASSERT(req->rq_pack_bulk);
+        LASSERT(req->rq_reqbuf->lm_bufcount >= PLAIN_PACK_SEGMENTS);
+        LASSERT(rs->rs_repbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
 
         return bulk_csum_svc(desc, req->rq_bulk_read,
-                             lustre_msg_buf(req->rq_reqbuf, voff, 0),
-                             lustre_msg_buflen(req->rq_reqbuf, voff),
-                             lustre_msg_buf(rs->rs_repbuf, roff, 0),
-                             lustre_msg_buflen(rs->rs_repbuf, roff));
+                             lustre_msg_buf(req->rq_reqbuf,
+                                            PLAIN_PACK_BULK_OFF, 0),
+                             lustre_msg_buflen(req->rq_reqbuf,
+                                               PLAIN_PACK_BULK_OFF),
+                             lustre_msg_buf(rs->rs_repbuf,
+                                            PLAIN_PACK_BULK_OFF, 0),
+                             lustre_msg_buflen(rs->rs_repbuf,
+                                               PLAIN_PACK_BULK_OFF));
 }
 
 static
@@ -486,22 +704,26 @@ int plain_svc_wrap_bulk(struct ptlrpc_request *req,
                         struct ptlrpc_bulk_desc *desc)
 {
         struct ptlrpc_reply_state      *rs = req->rq_reply_state;
-        int                             voff, roff;
 
         LASSERT(rs);
-
-        voff = req->rq_reqbuf->lm_bufcount - 1;
-        roff = rs->rs_repbuf->lm_bufcount - 1;
+        LASSERT(req->rq_pack_bulk);
+        LASSERT(req->rq_reqbuf->lm_bufcount >= PLAIN_PACK_SEGMENTS);
+        LASSERT(rs->rs_repbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
 
         return bulk_csum_svc(desc, req->rq_bulk_read,
-                             lustre_msg_buf(req->rq_reqbuf, voff, 0),
-                             lustre_msg_buflen(req->rq_reqbuf, voff),
-                             lustre_msg_buf(rs->rs_repbuf, roff, 0),
-                             lustre_msg_buflen(rs->rs_repbuf, roff));
+                             lustre_msg_buf(req->rq_reqbuf,
+                                            PLAIN_PACK_BULK_OFF, 0),
+                             lustre_msg_buflen(req->rq_reqbuf,
+                                               PLAIN_PACK_BULK_OFF),
+                             lustre_msg_buf(rs->rs_repbuf,
+                                            PLAIN_PACK_BULK_OFF, 0),
+                             lustre_msg_buflen(rs->rs_repbuf,
+                                               PLAIN_PACK_BULK_OFF));
 }
 
 static struct ptlrpc_ctx_ops plain_ctx_ops = {
         .refresh                = plain_ctx_refresh,
+        .validate               = plain_ctx_validate,
         .sign                   = plain_ctx_sign,
         .verify                 = plain_ctx_verify,
         .wrap_bulk              = plain_cli_wrap_bulk,
@@ -511,7 +733,9 @@ static struct ptlrpc_ctx_ops plain_ctx_ops = {
 static struct ptlrpc_sec_cops plain_sec_cops = {
         .create_sec             = plain_create_sec,
         .destroy_sec            = plain_destroy_sec,
+        .kill_sec               = plain_kill_sec,
         .lookup_ctx             = plain_lookup_ctx,
+        .release_ctx            = plain_release_ctx,
         .flush_ctx_cache        = plain_flush_ctx_cache,
         .alloc_reqbuf           = plain_alloc_reqbuf,
         .alloc_repbuf           = plain_alloc_repbuf,
@@ -531,50 +755,19 @@ static struct ptlrpc_sec_sops plain_sec_sops = {
 
 static struct ptlrpc_sec_policy plain_policy = {
         .sp_owner               = THIS_MODULE,
-        .sp_name                = "sec.plain",
+        .sp_name                = "plain",
         .sp_policy              = SPTLRPC_POLICY_PLAIN,
         .sp_cops                = &plain_sec_cops,
         .sp_sops                = &plain_sec_sops,
 };
 
-static
-void plain_init_internal(void)
-{
-        static HLIST_HEAD(__list);
-
-        plain_sec.ps_policy = &plain_policy;
-        atomic_set(&plain_sec.ps_refcount, 1);     /* always busy */
-        plain_sec.ps_import = NULL;
-        plain_sec.ps_flavor = SPTLRPC_FLVR_PLAIN;
-        plain_sec.ps_flags = 0;
-        spin_lock_init(&plain_sec.ps_lock);
-        atomic_set(&plain_sec.ps_busy, 1);         /* for "plain_cli_ctx" */
-        CFS_INIT_LIST_HEAD(&plain_sec.ps_gc_list);
-        plain_sec.ps_gc_interval = 0;
-        plain_sec.ps_gc_next = 0;
-
-        hlist_add_head(&plain_cli_ctx.cc_cache, &__list);
-        atomic_set(&plain_cli_ctx.cc_refcount, 1);    /* for hash */
-        plain_cli_ctx.cc_sec = &plain_sec;
-        plain_cli_ctx.cc_ops = &plain_ctx_ops;
-        plain_cli_ctx.cc_expire = 0;
-        plain_cli_ctx.cc_flags = PTLRPC_CTX_CACHED | PTLRPC_CTX_ETERNAL |
-                                 PTLRPC_CTX_UPTODATE;
-        plain_cli_ctx.cc_vcred.vc_uid = 0;
-        spin_lock_init(&plain_cli_ctx.cc_lock);
-        CFS_INIT_LIST_HEAD(&plain_cli_ctx.cc_req_list);
-        CFS_INIT_LIST_HEAD(&plain_cli_ctx.cc_gc_chain);
-}
-
 int sptlrpc_plain_init(void)
 {
         int rc;
 
-        plain_init_internal();
-
         rc = sptlrpc_register_policy(&plain_policy);
         if (rc)
-                CERROR("failed to register sec.plain: %d\n", rc);
+                CERROR("failed to register: %d\n", rc);
 
         return rc;
 }
@@ -585,5 +778,5 @@ void sptlrpc_plain_fini(void)
 
         rc = sptlrpc_unregister_policy(&plain_policy);
         if (rc)
-                CERROR("cannot unregister sec.plain: %d\n", rc);
+                CERROR("cannot unregister: %d\n", rc);
 }
diff --git a/lustre/tests/sanity-gss.sh b/lustre/tests/sanity-gss.sh
index 4e3111aca4e1b8b9768383386d4b4801087ae029..56370ce968e0e2779ba5ee313a1b80521b337a47 100644
--- a/lustre/tests/sanity-gss.sh
+++ b/lustre/tests/sanity-gss.sh
@@ -9,15 +9,12 @@ set -e
 
 ONLY=${ONLY:-"$*"}
 # bug number for skipped test:
-ALWAYS_EXCEPT=${ALWAYS_EXCEPT:-""}
+ALWAYS_EXCEPT=${ALWAYS_EXCEPT:-"$SANITY_GSS_EXCEPT"}
 # UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!
-if [ "x$GSS_PIPEFS" != "xy" ]; then
-    ALWAYS_EXCEPT="$ALWAYS_EXCEPT 4"
-fi
 
-[ "$SLOW" = "no" ] && EXCEPT="$EXCEPT"
+[ "$SLOW" = "no" ] && EXCEPT_SLOW="100 101"
 
-# Tests that fail on uml, maybe elsewhere, FIXME
+# Tests that fail on uml
 CPU=`awk '/model/ {print $4}' /proc/cpuinfo`
 [ "$CPU" = "UML" ] && EXCEPT="$EXCEPT"
 
@@ -28,17 +25,16 @@ esac
 
 SRCDIR=`dirname $0`
 export PATH=$PWD/$SRCDIR:$SRCDIR:$SRCDIR/../utils:$SRCDIR/../utils/gss:$PATH:/sbin
+export NAME=${NAME:-local}
+SAVE_PWD=$PWD
 
-TMP=${TMP:-/tmp}
+CLEANUP=${CLEANUP:-""}
+SETUP=${SETUP:-""}
 
-CHECKSTAT=${CHECKSTAT:-"checkstat -v"}
-CREATETEST=${CREATETEST:-createtest}
-LFS=${LFS:-lfs}
-LCTL=${LCTL:-lctl}
-MEMHOG=${MEMHOG:-memhog}
-DIRECTIO=${DIRECTIO:-directio}
-ACCEPTOR_PORT=${ACCEPTOR_PORT:-988}
-UMOUNT=${UMOUNT:-"umount -d"}
+LUSTRE=${LUSTRE:-`dirname $0`/..}
+. $LUSTRE/tests/test-framework.sh
+init_test_env $@
+. ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh}
 
 if [ $UID -ne 0 ]; then
     echo "Warning: running as non-root uid $UID"
@@ -49,96 +45,316 @@ else
     RUNAS=${RUNAS:-"runas -u $RUNAS_ID"}
 
     # $RUNAS_ID may get set incorrectly somewhere else
-    if [ $RUNAS_ID -eq 0 ]; then
-        echo "Error: \$RUNAS_ID set to 0, but \$UID is also 0!"
-        exit 1
-    fi
+    [ $RUNAS_ID -eq 0 ] && error "\$RUNAS_ID set to 0, but \$UID is also 0!"
 fi
 
-SANITYLOG=${SANITYLOG:-/tmp/sanity-gss.log}
-
-export NAME=${NAME:-local}
-
-SAVE_PWD=$PWD
-
-export SEC=${SEC:-krb5p}
-export KRB5_CCACHE_DIR=/tmp
-export KRB5_CRED=$KRB5_CCACHE_DIR/krb5cc_$RUNAS_ID
-export KRB5_CRED_SAVE=$KRB5_CCACHE_DIR/krb5cc.sanity.save
+# remove $SEC, we'd like to control everything by ourselves
+unset SEC
 
 #
-# check pre-set $SEC
+# global variables of this sanity
 #
-case "x$SEC" in
-    xkrb5*)
-        echo "Using ptlrpc security flavor $SEC"
-        ;;
-    *)
-        echo "SEC=$SEC is invalid, it has to be gss/krb5 flavor"
-        exit 1
-        ;;
-esac
+KRB5_CCACHE_DIR=/tmp
+KRB5_CRED=$KRB5_CCACHE_DIR/krb5cc_$RUNAS_ID
+KRB5_CRED_SAVE=$KRB5_CCACHE_DIR/krb5cc.sanity.save
+CLICOUNT=2
+cnt_mdt2ost=0
+cnt_mdt2mdt=0
+cnt_cli2ost=0
+cnt_cli2mdt=0
+cnt_all2ost=0
+cnt_all2mdt=0
+cnt_all2all=0
+DBENCH_PID=0
+PROC_CLI="srpc.info"
+
+# set manually
+GSS=true
+GSS_KRB5=true
+
+# we want double mount
+MOUNT_2=${MOUNT_2:-"yes"}
+cleanup_and_setup_lustre
+
+rm -rf $DIR/${TESTSUITE}/[df][0-9]*
+rm -rf $DIR/[df][0-9]*
+
+check_runas_id $RUNAS_ID $RUNAS
 
-LUSTRE=${LUSTRE:-`dirname $0`/..}
-. $LUSTRE/tests/test-framework.sh
-init_test_env $@
-. ${CONFIG:=$LUSTRE/tests/cfg/local.sh}
+build_test_filter
 
 prepare_krb5_creds() {
-    rm -f $CRED_SAVE
+    rm -f $KRB5_CRED_SAVE
     $RUNAS krb5_login.sh || exit 1
     [ -f $KRB5_CRED ] || exit 2
     cp $KRB5_CRED $KRB5_CRED_SAVE
 }
 
-cleanup() {
-    echo -n "cln.."
-    cleanupall ${FORCE} $* || { echo "FAILed to clean up"; exit 20; }
+combination()
+{
+    local M=$1
+    local N=$2
+    local R=1
+
+    if [ $M -lt $N ]; then
+        R=0
+    else
+        N=$((N + 1))
+        while [ $N -le $M ]; do
+            R=$((R * N))
+            N=$((N + 1))
+        done
+    fi
+
+    echo $R
+    return 0
 }
-CLEANUP=${CLEANUP:-:}
 
-setup() {
-    echo -n "mnt.."
-    load_modules
-    setupall || exit 10
-    echo "done"
+calc_connection_cnt() {
+    # MDT->MDT = 2 * C(M, 2)
+    # MDT->OST = M * O
+    # CLI->OST = C * O
+    # CLI->MDT = C * M
+    comb_m2=$(combination $MDSCOUNT 2)
+
+    cnt_mdt2mdt=$((comb_m2 * 2))
+    cnt_mdt2ost=$((MDSCOUNT * OSTCOUNT))
+    cnt_cli2ost=$((CLICOUNT * OSTCOUNT))
+    cnt_cli2mdt=$((CLICOUNT * MDSCOUNT))
+    cnt_all2ost=$((cnt_mdt2ost + cnt_cli2ost))
+    cnt_all2mdt=$((cnt_mdt2mdt + cnt_cli2mdt))
+    cnt_all2all=$((cnt_mdt2ost + cnt_mdt2mdt + cnt_cli2ost + cnt_cli2mdt))
 }
-SETUP=${SETUP:-:}
 
-trace() {
-    log "STARTING: $*"
-    strace -o $TMP/$1.strace -ttt $*
-    RC=$?
-    log "FINISHED: $*: rc $RC"
-    return 1
+set_rule()
+{
+    local tgt=$1
+    local net=$2
+    local dir=$3
+    local flavor=$4
+    local cmd="$tgt.srpc.flavor"
+
+    if [ $net == "any" ]; then
+        net="default"
+    fi
+    cmd="$cmd.$net"
+
+    if [ $dir != "any" ]; then
+        cmd="$cmd.$dir"
+    fi
+
+    cmd="$cmd=$flavor"
+    log "Setting sptlrpc rule: $cmd"
+    do_facet mgs "$LCTL conf_param $cmd"
+}
+
+count_flvr()
+{
+    output=$1
+    flavor=$2
+
+    echo "$output" | grep rpc | grep $flavor | wc -l
 }
-TRACE=${TRACE:-""}
 
-check_kernel_version() {
-    VERSION_FILE=$LPROC/kernel_version
-    WANT_VER=$1
-    [ ! -f $VERSION_FILE ] && echo "can't find kernel version" && return 1
-    GOT_VER=`cat $VERSION_FILE`
-    [ $GOT_VER -ge $WANT_VER ] && return 0
-    log "test needs at least kernel version $WANT_VER, running $GOT_VER"
+flvr_cnt_cli2mdt()
+{
+    local flavor=$1
+
+    output=`do_facet client cat $LPROC/mdc/*-MDT*-mdc-*/$PROC_CLI 2>/dev/null`
+    count_flvr "$output" $flavor
+}
+
+flvr_cnt_cli2ost()
+{
+    local flavor=$1
+
+    output=`do_facet client cat $LPROC/osc/*OST*-osc-[^M][^D][^T]*/$PROC_CLI 2>/dev/null`
+    count_flvr "$output" $flavor
+}
+
+flvr_cnt_mdt2mdt()
+{
+    local flavor=$1
+    local cnt=0
+
+    if [ $MDSCOUNT -le 1 ]; then
+        echo 0
+        return
+    fi
+
+    for num in `seq $MDSCOUNT`; do
+        output=`do_facet mds$num cat $LPROC/mdc/*-MDT*-mdc[0-9]*/$PROC_CLI 2>/dev/null`
+        tmpcnt=`count_flvr "$output" $flavor`
+        cnt=$((cnt + tmpcnt))
+    done
+    echo $cnt;
+}
+
+flvr_cnt_mdt2ost()
+{
+    local flavor=$1
+    local cnt=0
+
+    for num in `seq $MDSCOUNT`; do
+        output=`do_facet mds$num cat $LPROC/osc/*OST*-osc-MDT*/$PROC_CLI 2>/dev/null`
+        tmpcnt=`count_flvr "$output" $flavor`
+        cnt=$((cnt + tmpcnt))
+    done
+    echo $cnt;
+}
+
+do_check_flavor()
+{
+    local dir=$1        # from to
+    local flavor=$2     # flavor expected
+    local res=0
+
+    if [ $dir == "cli2mdt" ]; then
+        res=`flvr_cnt_cli2mdt $flavor`
+    elif [ $dir == "cli2ost" ]; then
+        res=`flvr_cnt_cli2ost $flavor`
+    elif [ $dir == "mdt2mdt" ]; then
+        res=`flvr_cnt_mdt2mdt $flavor`
+    elif [ $dir == "mdt2ost" ]; then
+        res=`flvr_cnt_mdt2ost $flavor`
+    elif [ $dir == "all2ost" ]; then
+        res1=`flvr_cnt_mdt2ost $flavor`
+        res2=`flvr_cnt_cli2ost $flavor`
+        res=$((res1 + res2))
+    elif [ $dir == "all2mdt" ]; then
+        res1=`flvr_cnt_mdt2mdt $flavor`
+        res2=`flvr_cnt_cli2mdt $flavor`
+        res=$((res1 + res2))
+    elif [ $dir == "all2all" ]; then
+        res1=`flvr_cnt_mdt2ost $flavor`
+        res2=`flvr_cnt_cli2ost $flavor`
+        res3=`flvr_cnt_mdt2mdt $flavor`
+        res4=`flvr_cnt_cli2mdt $flavor`
+        res=$((res1 + res2 + res3 + res4))
+    fi
+
+    echo $res
+}
+
+wait_flavor()
+{
+    local dir=$1        # from to
+    local flavor=$2     # flavor expected
+    local expect=$3     # number expected
+    local res=0
+
+    for ((i=0;i<20;i++)); do
+        echo -n "checking..."
+        res=$(do_check_flavor $dir $flavor)
+        if [ $res -eq $expect ]; then
+            echo "found $res $flavor connections of $dir, OK"
+            return 0
+        else
+            echo "found $res $flavor connections of $dir, not ready ($expect)"
+            sleep 4
+        fi
+    done
+
+    echo "Error checking $flavor of $dir: expect $expect, actual $res"
     return 1
 }
 
-_basetest() {
-    echo $*
+restore_to_default_flavor()
+{
+    local proc=$LPROC/mgs/MGS/live/$FSNAME
+
+    echo "restoring to default flavor..."
+
+    nrule=`do_facet mgs cat $proc 2>/dev/null | grep ".srpc.flavor." | wc -l`
+
+    # remove all existing rules if any
+    if [ $nrule -ne 0 ]; then
+        echo "$nrule existing rules"
+        for rule in `do_facet mgs cat $proc 2>/dev/null | grep ".srpc.flavor."`; do
+            echo "remove rule: $rule"
+            spec=`echo $rule | awk -F = '{print $1}'`
+            do_facet mgs "$LCTL conf_param $spec="
+        done
+    fi
+
+    # verify no rules left
+    nrule=`do_facet mgs cat $proc 2>/dev/null | grep ".srpc.flavor." | wc -l`
+    [ $nrule -ne 0 ] && error "still $nrule rules left"
+
+    # wait for default flavor to be applied
+    # currently default flavor for all connections are 'null'
+    wait_flavor all2all null $cnt_all2all
+    echo "now at default flavor settings"
 }
 
-[ "$SANITYLOG" ] && rm -f $SANITYLOG || true
+set_flavor_all()
+{
+    local flavor=$1
 
+    echo "setting all flavor to $flavor"
 
-prepare_krb5_creds
-build_test_filter
-umask 077
+    res=$(do_check_flavor all2all $flavor)
+    if [ $res -eq $cnt_all2all ]; then
+        echo "already have total $res $flavor connections"
+        return
+    fi
 
-# setup filesystem
-formatall
-setupall
-chmod a+rwx $MOUNT
+    echo "found $res $flavor out of total $cnt_all2all connections"
+    restore_to_default_flavor
+
+    set_rule $FSNAME any any $flavor
+    wait_flavor all2all $flavor $cnt_all2all
+}
+
+start_dbench()
+{
+    NPROC=`cat /proc/cpuinfo 2>/dev/null | grep ^processor | wc -l`
+    [ $NPROC -lt 2 ] && NPROC=2
+    sh rundbench $NPROC 1>/dev/null &
+    DBENCH_PID=$!
+    sleep 2
+
+    num=`ps --no-headers -p $DBENCH_PID 2>/dev/null | wc -l`
+    if [ $num -ne 1 ]; then
+        error "failed to start dbench $NPROC"
+    else
+        echo "started dbench with $NPROC processes at background"
+    fi
+
+    return 0
+}
+
+check_dbench()
+{
+    num=`ps --no-headers -p $DBENCH_PID 2>/dev/null | wc -l`
+    if [ $num -eq 0 ]; then
+        echo "dbench $DBENCH_PID already finished"
+        wait $DBENCH_PID || error "dbench $PID exit with error"
+        start_dbench
+    elif [ $num -ne 1 ]; then
+        killall -9 dbench
+        error "found $num instance of pid $DBENCH_PID ???"
+    fi
+
+    return 0
+}
+
+stop_dbench()
+{
+    for ((;;)); do
+        killall dbench 2>/dev/null
+        num=`ps --no-headers -p $DBENCH_PID | wc -l`
+        if [ $num -eq 0 ]; then
+            echo "dbench finished"
+            break
+        fi
+        echo "dbench $DBENCH_PID is still running, waiting 2s..."
+        sleep 2
+    done
+
+    wait $DBENCH_PID || true
+    sync || true
+}
 
 restore_krb5_cred() {
     cp $KRB5_CRED_SAVE $KRB5_CRED
@@ -146,37 +362,102 @@ restore_krb5_cred() {
     chmod 0600 $KRB5_CRED
 }
 
+check_multiple_gss_daemons() {
+    local facet=$1
+    local gssd=$2
+    local gssd_name=`basename $gssd`
+
+    for ((i=0;i<10;i++)); do
+        do_facet $facet "$gssd -v &"
+    done
+
+    # wait daemons entering "stable" status
+    sleep 5
+
+    num=`do_facet $facet ps -o cmd -C $gssd_name | grep $gssd_name | wc -l`
+    echo "$num instance(s) of $gssd_name are running"
+
+    if [ $num -ne 1 ]; then
+        error "$gssd_name not unique"
+    fi
+}
+
+prepare_krb5_creds
+calc_connection_cnt
+umask 077
+
+test_0() {
+    local my_facet=mds
+
+    echo "bring up gss daemons..."
+    start_gss_daemons
+
+    echo "check with someone already running..."
+    check_multiple_gss_daemons $my_facet $LSVCGSSD
+    if $GSS_PIPEFS; then
+        check_multiple_gss_daemons $my_facet $LGSSD
+    fi
+
+    echo "check with someone run & finished..."
+    do_facet $my_facet killall -q -2 lgssd lsvcgssd || true
+    sleep 5 # wait fully exit
+    check_multiple_gss_daemons $my_facet $LSVCGSSD
+    if $GSS_PIPEFS; then
+        check_multiple_gss_daemons $my_facet $LGSSD
+    fi
+
+    echo "check refresh..."
+    do_facet $my_facet killall -q -2 lgssd lsvcgssd || true
+    sleep 5 # wait fully exit
+    do_facet $my_facet ipcrm -S 0x3b92d473
+    check_multiple_gss_daemons $my_facet $LSVCGSSD
+    if $GSS_PIPEFS; then
+        do_facet $my_facet ipcrm -S 0x3a92d473
+        check_multiple_gss_daemons $my_facet $LGSSD
+    fi
+}
+run_test 0 "start multiple gss daemons"
+
+set_flavor_all krb5p
+
 test_1() {
+    local file=$DIR/$tfile
+
+    chmod 0777 $DIR || error "chmod $DIR failed"
     # access w/o cred
     $RUNAS kdestroy
-    $RUNAS touch $MOUNT/f1 && error "unexpected success"
+    $RUNAS touch $file && error "unexpected success"
 
     # access w/ cred
     restore_krb5_cred
-    $RUNAS touch $MOUNT/f1 || error "should not fail"
-    [ -f $MOUNT/f1 ] || error "$MOUNT/f1 not found"
+    $RUNAS touch $file || error "should not fail"
+    [ -f $file ] || error "$file not found"
 }
 run_test 1 "access with or without krb5 credential"
 
 test_2() {
+    local file1=$DIR/$tfile-1
+    local file2=$DIR/$tfile-2
+
+    chmod 0777 $DIR || error "chmod $DIR failed"
     # current access should be ok
-    $RUNAS touch $MOUNT/f2_1 || error "can't touch $MOUNT/f2_1"
-    [ -f $MOUNT/f2_1 ] || error "$MOUNT/f2_1 not found"
+    $RUNAS touch $file1 || error "can't touch $file1"
+    [ -f $file1 ] || error "$file1 not found"
 
     # cleanup all cred/ctx and touch
     $RUNAS kdestroy
     $RUNAS $LFS flushctx || error "can't flush ctx"
-    $RUNAS touch $MOUNT/f2_2 && error "unexpected success"
+    $RUNAS touch $file2 && error "unexpected success"
 
     # restore and touch
     restore_krb5_cred
-    $RUNAS touch $MOUNT/f2_2 || error "should not fail"
-    [ -f $MOUNT/f2_2 ] || error "$MOUNT/f2_2 not found"
+    $RUNAS touch $file2 || error "should not fail"
+    [ -f $file2 ] || error "$file2 not found"
 }
 run_test 2 "lfs flushctx"
 
 test_3() {
-    local file=$MOUNT/f3
+    local file=$DIR/$tfile
 
     # create file
     echo "aaaaaaaaaaaaaaaaa" > $file
@@ -215,9 +496,12 @@ test_3() {
 run_test 3 "local cache under DLM lock"
 
 test_4() {
-    local file1=$MOUNT/f4_1
-    local file2=$MOUNT/f4_2
+    local file1=$DIR/$tfile-1
+    local file2=$DIR/$tfile-2
 
+    ! $GSS_PIPEFS && skip "pipefs not used" && return
+
+    chmod 0777 $DIR || error "chmod $DIR failed"
     # current access should be ok
     $RUNAS touch $file1 || error "can't touch $file1"
     [ -f $file1 ] || error "$file1 not found"
@@ -246,10 +530,11 @@ test_4() {
 run_test 4 "lgssd dead, operations should wait timeout and fail"
 
 test_5() {
-    local file1=$MOUNT/f5_1
-    local file2=$MOUNT/f5_2
-    local wait_time=`expr $TIMEOUT + $TIMEOUT`
+    local file1=$DIR/$tfile-1
+    local file2=$DIR/$tfile-2
+    local wait_time=$((TIMEOUT + TIMEOUT / 2))
 
+    chmod 0777 $DIR || error "chmod $DIR failed"
     # current access should be ok
     $RUNAS touch $file1 || error "can't touch $file1"
     [ -f $file1 ] || error "$file1 not found"
@@ -282,26 +567,15 @@ test_5() {
 run_test 5 "lsvcgssd dead, operations lead to recovery"
 
 test_6() {
-    NPROC=`cat /proc/cpuinfo 2>/dev/null | grep ^processor | wc -l`
-    [ $NPROC -ne 0 ] || NPROC=2
-
-    echo "starting dbench $NPROC"
-    sh rundbench $NPROC &
-    RUNPID=$!
-
-    for ((n=0;;n++)); do
-        sleep 2
-        num=`ps --no-headers -p $RUNPID | wc -l`
-        [ $num -ne 0 ] || break
-        echo "flush ctx ..."
-        $LFS flushctx
-    done
-    wait $RUNPID || error "dbench detect error"
+    mkdir $DIR/d6 || error "mkdir $DIR/d6 failed"
+    cp -a /etc/* $DIR/d6/ || error "cp failed"
+    ls -l $DIR/d6/* > /dev/null || error "ls failed"
+    rm -rf $DIR2/d6/* || error "rm failed"
 }
-run_test 6 "recoverable from losing context"
+run_test 6 "test basic DLM callback works"
 
 test_7() {
-    local tdir=$MOUNT/dir7
+    local tdir=$DIR/d7
     local num_osts
 
     #
@@ -333,74 +607,298 @@ test_7() {
 }
 run_test 7 "exercise enlarge_reqbuf()"
 
-check_multiple_gss_daemons() {
-    local facet=$1
-    local gssd=$2
-    local gssd_name=`basename $gssd`
+test_90() {
+    if [ "$SLOW" = "no" ]; then
+        total=10
+    else
+        total=60
+    fi
 
-    for ((i=0;i<10;i++)); do
-        do_facet $facet "$gssd -v &"
+    start_dbench
+
+    for ((n=0;n<$total;n++)); do
+        sleep 2
+        check_dbench
+        echo "flush ctx ($n/$total) ..."
+        $LFS flushctx
     done
+    check_dbench
+    stop_dbench
+}
+run_test 90 "recoverable from losing contexts under load"
 
-    # wait daemons entering "stable" status
-    sleep 5
+test_99() {
+    local nrule_old=0
+    local nrule_new=0
+    local max=32
 
-    num=`do_facet $facet ps -o cmd -C $gssd_name | grep $gssd_name | wc -l`
-    echo "$num instance(s) of $gssd_name are running"
+    #
+    # general rules
+    #
+    nrule_old=`do_facet mgs cat $LPROC/mgs/MGS/live/$FSNAME 2>/dev/null \
+               | grep "$FSNAME.srpc.flavor." | wc -l`
+    echo "original general rules: $nrule_old"
 
-    if [ $num -ne 1 ]; then
-        error "$gssd_name not unique"
+    for ((i = $nrule_old; i < $max; i++)); do
+        set_rule $FSNAME elan$i any krb5n || error "set rule $i"
+    done
+    set_rule $FSNAME elan100 any krb5n && error "set $max rule should fail"
+    for ((i = $nrule_old; i < $max; i++)); do
+        set_rule $FSNAME elan$i any || error "remove rule $i"
+    done
+
+    nrule_new=`do_facet mgs cat $LPROC/mgs/MGS/live/$FSNAME 2>/dev/null \
+               | grep "$FSNAME.srpc.flavor." | wc -l`
+    if [ $nrule_new != $nrule_old ]; then
+        error "general rule: $nrule_new != $nrule_old"
     fi
+
+    #
+    # target-specific rules
+    #
+    nrule_old=`do_facet mgs cat $LPROC/mgs/MGS/live/$FSNAME 2>/dev/null \
+               | grep "$FSNAME-MDT0000.srpc.flavor." | wc -l`
+    echo "original target rules: $nrule_old"
+
+    for ((i = $nrule_old; i < $max; i++)); do
+        set_rule $FSNAME-MDT0000 elan$i any krb5i || error "set rule $i"
+    done
+    set_rule $FSNAME-MDT0000 elan100 any krb5i && error "set $max rule should fail"
+    for ((i = $nrule_old; i < $max; i++)); do
+        set_rule $FSNAME-MDT0000 elan$i any || error "remove rule $i"
+    done
+
+    nrule_new=`do_facet mgs cat $LPROC/mgs/MGS/live/$FSNAME 2>/dev/null \
+               | grep "$FSNAME-MDT0000.srpc.flavor." | wc -l`
+    if [ $nrule_new != $nrule_old ]; then
+        error "general rule: $nrule_new != $nrule_old"
+    fi
+}
+run_test 99 "maximum sptlrpc rules limitation"
+
+error_dbench()
+{
+    local err_str=$1
+
+    killall -9 dbench
+    sleep 1
+
+    error $err_str
 }
 
 test_100() {
-    local facet=mds
+    # started from default flavors
+    restore_to_default_flavor
 
-    # cleanup everything at first
-    cleanupall
+    # running dbench background
+    start_dbench
 
-    echo "bring up gss daemons..."
-    start_gss_daemons
+    #
+    # all: null -> krb5n -> krb5a -> krb5i -> krb5p -> plain
+    #
+    set_rule $FSNAME any any krb5n
+    wait_flavor all2all krb5n $cnt_all2all || error_dbench "1"
+    check_dbench
 
-    echo "check with someone already running..."
-    check_multiple_gss_daemons $facet $LSVCGSSD
-    if [ "x$GSS_PIPEFS" == "xy" ]; then
-        check_multiple_gss_daemons $facet $LGSSD
-    fi
+    set_rule $FSNAME any any krb5a
+    wait_flavor all2all krb5a $cnt_all2all || error_dbench "2"
+    check_dbench
 
-    echo "check with someone run & finished..."
-    do_facet $facet killall -q -2 lgssd lsvcgssd || true
-    sleep 5 # wait fully exit
-    check_multiple_gss_daemons $facet $LSVCGSSD
-    if [ "x$GSS_PIPEFS" == "xy" ]; then
-        check_multiple_gss_daemons $facet $LGSSD
-    fi
+    set_rule $FSNAME any any krb5i
+    wait_flavor all2all krb5i $cnt_all2all || error_dbench "3"
+    check_dbench
 
-    echo "check refresh..."
-    do_facet $facet killall -q -2 lgssd lsvcgssd || true
-    sleep 5 # wait fully exit
-    do_facet $facet ipcrm -S 0x3b92d473
-    check_multiple_gss_daemons $facet $LSVCGSSD
-    if [ "x$GSS_PIPEFS" == "xy" ]; then
-        do_facet $facet ipcrm -S 0x3a92d473
-        check_multiple_gss_daemons $facet $LGSSD
+    set_rule $FSNAME any any krb5p
+    wait_flavor all2all krb5p $cnt_all2all || error_dbench "4"
+    check_dbench
+
+    set_rule $FSNAME any any plain
+    wait_flavor all2all plain $cnt_all2all || error_dbench "5"
+    check_dbench
+
+    #
+    # M - M: krb5a
+    # C - M: krb5i
+    # M - O: krb5p
+    # C - O: krb5n
+    #
+    set_rule $FSNAME any mdt2mdt krb5a
+    wait_flavor mdt2mdt krb5a $cnt_mdt2mdt || error_dbench "6"
+    check_dbench
+
+    set_rule $FSNAME any cli2mdt krb5i
+    wait_flavor cli2mdt krb5i $cnt_cli2mdt || error_dbench "7"
+    check_dbench
+
+    set_rule $FSNAME any mdt2ost krb5p
+    wait_flavor mdt2ost krb5p $cnt_mdt2ost || error_dbench "8"
+    check_dbench
+
+    set_rule $FSNAME any cli2ost krb5n
+    wait_flavor cli2ost krb5n $cnt_cli2ost || error_dbench "9"
+    check_dbench
+
+    #
+    # * - MDT0: krb5p
+    # * - OST0: krb5i
+    #
+    # nothing should be changed because they are override by above dir rules
+    #
+    set_rule $FSNAME-MDT0000 any any krb5p
+    set_rule $FSNAME-OST0000 any any krb5i
+    wait_flavor mdt2mdt krb5a $cnt_mdt2mdt || error_dbench "10"
+    wait_flavor cli2mdt krb5i $cnt_cli2mdt || error_dbench "11"
+    check_dbench
+    wait_flavor mdt2ost krb5p $cnt_mdt2ost || error_dbench "12"
+    wait_flavor cli2ost krb5n $cnt_cli2ost || error_dbench "13"
+
+    #
+    # delete all dir-specific rules
+    #
+    set_rule $FSNAME any mdt2mdt
+    set_rule $FSNAME any cli2mdt
+    set_rule $FSNAME any mdt2ost
+    set_rule $FSNAME any cli2ost
+    wait_flavor mdt2mdt krb5p $((MDSCOUNT - 1)) || error_dbench "14"
+    wait_flavor cli2mdt krb5p $CLICOUNT || error_dbench "15"
+    check_dbench
+    wait_flavor mdt2ost krb5i $MDSCOUNT || error_dbench "16"
+    wait_flavor cli2ost krb5i $CLICOUNT || error_dbench "17"
+    check_dbench
+
+    #
+    # remove:
+    #  * - MDT0: krb5p
+    #  * - OST0: krb5i
+    #
+    set_rule $FSNAME-MDT0000 any any
+    set_rule $FSNAME-OST0000 any any || error_dbench "18"
+    wait_flavor all2all plain $cnt_all2all || error_dbench "19"
+    check_dbench
+
+    stop_dbench
+}
+run_test 100 "change security flavor on the fly under load"
+
+switch_sec_test()
+{
+    local count=$1
+    local flavor0=$2
+    local flavor1=$3
+    local flavor2=$4
+    local df_pid=0
+    local wait_time=$((TIMEOUT + TIMEOUT / 4))
+    local num
+
+    #
+    # stop gss daemon, then switch to flavor1 (which should be a gss flavor),
+    # and run a 'df' which should hanging, wait the request timeout and
+    # resend, then switch the flavor to another one. To exercise the code of
+    # switching ctx/sec for a resend request.
+    #
+    echo ">>>>>>>>>>>>>>> Testing $flavor0 -> $flavor1 -> $flavor2..."
+
+    echo "(0) set base flavor $flavor0"
+    set_rule $FSNAME any cli2mdt $flavor0
+    wait_flavor cli2mdt $flavor0 $count
+    df $MOUNT
+    if [ $? -ne 0 ]; then
+        error "initial df failed"
     fi
 
     stop_gss_daemons
+    sleep 1
+
+    echo "(1) $flavor0 -> $flavor1"
+    set_rule $FSNAME any cli2mdt $flavor1
+    wait_flavor cli2mdt $flavor1 $count
+    df $MOUNT &
+    df_pid=$!
+    sleep 1
+
+    echo "waiting $wait_time seconds for df ($df_pid)"
+    sleep $wait_time
+    num=`ps --no-headers -p $df_pid 2>/dev/null | wc -l`
+    [ $num -eq 1 ] || error "df already ended ($num)"
+    echo "process $df_pid is still hanging there... OK"
+
+    echo "(2) set end flavor $flavor2"
+    set_rule $FSNAME any cli2mdt $flavor2
+    wait_flavor cli2mdt $flavor2 $count
+    start_gss_daemons
+    wait $df_pid || error "df returned error"
 }
-run_test 100 "start multiple gss daemons"
 
-TMPDIR=$OLDTMPDIR
-TMP=$OLDTMP
-HOME=$OLDHOME
+test_101()
+{
+    # started from default flavors
+    restore_to_default_flavor
+
+    switch_sec_test $cnt_cli2mdt null krb5n null
+    switch_sec_test $cnt_cli2mdt null krb5a null
+    switch_sec_test $cnt_cli2mdt null krb5i null
+    switch_sec_test $cnt_cli2mdt null krb5p null
+    switch_sec_test $cnt_cli2mdt null krb5i plain
+    switch_sec_test $cnt_cli2mdt plain krb5p plain
+    switch_sec_test $cnt_cli2mdt plain krb5n krb5a
+    switch_sec_test $cnt_cli2mdt krb5a krb5i krb5p
+    switch_sec_test $cnt_cli2mdt krb5p krb5a krb5n
+    switch_sec_test $cnt_cli2mdt krb5n krb5p krb5i
+}
+run_test 101 "switch ctx as well as sec for resending request"
 
-log "cleanup: ======================================================"
-if [ "`mount | grep ^$NAME`" ]; then
-    rm -rf $DIR/[Rdfs][1-9]*
-fi
+error_102()
+{
+    local err_str=$1
 
-cleanupall -f || error "cleanup failed"
+    killall -9 dbench
+    sleep 1
 
+    error $err_str
+}
+
+test_102() {
+    # started from default flavors
+    restore_to_default_flavor
+
+    # run dbench background
+    start_dbench
+
+    echo "Testing null->krb5n->krb5a->krb5i->krb5p->plain->null"
+    set_rule $FSNAME any any krb5n
+    set_rule $FSNAME any any krb5a
+    set_rule $FSNAME any any krb5i
+    set_rule $FSNAME any any krb5p
+    set_rule $FSNAME any any plain
+    set_rule $FSNAME any any null
+
+    check_dbench
+    wait_flavor all2all null $cnt_all2all || error_dbench "1"
+    check_dbench
+
+    echo "waiting for 15s and check again"
+    sleep 15
+    check_dbench
+
+    echo "Testing null->krb5i->null->krb5i->null..."
+    for ((i=0; i<10; i++)); do
+        set_rule $FSNAME any any krb5i
+        set_rule $FSNAME any any null
+    done
+    set_rule $FSNAME any any krb5i
+
+    check_dbench
+    wait_flavor all2all krb5i $cnt_all2all || error_dbench "2"
+    check_dbench
+
+    echo "waiting for 15s and check again"
+    sleep 15
+    check_dbench
+
+    stop_dbench
+}
+run_test 102 "survive from insanely fast flavor switch"
 
-echo '=========================== finished ==============================='
-[ -f "$SANITYLOG" ] && cat $SANITYLOG && exit 1 || true
+equals_msg `basename $0`: test complete, cleaning up
+check_and_cleanup_lustre
+[ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG || true
diff --git a/lustre/utils/gss/svcgssd_proc.c b/lustre/utils/gss/svcgssd_proc.c
index 745268a2f7d3c6508b68374c7a36bc796e5573d4..280aa2eca2efa7796b1bb692f7f4644331b92cbf 100644
--- a/lustre/utils/gss/svcgssd_proc.c
+++ b/lustre/utils/gss/svcgssd_proc.c
@@ -301,7 +301,9 @@ get_ids(gss_name_t client_name, gss_OID mech, struct svc_cred *cred,
 {
 	u_int32_t	maj_stat, min_stat;
 	gss_buffer_desc	name;
-	char		*sname, *realm, *slash;
+	char		*sname, *host, *realm;
+	const int	namebuf_size = 512;
+	char		namebuf[namebuf_size];
 	int		res = -1;
 	gss_OID		name_type = GSS_C_NO_OID;
 	struct passwd	*pw;
@@ -323,8 +325,6 @@ get_ids(gss_name_t client_name, gss_OID mech, struct svc_cred *cred,
 		return -1;
 	}
 	memcpy(sname, name.value, name.length);
-	printerr(1, "%s: authenticated %s from %016llx\n",
-		 lustre_svc_name[lustre_svc], sname, nid);
 	gss_release_buffer(&min_stat, &name);
 
 	if (lustre_svc == LUSTRE_GSS_SVC_MDS)
@@ -333,64 +333,91 @@ get_ids(gss_name_t client_name, gss_OID mech, struct svc_cred *cred,
 		cred->cr_mapped_uid = -1;
 
         realm = strchr(sname, '@');
-        if (!realm) {
-                printerr(0, "WARNNING: principal %s contains no realm name\n",
-			 sname);
-                cred->cr_remote = (mds_local_realm != NULL);
-        } else {
+	if (realm)
                 *realm++ = '\0';
-                if (!mds_local_realm)
-                        cred->cr_remote = 1;
-                else
-                        cred->cr_remote =
-                                (strcasecmp(mds_local_realm, realm) != 0);
-        }
 
-        if (cred->cr_remote) {
-                if (cred->cr_mapped_uid != -1)
-                        res = 0;
-		else if (lustre_svc == LUSTRE_GSS_SVC_OSS &&
-			 strcmp(sname, "lustre_root") == 0)
-                        res = 0;
-                else
-                        printerr(0, "principal %s is remote without mapping\n",
-                                 sname);
+        host = strchr(sname, '/');
+        if (host)
+                *host++ = '\0';
+
+	if (strcmp(sname, GSSD_SERVICE_OSS) == 0) {
+		printerr(0, "forbid "GSSD_SERVICE_OSS" as user name\n");
 		goto out_free;
-        }
+	}
+
+	/* 1. check host part */
+	if (host) {
+		if (lnet_nid2hostname(nid, namebuf, namebuf_size)) {
+			printerr(0, "ERROR: failed to resolve hostname for "
+				 "%s/%s@%s from %016llx\n",
+				 sname, host, realm, nid);
+			goto out_free;
+		}
+
+		if (strcasecmp(host, namebuf)) {
+			printerr(0, "ERROR: %s/%s@s claimed hostname doesn't "
+				 "match %s, nid %016llx\n", sname, host, realm,
+				 namebuf, nid);
+			goto out_free;
+		}
+	} else {
+		if (!strcmp(sname, GSSD_SERVICE_MDS)) {
+			printerr(0, "ERROR: "GSSD_SERVICE_MDS"@%s from %016llx "
+				 "doesn't bind with hostname\n",
+				 realm ? realm : "", nid);
+			goto out_free;
+		}
+	}
 
-        slash = strchr(sname, '/');
-        if (slash)
-                *slash = '\0';
+	/* 2. check realm */
+	if (!realm) {
+		/* just deny it
+                cred->cr_remote = (mds_local_realm != NULL);
+		*/
+                printerr(0, "ERROR: %s%s%s have no realm name\n",
+			 sname, host ? "/" : "", host ? "host" : "");
+		goto out_free;
+	}
+
+	if (!mds_local_realm || strcasecmp(mds_local_realm, realm)) {
+		cred->cr_remote = 1;
+
+		if (cred->cr_mapped_uid == -1)
+                        printerr(0, "ERROR: %s from %016llx is remote but "
+				 "without mapping\n", sname, nid);
+		/* mapped, skip user checking */
+		goto out_free;
+	}
 
+	/* 3. check user */
         if (!(pw = getpwnam(sname))) {
-                /* If client use machine credential, we map it to root, which
-                 * will subject to further mapping by root-squash in kernel.
-                 *
-                 * MDS service keytab is treated as special user, also mapped
-                 * to root. OSS service keytab can't be used as a user.
-                 */
+                /* map lustre_root/lustre_mds to root user, which is subject
+		 * to further mapping by root-squash in kernel. */
                 if (!strcmp(sname, LUSTRE_ROOT_NAME)) {
-                        printerr(2, "lustre_root principal, resolve to uid 0\n");
                         cred->cr_uid = 0;
                         cred->cr_usr_root = 1;
                 } else if (!strcmp(sname, GSSD_SERVICE_MDS)) {
-                        printerr(2, "mds service principal, resolve to uid 0\n");
                         cred->cr_uid = 0;
                         cred->cr_usr_mds = 1;
                 } else {
-                        cred->cr_uid = -1;
                         if (cred->cr_mapped_uid == -1) {
-                                printerr(0, "invalid user %s\n", sname);
+                                printerr(0, "ERROR: invalid user, %s/%s@%s "
+					 "from %016llx\n", sname, host,
+					 realm, nid);
                                 goto out_free;
                         }
-                        printerr(2, "user %s mapped to %u\n",
-                                 sname, cred->cr_mapped_uid);
                 }
+		printerr(2, "user %s from %016llx is mapped to %u\n",
+			 sname, nid, cred->cr_mapped_uid);
         } else {
+		/* note: a mapped local user will go to here too */
                 cred->cr_uid = pw->pw_uid;
                 printerr(2, "%s resolve to uid %u\n", sname, cred->cr_uid);
         }
 
+	printerr(1, "%s: authenticated %s%s%s@%s from %016llx\n",
+		 lustre_svc_name[lustre_svc], sname,
+		 host ? "/" : "", host ? host : "", realm, nid);
         res = 0;
 out_free:
         free(sname);