From de919862e119e4fb15ccc95c3a17de6cf314dae4 Mon Sep 17 00:00:00 2001
From: adilger <adilger>
Date: Wed, 16 May 2007 15:50:24 +0000
Subject: [PATCH] Branch b1_6 Description: MDS fails to start if a duplicate
 client export is detected Details    : in some rare cases it was possible for
 a client to connect to 	     an MDS multiple times.  Upon recovery the
 MDS would detect this 	     and fail during startup.  Handle this more
 gracefully. b=11818 b=10479 i=adilger (nic original patch) i=scjody

---
 lustre/ChangeLog          |  9 +++++-
 lustre/mds/mds_fs.c       | 62 ++++++++++++++++++++++-----------------
 lustre/obdfilter/filter.c | 37 ++++++++++++-----------
 3 files changed, 63 insertions(+), 45 deletions(-)

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index b4d6ddb743..c9682652e0 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -92,8 +92,15 @@ Severity   : enhancement
 Bugzilla   : 10768
 Description: 64-bit inode version
 Details:   : Add a on-disk 64-bit inode version for ext3 to track changes made
-	     to the inode. This will be required for version-based recovery.	      
+	     to the inode. This will be required for version-based recovery.
 
+Severity   : normal
+Frequency  : rare
+Bugzilla   : 11818
+Description: MDS fails to start if a duplicate client export is detected
+Details    : in some rare cases it was possible for a client to connect to
+	     an MDS multiple times.  Upon recovery the MDS would detect this
+	     and fail during startup.  Handle this more gracefully.
 
 --------------------------------------------------------------------------------
 
diff --git a/lustre/mds/mds_fs.c b/lustre/mds/mds_fs.c
index 3c386d46f1..f79285de50 100644
--- a/lustre/mds/mds_fs.c
+++ b/lustre/mds/mds_fs.c
@@ -55,7 +55,7 @@ static int mds_export_stats_init(struct obd_device *obd, struct obd_export *exp)
         int rc, num_stats;
 
         rc = lprocfs_exp_setup(exp);
-        if (rc) 
+        if (rc)
                 return rc;
         num_stats = (sizeof(*obd->obd_type->typ_ops) / sizeof(void *)) +
                      LPROC_MDS_LAST - 1;
@@ -212,7 +212,8 @@ int mds_client_free(struct obd_export *exp)
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_mcd,
                                          sizeof(zero_mcd), &off,
-                                         (!exp->exp_libclient || exp->exp_need_sync));
+                                         (!exp->exp_libclient ||
+                                          exp->exp_need_sync));
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
                 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
@@ -304,7 +305,7 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
                 }
                 /* COMPAT_146 */
                 /* Assume old last_rcvd format unless I_C_LR is set */
-                if (!(lsd->lsd_feature_incompat & 
+                if (!(lsd->lsd_feature_incompat &
                       cpu_to_le32(OBD_INCOMPAT_COMMON_LR)))
                         lsd->lsd_mount_count = lsd->lsd_compat14;
                 /* end COMPAT_146 */
@@ -326,7 +327,7 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
         }
 
         lsd->lsd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
-        
+
         mds->mds_last_transno = le64_to_cpu(lsd->lsd_last_transno);
 
         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
@@ -399,26 +400,33 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
                        le64_to_cpu(mcd->mcd_last_xid));
 
                 exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid);
-                if (IS_ERR(exp))
-                        GOTO(err_client, rc = PTR_ERR(exp));
-
-                med = &exp->exp_mds_data;
-                med->med_mcd = mcd;
-                rc = mds_client_add(obd, exp, cl_idx);
-                LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
-
-
-                mcd = NULL;
-	
-                spin_lock(&exp->exp_lock);
-                exp->exp_replay_needed = 1;
-                exp->exp_connecting = 0;
-                spin_unlock(&exp->exp_lock);
-
-                obd->obd_recoverable_clients++;
-                obd->obd_max_recoverable_clients++;
-                class_export_put(exp);
+                if (IS_ERR(exp)) {
+                        if (PTR_ERR(exp) == -EALREADY) {
+                                /* export already exists, zero out this one */
+                                mcd->mcd_uuid[0] = '\0';
+                        } else {
+                                GOTO(err_client, rc = PTR_ERR(exp));
+                        }
+                } else {
+                        med = &exp->exp_mds_data;
+                        med->med_mcd = mcd;
+                        rc = mds_client_add(obd, exp, cl_idx);
+                        /* can't fail for existing client */
+                        LASSERTF(rc == 0, "rc = %d\n", rc);
+
+                        mcd = NULL;
+
+                        spin_lock(&exp->exp_lock);
+                        exp->exp_replay_needed = 1;
+                        exp->exp_connecting = 0;
+                        spin_unlock(&exp->exp_lock);
+
+                        obd->obd_recoverable_clients++;
+                        obd->obd_max_recoverable_clients++;
+                        class_export_put(exp);
+                }
 
