diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h index b81e34d286ff98b557a19d40fe0c910ffc0eedaa..66ebc179b5ea5febe7f3678d71ca3bc4457de969 100644 --- a/lustre/include/lustre_log.h +++ b/lustre/include/lustre_log.h @@ -285,6 +285,10 @@ struct llog_commit_master { * Thread control structure. Used for control commit thread. */ struct ptlrpcd_ctl lcm_pc; + /** + * Busy resources waitq + */ + cfs_waitq_t lcm_waitq; /** * Commit thread name buffer. Only used for thread start. */ diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c index 188574b3df5332f5d6e37c9340938bbf6b506b70..60a4aaf1d320180ee477e153352f6d3375150425 100644 --- a/lustre/ptlrpc/recov_thread.c +++ b/lustre/ptlrpc/recov_thread.c @@ -106,6 +106,7 @@ static struct llog_canceld_ctxt *llcd_alloc(void) */ static void llcd_free(struct llog_canceld_ctxt *llcd) { + LASSERT(atomic_read(&llcd_count) > 0); OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size); atomic_dec(&llcd_count); } @@ -319,11 +320,16 @@ static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt) */ static void llcd_put(struct llog_ctxt *ctxt) { + struct llog_commit_master *lcm; struct llog_canceld_ctxt *llcd; + lcm = ctxt->loc_lcm; llcd = llcd_detach(ctxt); if (llcd) llcd_free(llcd); + + if (atomic_read(&lcm->lcm_count) == 0) + cfs_waitq_signal(&lcm->lcm_waitq); } /** @@ -352,8 +358,6 @@ static int llcd_push(struct llog_ctxt *ctxt) return rc; } -static atomic_t llog_tcount = ATOMIC_INIT(0); - /** * Start recovery thread which actually deals llcd sending. This * is all ptlrpc standard thread based so there is not much of work @@ -371,8 +375,6 @@ int llog_recov_thread_start(struct llog_commit_master *lcm) RETURN(rc); } lcm->lcm_set = lcm->lcm_pc.pc_set; - atomic_inc(&llog_tcount); - RETURN(rc); } EXPORT_SYMBOL(llog_recov_thread_start); @@ -382,8 +384,9 @@ EXPORT_SYMBOL(llog_recov_thread_start); */ void llog_recov_thread_stop(struct llog_commit_master *lcm, int force) { + struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); ENTRY; - + /** * Let all know that we're stopping. This will also make * llcd_send() refuse any new llcds. @@ -395,13 +398,13 @@ void llog_recov_thread_stop(struct llog_commit_master *lcm, int force) * for processing now. */ ptlrpcd_stop(&lcm->lcm_pc, force); - + /* - * No llcds on this @lcm should left. + * Wait for llcd number == 0. Note, this is infinite wait. + * All other parts should make sure that no lost llcd is left. */ - LASSERTF(atomic_read(&lcm->lcm_count) == 0, - "Busy llcds found on lcm %p - (%d)\n", - lcm, atomic_read(&lcm->lcm_count)); + l_wait_event(lcm->lcm_waitq, + atomic_read(&lcm->lcm_count) == 0, &lwi); EXIT; } EXPORT_SYMBOL(llog_recov_thread_stop); @@ -420,13 +423,13 @@ struct llog_commit_master *llog_recov_thread_init(char *name) RETURN(NULL); /* - * Try to create threads with unique names and user id. + * Try to create threads with unique names. */ snprintf(lcm->lcm_name, sizeof(lcm->lcm_name), - "ll_log_commit_%s_%02d", name, - atomic_read(&llog_tcount)); + "ll_log_commit_%s", name); strncpy(lcm->lcm_name, name, sizeof(lcm->lcm_name)); + cfs_waitq_init(&lcm->lcm_waitq); atomic_set(&lcm->lcm_count, 0); rc = llog_recov_thread_start(lcm); if (rc) { @@ -452,8 +455,8 @@ void llog_recov_thread_fini(struct llog_commit_master *lcm, int force) } EXPORT_SYMBOL(llog_recov_thread_fini); -static int llog_obd_repl_generic(struct llog_ctxt *ctxt, - void *handle, void *arg) +static int llog_recov_thread_replay(struct llog_ctxt *ctxt, + void *cb, void *arg) { struct obd_device *obd = ctxt->loc_obd; struct llog_process_cat_args *lpca; @@ -470,7 +473,7 @@ static int llog_obd_repl_generic(struct llog_ctxt *ctxt, if (!lpca) RETURN(-ENOMEM); - lpca->lpca_cb = handle; + lpca->lpca_cb = cb; lpca->lpca_arg = arg; /* @@ -495,40 +498,27 @@ static int llog_obd_repl_generic(struct llog_ctxt *ctxt, RETURN(rc); } -int llog_obd_repl_connect(struct llog_ctxt *ctxt, int count, +int llog_obd_repl_connect(struct llog_ctxt *ctxt, int count, struct llog_logid *logid, struct llog_gen *gen, struct obd_uuid *uuid) { - struct llog_canceld_ctxt *llcd; int rc; ENTRY; - mutex_down(&ctxt->loc_sem); - /* - * Send back cached llcd before recovery from llog if we have any. + * Send back cached llcd from llog before recovery if we have any. + * This is void is nothing cached is found there. */ - if (ctxt->loc_llcd) { - CWARN("Llcd %p:%p is not empty\n", ctxt->loc_llcd, ctxt); - mutex_up(&ctxt->loc_sem); - llog_sync(ctxt, NULL); - mutex_down(&ctxt->loc_sem); - } - - llcd = llcd_get(ctxt); - if (!llcd) { - mutex_up(&ctxt->loc_sem); - RETURN(-ENOMEM); - } + llog_sync(ctxt, NULL); + /* + * Start recovery in separate thread. + */ + mutex_down(&ctxt->loc_sem); ctxt->loc_gen = *gen; - - rc = llog_obd_repl_generic(ctxt, ctxt->llog_proc_cb, logid); - if (rc != 0) { - llcd_put(ctxt); - CERROR("Error recovery process: %d\n", rc); - } + rc = llog_recov_thread_replay(ctxt, ctxt->llog_proc_cb, logid); mutex_up(&ctxt->loc_sem); + RETURN(rc); } EXPORT_SYMBOL(llog_obd_repl_connect); @@ -542,6 +532,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags) { + struct llog_commit_master *lcm; struct llog_canceld_ctxt *llcd; int rc = 0; ENTRY; @@ -549,6 +540,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, LASSERT(ctxt != NULL); mutex_down(&ctxt->loc_sem); + lcm = ctxt->loc_lcm; /* * Let's check if we have all structures alive. We also check for @@ -564,7 +556,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, GOTO(out, rc = -ENODEV); } - if (test_bit(LLOG_LCM_FL_EXIT, &ctxt->loc_lcm->lcm_flags)) { + if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) { CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n", ctxt); GOTO(out, rc = -ENODEV); @@ -580,6 +572,12 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, llcd = llcd_get(ctxt); if (!llcd) GOTO(out, rc = -ENOMEM); + /* + * Allocation is successful, let's check for stop + * flag again to fall back as soon as possible. + */ + if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) + GOTO(out, rc = -ENODEV); } /* @@ -593,6 +591,12 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, llcd = llcd_get(ctxt); if (!llcd) GOTO(out, rc = -ENOMEM); + /* + * Allocation is successful, let's check for stop + * flag again to fall back as soon as possible. + */ + if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) + GOTO(out, rc = -ENODEV); } /* @@ -611,6 +615,8 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, } EXIT; out: + if (rc) + llcd_put(ctxt); mutex_up(&ctxt->loc_sem); return rc; } @@ -621,16 +627,17 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp) int rc = 0; ENTRY; + mutex_down(&ctxt->loc_sem); if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) { CDEBUG(D_RPCTRACE, "Reverse import disconnect\n"); /* * Check for llcd which might be left attached to @ctxt. * Let's kill it. */ - mutex_down(&ctxt->loc_sem); llcd_put(ctxt); mutex_up(&ctxt->loc_sem); } else { + mutex_up(&ctxt->loc_sem); rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW); } RETURN(rc); @@ -670,8 +677,6 @@ int llog_recov_init(void) */ void llog_recov_fini(void) { - int count; - /* * Kill llcd cache when thread is stopped and we're sure no * llcd in use left. @@ -681,9 +686,9 @@ void llog_recov_fini(void) * In 2.6.22 cfs_mem_cache_destroy() will not return error * for busy resources. Let's check it another way. */ - count = atomic_read(&llcd_count); - LASSERTF(count == 0, "Can't destroy llcd cache! Number of " - "busy llcds: %d\n", count); + LASSERTF(atomic_read(&llcd_count) == 0, + "Can't destroy llcd cache! Number of " + "busy llcds: %d\n", atomic_read(&llcd_count)); cfs_mem_cache_destroy(llcd_cache); llcd_cache = NULL; }