From 0cbcad593a5024897228acb2bc0cd49304431620 Mon Sep 17 00:00:00 2001
From: yury <yury>
Date: Wed, 25 Jun 2008 22:14:03 +0000
Subject: [PATCH] b=15863 r=shadow,wangdi - fixes missed llcd_put() and wrong
 flag passed to llog_cleanup_commit_master(), which again led to some llcds
 not being decrefed in cleanup. Fixes test_42 from replay-single.sh; - adds
 some comments to make the llcd life cycle easier to understand next time.

---
 lustre/obdfilter/filter.c    |  2 +-
 lustre/ptlrpc/recov_thread.c | 84 +++++++++++++++++++++++++-----------
 2 files changed, 60 insertions(+), 26 deletions(-)

diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c
index 12acb77f11..101c2346a1 100644
--- a/lustre/obdfilter/filter.c
+++ b/lustre/obdfilter/filter.c
@@ -2232,7 +2232,7 @@ static int filter_llog_finish(struct obd_device *obd, int count)
 
         if (obd->u.filter.fo_lcm) { 
                 llog_cleanup_commit_master((struct llog_commit_master *)
-                                           obd->u.filter.fo_lcm, 0);
+                                           obd->u.filter.fo_lcm, 1);
                 OBD_FREE(obd->u.filter.fo_lcm, 
                          sizeof(struct llog_commit_master));
                 obd->u.filter.fo_lcm = NULL;
diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c
index 1417bd2c6d..90af251497 100644
--- a/lustre/ptlrpc/recov_thread.c
+++ b/lustre/ptlrpc/recov_thread.c
@@ -129,14 +129,49 @@ static void llcd_put(struct llog_canceld_ctxt *llcd)
 static void llcd_send(struct llog_canceld_ctxt *llcd)
 {
         if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) {
-        spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
+                spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
                 list_add_tail(&llcd->llcd_list,
                               &llcd->llcd_lcm->lcm_llcd_pending);
-        spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
+                spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
         }
         cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);
 }
 
+/**
+ * Grab llcd and assign it to passed @ctxt. Also set up backward link
+ * and get ref on @ctxt.
+ */
+static struct llog_canceld_ctxt *ctxt_llcd_grab(struct llog_ctxt *ctxt)
+{
+        struct llog_canceld_ctxt *llcd;
+
+        LASSERT_SEM_LOCKED(&ctxt->loc_sem);
+        llcd = llcd_grab(ctxt->loc_lcm);
+        if (llcd == NULL)
+                return NULL;
+
+        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
+        ctxt->loc_llcd = llcd;
+
+        CDEBUG(D_RPCTRACE,"grab llcd %p:%p\n", ctxt->loc_llcd, ctxt);
+        return llcd;
+}
+
+/**
+ * Put llcd in passed @ctxt. Set ->loc_llcd and ->loc_imp to NULL.
+ */
+static void ctxt_llcd_put(struct llog_ctxt *ctxt)
+{
+        mutex_down(&ctxt->loc_sem);
+        if (ctxt->loc_llcd != NULL) {
+                CDEBUG(D_RPCTRACE,"put llcd %p:%p\n", ctxt->loc_llcd, ctxt);
+                llcd_put(ctxt->loc_llcd);
+                ctxt->loc_llcd = NULL;
+        }
+        ctxt->loc_imp = NULL;
+        mutex_up(&ctxt->loc_sem);
+}
+
 /* deleted objects have a commit callback that cancels the MDS
  * log record for the deletion.  The commit callback calls this
  * function
@@ -152,16 +187,16 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
         LASSERT(ctxt);
 
         mutex_down(&ctxt->loc_sem);
+        llcd = ctxt->loc_llcd;
+
         if (ctxt->loc_imp == NULL) {
                 CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt);
                 GOTO(out, rc = 0);
         }
 
-        llcd = ctxt->loc_llcd;
-
         if (count > 0 && cookies != NULL) {
                 if (llcd == NULL) {
-                        llcd = llcd_grab(ctxt->loc_lcm);
+                        llcd = ctxt_llcd_grab(ctxt);
                         if (llcd == NULL) {
                                 CERROR("couldn't get an llcd - dropped "LPX64
                                        ":%x+%u\n",
@@ -170,8 +205,6 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                                        cookies->lgc_index);
                                 GOTO(out, rc = -ENOMEM);
                         }
-                        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
-                        ctxt->loc_llcd = llcd;
                 }
 
                 memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, 
@@ -200,19 +233,20 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
         ENTRY;
 
         if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
-                CDEBUG(D_RPCTRACE,"reverse import disconnect, put llcd %p:%p\n",
-                       ctxt->loc_llcd, ctxt);
-                mutex_down(&ctxt->loc_sem);
-                if (ctxt->loc_llcd != NULL) {
-                        llcd_put(ctxt->loc_llcd);
-                        ctxt->loc_llcd = NULL;
-                }
-                ctxt->loc_imp = NULL;
-                mutex_up(&ctxt->loc_sem);
+                CDEBUG(D_RPCTRACE,"reverse import disconnect\n");
+                /* 
+                 * We put llcd here because it is not going onto the sending
+                 * list, and thus its refc will not be handled anywhere else.
+                 */
+                ctxt_llcd_put(ctxt);
         } else {
+                /* 
+                 * Sending cancel. This means that ctxt->loc_llcd will be
+                 * put on the sending list in llog_obd_repl_cancel() and in
+                 * this case the recovery thread will take care of its refc.
+                 */
                 rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
         }
-
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_obd_repl_sync);
@@ -576,15 +610,16 @@ static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
         mutex_down(&llpa.llpa_sem);
         llpa.llpa_cb = handle;
         llpa.llpa_arg = arg;
-        llpa.llpa_ctxt = llog_ctxt_get(ctxt); //llog_group_get_ctxt(ctxt->loc_olg, ctxt->loc_idx);
+        llpa.llpa_ctxt = llog_ctxt_get(ctxt);
         if (!llpa.llpa_ctxt) {
                 up(&llpa.llpa_sem);
                 RETURN(-ENODEV);
         }
         rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
-        if (rc < 0)
+        if (rc < 0) {
+                llog_ctxt_put(ctxt);
                 CERROR("error starting log_process_thread: %d\n", rc);
-        else {
+        } else {
                 CDEBUG(D_HA, "log_process_thread: %d\n", rc);
                 rc = 0;
         }
@@ -608,20 +643,19 @@ int llog_repl_connect(struct llog_ctxt *ctxt, int count,
 
         mutex_down(&ctxt->loc_sem);
         ctxt->loc_gen = *gen;
-        llcd = llcd_grab(ctxt->loc_lcm);
+        llcd = ctxt_llcd_grab(ctxt);
         if (llcd == NULL) {
                 CERROR("couldn't get an llcd\n");
                 mutex_up(&ctxt->loc_sem);
                 RETURN(-ENOMEM);
         }
-        llcd->llcd_ctxt = llog_ctxt_get(ctxt);
-        ctxt->loc_llcd = llcd;
         mutex_up(&ctxt->loc_sem);
 
         rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
-        if (rc != 0)
+        if (rc != 0) {
+                ctxt_llcd_put(ctxt);
                 CERROR("error recovery process: %d\n", rc);
-
+        }
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_repl_connect);
-- 
GitLab