+                /* Need to check last_rcvd even for duplicated exports. */
                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
                        cl_idx, last_transno);
 
@@ -444,7 +452,7 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
         }
 
         mds->mds_mount_count = mount_count + 1;
-        lsd->lsd_mount_count = lsd->lsd_compat14 = 
+        lsd->lsd_mount_count = lsd->lsd_compat14 =
                 cpu_to_le64(mds->mds_mount_count);
 
         /* save it, so mount count and last_transno is current */
@@ -583,7 +591,7 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
         if (IS_ERR(file)) {
                 rc = PTR_ERR(file);
-                CERROR("cannot open/create %s file: rc = %d\n", HEALTH_CHECK, rc);
+                CERROR("cannot open/create %s file: rc = %d\n",HEALTH_CHECK,rc);
                 GOTO(err_lov_objid, rc = PTR_ERR(file));
         }
         mds->mds_health_check_filp = file;
@@ -601,7 +609,7 @@ err_pop:
         return rc;
 
 err_health_check:
-        if (mds->mds_health_check_filp && 
+        if (mds->mds_health_check_filp &&
             filp_close(mds->mds_health_check_filp, 0))
                 CERROR("can't close %s after error\n", HEALTH_CHECK);
 err_lov_objid:
@@ -654,7 +662,7 @@ int mds_fs_cleanup(struct obd_device *obd)
                 rc = filp_close(mds->mds_health_check_filp, 0);
                 mds->mds_health_check_filp = NULL;
                 if (rc)
-                        CERROR("%s file won't close, rc=%d\n", HEALTH_CHECK, rc);
+                        CERROR("%s file won't close, rc=%d\n", HEALTH_CHECK,rc);
         }
         if (mds->mds_objects_dir != NULL) {
                 l_dput(mds->mds_objects_dir);
diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c
index 6974d870e9..2c78cdfbce 100644
--- a/lustre/obdfilter/filter.c
+++ b/lustre/obdfilter/filter.c
@@ -165,9 +165,9 @@ static int filter_export_stats_init(struct obd_device *obd,
         if (obd_uuid_equals(&exp->exp_client_uuid, &obd->obd_uuid))
                 /* Self-export gets no proc entry */
                 RETURN(0);
-        
+
         rc = lprocfs_exp_setup(exp);
-        if (rc) 
+        if (rc)
                 RETURN(rc);
 
         /* Create a per export proc entry for brw_stats */
@@ -210,7 +210,7 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
         LASSERTF(cl_idx > -2, "%d\n", cl_idx);
 
         /* Self-export */
-        if (strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid) == 0) 
+        if (strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid) == 0)
                 RETURN(0);
 
         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
@@ -335,7 +335,8 @@ static int filter_client_free(struct obd_export *exp)
                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd,
                                          sizeof(zero_fcd), &off,
-                                         (!exp->exp_libclient || exp->exp_need_sync));
+                                         (!exp->exp_libclient ||
+                                          exp->exp_need_sync));
 
                 if (rc == 0)
                         /* update server's transno */
@@ -665,7 +666,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                 filter->fo_subdir_count = le16_to_cpu(fsd->lsd_subdir_count);
                 /* COMPAT_146 */
                 /* Assume old last_rcvd format unless I_C_LR is set */
