diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index cdca3f6c3dec18826dcff6595059985824a18db1..1238ae94158ba6bb279d6fdd8b2e7521a35a803a 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -640,7 +640,7 @@ struct ptlrpc_service { struct ptlrpcd_ctl { /** - * Ptlrpc thread control flags (LIOD_START, LIOD_STOP, LIOD_STOP_FORCE) + * Ptlrpc thread control flags (LIOD_START, LIOD_STOP, LIOD_FORCE) */ unsigned long pc_flags; /** @@ -695,10 +695,11 @@ enum ptlrpcd_ctl_flags { */ LIOD_STOP = 1 << 1, /** - * Ptlrpc thread stop force flag. This will cause also - * aborting any inflight rpcs handled by thread. + * Ptlrpc thread force flag (only stop force so far). + * This will cause aborting any inflight rpcs handled + * by thread if LIOD_STOP is specified. */ - LIOD_STOP_FORCE = 1 << 2 + LIOD_FORCE = 1 << 2 }; /* ptlrpc/events.c */ diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index a5e7297c05d852e22816fcd24e713d89eb0416bc..2102677313b21474db915e54b0f8fdc1179bb6e6 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -769,8 +769,9 @@ struct obd_import *class_import_get(struct obd_import *import) LASSERT(atomic_read(&import->imp_refcount) >= 0); LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a); atomic_inc(&import->imp_refcount); - CDEBUG(D_INFO, "import %p refcount=%d\n", import, - atomic_read(&import->imp_refcount)); + CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import, + atomic_read(&import->imp_refcount), + import->imp_obd->obd_name); return import; } EXPORT_SYMBOL(class_import_get); @@ -783,13 +784,12 @@ void class_import_put(struct obd_import *import) LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a); LASSERT(list_empty(&import->imp_zombie_chain)); - CDEBUG(D_INFO, "import %p refcount=%d\n", import, - atomic_read(&import->imp_refcount) - 1); + CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import, + atomic_read(&import->imp_refcount) - 1, + import->imp_obd->obd_name); if (atomic_dec_and_test(&import->imp_refcount)) { - CDEBUG(D_INFO, "final put import %p\n", import); - spin_lock(&obd_zombie_impexp_lock); list_add(&import->imp_zombie_chain, &obd_zombie_imports); spin_unlock(&obd_zombie_impexp_lock); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 552f1b57c6a949c2d14e5414ac1bc3fbb953720d..c2fccef5a7959f1931775891700f6ed784a83586 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -2011,6 +2011,7 @@ static int filter_llog_finish(struct obd_device *obd, int count) * not come being marked rq_no_resend = 1. */ llog_sync(ctxt, NULL); + /* * Balance class_import_get() called in llog_receptor_accept(). * This is safe to do here, as llog is already synchronized and diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 528cf937b911fcb2da91e6de696258ee657da1c2..f425e516aa75e362e844c5e25848aa76f514c671 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -2243,24 +2243,24 @@ void ptlrpc_abort_inflight(struct obd_import *imp) void ptlrpc_abort_set(struct ptlrpc_request_set *set) { - struct list_head *tmp, *n; + struct list_head *tmp, *pos; LASSERT(set != NULL); - list_for_each_safe(tmp, n, &set->set_requests) { + list_for_each_safe(pos, tmp, &set->set_requests) { struct ptlrpc_request *req = - list_entry(tmp, struct ptlrpc_request, rq_set_chain); + list_entry(pos, struct ptlrpc_request, rq_set_chain); - spin_lock (&req->rq_lock); + spin_lock(&req->rq_lock); if (req->rq_phase != RQ_PHASE_RPC) { - spin_unlock (&req->rq_lock); + spin_unlock(&req->rq_lock); continue; } req->rq_err = 1; req->rq_status = -EINTR; ptlrpc_client_wake_req(req); - spin_unlock (&req->rq_lock); + spin_unlock(&req->rq_lock); } } diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c index 154db42301da0e14521f3d12845cafd0d238f423..ff316c5c6a8659965f7e1cb05ebbe6b171fb5f6b 100644 --- a/lustre/ptlrpc/ptlrpcd.c +++ b/lustre/ptlrpc/ptlrpcd.c @@ -112,9 +112,6 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) int rc = 0; ENTRY; - if (test_bit(LIOD_STOP, &pc->pc_flags)) - RETURN(1); - spin_lock(&pc->pc_set->set_new_req_lock); list_for_each_safe(pos, tmp, &pc->pc_set->set_new_requests) { req = list_entry(pos, struct ptlrpc_request, rq_set_chain); @@ -167,7 +164,7 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) static int ptlrpcd(void *arg) { struct ptlrpcd_ctl *pc = arg; - int rc; + int rc, exit = 0; ENTRY; if ((rc = cfs_daemonize_ctxt(pc->pc_name))) { @@ -183,7 +180,7 @@ static int ptlrpcd(void *arg) * there are requests in the set. New requests come in on the set's * new_req_list and ptlrpcd_check() moves them into the set. */ - while (1) { + do { struct l_wait_info lwi; int timeout; @@ -196,12 +193,17 @@ static int ptlrpcd(void *arg) /* * Abort inflight rpcs for forced stop case. */ - if (test_bit(LIOD_STOP_FORCE, &pc->pc_flags)) - ptlrpc_abort_set(pc->pc_set); + if (test_bit(LIOD_STOP, &pc->pc_flags)) { + if (test_bit(LIOD_FORCE, &pc->pc_flags)) + ptlrpc_abort_set(pc->pc_set); + exit++; + } - if (test_bit(LIOD_STOP, &pc->pc_flags)) - break; - } + /* + * Let's make one more loop to make sure that ptlrpcd_check() + * copied all raced new rpcs into the set so we can kill them. + */ + } while (exit < 2); /* * Wait for inflight requests to drain. @@ -213,6 +215,7 @@ static int ptlrpcd(void *arg) out: clear_bit(LIOD_START, &pc->pc_flags); clear_bit(LIOD_STOP, &pc->pc_flags); + clear_bit(LIOD_FORCE, &pc->pc_flags); return 0; } @@ -308,7 +311,7 @@ void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force) set_bit(LIOD_STOP, &pc->pc_flags); if (force) - set_bit(LIOD_STOP_FORCE, &pc->pc_flags); + set_bit(LIOD_FORCE, &pc->pc_flags); cfs_waitq_signal(&pc->pc_set->set_waitq); #ifdef __KERNEL__ wait_for_completion(&pc->pc_finishing); diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c index 4f2f74483aba7f4b4de80cd826f18c98b0e6c142..0d46150a3fe09b643914b3c35e30f5b392b52068 100644 --- a/lustre/ptlrpc/recov_thread.c +++ b/lustre/ptlrpc/recov_thread.c @@ -76,15 +76,27 @@ enum { LLOG_LCM_FL_EXIT = 1 << 1 }; +static void llcd_print(struct llog_canceld_ctxt *llcd, + const char *func, int line) +{ + CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line); + CDEBUG(D_RPCTRACE, " size: %d\n", llcd->llcd_size); + CDEBUG(D_RPCTRACE, " ctxt: %p\n", llcd->llcd_ctxt); + CDEBUG(D_RPCTRACE, " lcm : %p\n", llcd->llcd_lcm); + CDEBUG(D_RPCTRACE, " cookiebytes : %d\n", llcd->llcd_cookiebytes); +} + /** * Allocate new llcd from cache, init it and return to caller. * Bumps number of objects allocated. */ -static struct llog_canceld_ctxt *llcd_alloc(void) +static struct llog_canceld_ctxt *llcd_alloc(struct llog_commit_master *lcm) { struct llog_canceld_ctxt *llcd; int llcd_size; + LASSERT(lcm != NULL); + /* * Payload of lustre_msg V2 is bigger. */ @@ -98,7 +110,17 @@ static struct llog_canceld_ctxt *llcd_alloc(void) CFS_INIT_LIST_HEAD(&llcd->llcd_list); llcd->llcd_size = llcd_size; llcd->llcd_cookiebytes = 0; + + spin_lock(&lcm->lcm_lock); + llcd->llcd_lcm = lcm; + atomic_inc(&lcm->lcm_count); + list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds); + spin_unlock(&lcm->lcm_lock); atomic_inc(&llcd_count); + + CDEBUG(D_RPCTRACE, "Alloc llcd %p on lcm %p (%d)\n", + llcd, lcm, atomic_read(&lcm->lcm_count)); + return llcd; } @@ -107,9 +129,27 @@ static struct llog_canceld_ctxt *llcd_alloc(void) */ static void llcd_free(struct llog_canceld_ctxt *llcd) { + struct llog_commit_master *lcm = llcd->llcd_lcm; + + if (lcm) { + if (atomic_read(&lcm->lcm_count) == 0) { + CERROR("Invalid llcd free %p\n", llcd); + llcd_print(llcd, __FUNCTION__, __LINE__); + LBUG(); + } + spin_lock(&lcm->lcm_lock); + LASSERT(!list_empty(&llcd->llcd_list)); + list_del_init(&llcd->llcd_list); + atomic_dec(&lcm->lcm_count); + spin_unlock(&lcm->lcm_lock); + } + + CDEBUG(D_RPCTRACE, "Free llcd %p on lcm %p (%d)\n", + llcd, lcm, atomic_read(&lcm->lcm_count)); + LASSERT(atomic_read(&llcd_count) > 0); - OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size); atomic_dec(&llcd_count); + OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size); } /** @@ -128,22 +168,12 @@ static void llcd_copy(struct llog_canceld_ctxt *llcd, * 1 if yes and 0 otherwise. */ static int llcd_fit(struct llog_canceld_ctxt *llcd, - struct llog_cookie *cookies) + struct llog_cookie *cookies) { return (llcd->llcd_size - llcd->llcd_cookiebytes) >= sizeof(*cookies); } -static void llcd_print(struct llog_canceld_ctxt *llcd, - const char *func, int line) -{ - CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line); - CDEBUG(D_RPCTRACE, " size: %d\n", llcd->llcd_size); - CDEBUG(D_RPCTRACE, " ctxt: %p\n", llcd->llcd_ctxt); - CDEBUG(D_RPCTRACE, " lcm : %p\n", llcd->llcd_lcm); - CDEBUG(D_RPCTRACE, " cookiebytes : %d\n", llcd->llcd_cookiebytes); -} - /** * Llcd completion function. Called uppon llcd send finish regardless * sending result. Error is passed in @rc. Note, that this will be called @@ -153,7 +183,7 @@ static int llcd_interpret(struct ptlrpc_request *req, void *noused, int rc) { struct llog_canceld_ctxt *llcd = req->rq_async_args.pointer_arg[0]; - CDEBUG(D_RPCTRACE, "Sent llcd %p (%d)\n", llcd, rc); + CDEBUG(D_RPCTRACE, "Sent llcd %p (%d) - killing it\n", llcd, rc); llcd_free(llcd); return 0; } @@ -255,21 +285,15 @@ exit: static int llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd) { - struct llog_commit_master *lcm; - LASSERT(ctxt != NULL && llcd != NULL); LASSERT_SEM_LOCKED(&ctxt->loc_sem); LASSERT(ctxt->loc_llcd == NULL); - lcm = ctxt->loc_lcm; - spin_lock(&lcm->lcm_lock); - atomic_inc(&lcm->lcm_count); - list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds); - spin_unlock(&lcm->lcm_lock); - CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p (%d)\n", - llcd, ctxt, atomic_read(&lcm->lcm_count)); llcd->llcd_ctxt = llog_ctxt_get(ctxt); - llcd->llcd_lcm = ctxt->loc_lcm; ctxt->loc_llcd = llcd; + + CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p\n", + llcd, ctxt); + return 0; } @@ -279,7 +303,6 @@ llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd) */ static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt) { - struct llog_commit_master *lcm; struct llog_canceld_ctxt *llcd; LASSERT(ctxt != NULL); @@ -289,22 +312,10 @@ static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt) if (!llcd) return NULL; - lcm = ctxt->loc_lcm; - if (atomic_read(&lcm->lcm_count) == 0) { - CERROR("Invalid detach occured %p:%p\n", ctxt, llcd); - llcd_print(llcd, __FUNCTION__, __LINE__); - LBUG(); - } - spin_lock(&lcm->lcm_lock); - LASSERT(!list_empty(&llcd->llcd_list)); - list_del_init(&llcd->llcd_list); - atomic_dec(&lcm->lcm_count); - spin_unlock(&lcm->lcm_lock); - ctxt->loc_llcd = NULL; - - CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p (%d)\n", - llcd, ctxt, atomic_read(&lcm->lcm_count)); + CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p\n", + llcd, ctxt); + ctxt->loc_llcd = NULL; llog_ctxt_put(ctxt); return llcd; } @@ -317,9 +328,9 @@ static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt) { struct llog_canceld_ctxt *llcd; - llcd = llcd_alloc(); + llcd = llcd_alloc(ctxt->loc_lcm); if (!llcd) { - CERROR("Couldn't alloc an llcd for ctxt %p\n", ctxt); + CERROR("Can't alloc an llcd for ctxt %p\n", ctxt); return NULL; } llcd_attach(ctxt, llcd); @@ -331,10 +342,8 @@ static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt) */ static void llcd_put(struct llog_ctxt *ctxt) { - struct llog_commit_master *lcm; struct llog_canceld_ctxt *llcd; - lcm = ctxt->loc_lcm; llcd = llcd_detach(ctxt); if (llcd) llcd_free(llcd); @@ -424,6 +433,18 @@ void llog_recov_thread_stop(struct llog_commit_master *lcm, int force) llcd_print(llcd, __FUNCTION__, __LINE__); } spin_unlock(&lcm->lcm_lock); + + /* + * No point to go further with busy llcds at this point + * as this is clear bug. It might mean we got hanging + * rpc which holds import ref and this means we will not + * be able to cleanup anyways. + * + * Or we just missed to kill them when they were not + * attached to ctxt. In this case our slab will remind + * us about this a bit later. + */ + LBUG(); } EXIT; } @@ -627,6 +648,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, * then do it. */ if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) { + CDEBUG(D_RPCTRACE, "Sync llcd %p\n", llcd); rc = llcd_push(ctxt); if (rc) GOTO(out, rc);