-                if (!(fsd->lsd_feature_incompat & 
+                if (!(fsd->lsd_feature_incompat &
                       cpu_to_le32(OBD_INCOMPAT_COMMON_LR)))
                         fsd->lsd_last_transno = fsd->lsd_compat14;
                 /* end COMPAT_146 */
@@ -760,7 +761,8 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                         fed->fed_fcd = fcd;
                         filter_export_stats_init(obd, exp);
                         rc = filter_client_add(obd, exp, cl_idx);
-                        LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
+                        /* can't fail for existing client */
+                        LASSERTF(rc == 0, "rc = %d\n", rc);
 
                         fcd = NULL;
 
@@ -887,7 +889,7 @@ static int filter_prep_groups(struct obd_device *obd)
         }
         filter->fo_dentry_O = O_dentry;
         cleanup_phase = 1; /* O_dentry */
-        
+
         OBD_ALLOC(filter->fo_last_objids, FILTER_GROUPS * sizeof(__u64));
         if (filter->fo_last_objids == NULL)
                 GOTO(cleanup, rc = -ENOMEM);
@@ -1633,7 +1635,7 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
                 CERROR("Using old MDS mount method\n");
                 mnt = ll_kern_mount(lustre_cfg_string(lcfg, 2),
                                     MS_NOATIME|MS_NODIRATIME,
-                                    lustre_cfg_string(lcfg, 1), option);    
+                                    lustre_cfg_string(lcfg, 1), option);
                 if (IS_ERR(mnt)) {
                         rc = PTR_ERR(mnt);
                         LCONSOLE_ERROR("Can't mount disk %s (%d)\n",
@@ -1717,7 +1719,7 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
         } else {
                 str = "no UUID";
         }
-        
+
         label = fsfilt_get_label(obd, obd->u.obt.obt_sb);
 
         if (obd->obd_recovering) {
@@ -1951,7 +1953,7 @@ static int filter_connect_internal(struct obd_export *exp,
                 struct filter_obd *filter = &exp->exp_obd->u.filter;
                 struct lr_server_data *lsd = filter->fo_fsd;
                 int index = le32_to_cpu(lsd->lsd_ost_index);
-                
+
                 if (!(lsd->lsd_feature_compat &
                       cpu_to_le32(OBD_COMPAT_OST))) {
                         /* this will only happen on the first connect */
@@ -1973,8 +1975,9 @@ static int filter_connect_internal(struct obd_export *exp,
                 data->ocd_brw_size = 65536;
         } else if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
                 data->ocd_brw_size = min(data->ocd_brw_size,
-                                         (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT));
-                LASSERT(data->ocd_brw_size); 
+                                         (__u32)(PTLRPC_MAX_BRW_PAGES <<
+                                                 CFS_PAGE_SHIFT));
+                LASSERT(data->ocd_brw_size);
         }
 
         /* FIXME: Do the same with the MDS UUID and fsd_peeruuid.
@@ -2461,8 +2464,8 @@ out_unlock:
         /* trigger quota release */
         if (ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) {
                 unsigned int cur_ids[MAXQUOTAS] = {oa->o_uid, oa->o_gid};
-                int rc2 = lquota_adjust(filter_quota_interface_ref, exp->exp_obd, cur_ids,
-                                        orig_ids, rc, FSFILT_OP_SETATTR);
+                int rc2 = lquota_adjust(filter_quota_interface_ref,exp->exp_obd,
+                                        cur_ids, orig_ids,rc,FSFILT_OP_SETATTR);
                 CDEBUG(rc2 ? D_ERROR : D_QUOTA,
                        "filter adjust qunit. (rc:%d)\n", rc2);
         }
@@ -2904,7 +2907,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
         }
         *num = i;
 
-        CDEBUG(D_HA, "%s: created %d objects for group "LPU64": "LPU64" rc %d\n",
+        CDEBUG(D_HA,"%s: created %d objects for group "LPU64": "LPU64" rc %d\n",
                obd->obd_name, i, group, filter->fo_last_objids[group], rc);
 
         RETURN(rc);
@@ -3355,14 +3358,14 @@ static struct dentry *filter_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
         return filter_fid2dentry(data, NULL, gr, id);
 }
 
-static int filter_process_config(struct obd_device *obd, obd_count len, void *buf)
+static int filter_process_config(struct obd_device *obd,obd_count len,void *buf)
 {
         struct lustre_cfg *lcfg = buf;
         struct lprocfs_static_vars lvars;
         int rc = 0;
 
         lprocfs_init_vars(filter, &lvars);
-        
+
         rc = class_process_proc_param(PARAM_OST, lvars.obd_vars, lcfg, obd);
 
         return(rc);
-- 
GitLab