diff --git a/lustre/include/lustre_import.h b/lustre/include/lustre_import.h
index 3fde7b2f223736948791b71856f6c5f285b4a45f..b594b855be1e59c8c3bc0f42504c6dd706c13ab9 100644
--- a/lustre/include/lustre_import.h
+++ b/lustre/include/lustre_import.h
@@ -126,6 +126,7 @@ struct obd_import {
         cfs_waitq_t               imp_recovery_waitq;
 
         atomic_t                  imp_inflight;
+        atomic_t                  imp_unregistering;
         atomic_t                  imp_replay_inflight;
         atomic_t                  imp_inval_count;
         enum lustre_imp_state     imp_state;
diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index 363a0b8b4e4e748d078f6249f8428997372577bc..34b5831fefa0916716001275d0f6f2fd5d90684a 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -58,6 +58,8 @@
 #include <lu_object.h>
 #include <lustre_req_layout.h>
 
+#include <obd_support.h>
+
 /* MD flags we _always_ use */
 #define PTLRPC_MD_OPTIONS  0
 
@@ -291,11 +293,13 @@ struct ptlrpc_reply_state {
 struct ptlrpc_thread;
 
 enum rq_phase {
-        RQ_PHASE_NEW        = 0xebc0de00,
-        RQ_PHASE_RPC        = 0xebc0de01,
-        RQ_PHASE_BULK       = 0xebc0de02,
-        RQ_PHASE_INTERPRET  = 0xebc0de03,
-        RQ_PHASE_COMPLETE   = 0xebc0de04,
+        RQ_PHASE_NEW            = 0xebc0de00,
+        RQ_PHASE_RPC            = 0xebc0de01,
+        RQ_PHASE_BULK           = 0xebc0de02,
+        RQ_PHASE_INTERPRET      = 0xebc0de03,
+        RQ_PHASE_COMPLETE       = 0xebc0de04,
+        RQ_PHASE_UNREGISTERING  = 0xebc0de05,
+        RQ_PHASE_UNDEFINED      = 0xebc0de06
 };
 
 /** Type of request interpreter call-back */
@@ -347,6 +351,7 @@ struct ptlrpc_request {
                 rq_sent_final:1;    /* stop sending early replies */
 
         enum rq_phase rq_phase;      /* one of RQ_PHASE_* */
+        enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
         atomic_t rq_refcount;        /* client-side refcount for SENT race,
                                         server-side refcounf for multiple
                                         replies */
@@ -454,6 +459,7 @@ struct ptlrpc_request {
         volatile time_t rq_deadline; /* when request must finish. volatile
                 so that servers' early reply updates to the deadline aren't
                 kept in per-cpu cache */
+        time_t rq_reply_deadline;    /* when req reply unlink must finish. */
         int    rq_timeout;           /* service time estimate (secs) */
 
         /* Multi-rpc bits */
@@ -505,9 +511,9 @@ static inline int lustre_rep_swabbed(struct ptlrpc_request *req, int index)
 }
 
 static inline const char *
-ptlrpc_rqphase2str(const struct ptlrpc_request *req)
+ptlrpc_phase2str(enum rq_phase phase)
 {
-        switch (req->rq_phase) {
+        switch (phase) {
         case RQ_PHASE_NEW:
                 return "New";
         case RQ_PHASE_RPC:
@@ -518,11 +524,19 @@ ptlrpc_rqphase2str(const struct ptlrpc_request *req)
                 return "Interpret";
         case RQ_PHASE_COMPLETE:
                 return "Complete";
+        case RQ_PHASE_UNREGISTERING:
+                return "Unregistering";
         default:
                 return "?Phase?";
         }
 }
 
+static inline const char *
+ptlrpc_rqphase2str(struct ptlrpc_request *req)
+{
+        return ptlrpc_phase2str(req->rq_phase);
+}
+
 /* Spare the preprocessor, spoil the bugs. */
 #define FLAG(field, str) (field ? str : "")
 
@@ -846,29 +860,9 @@ void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
 void ptlrpc_cleanup_client(struct obd_import *imp);
 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid);
 
-static inline int
-ptlrpc_client_recv_or_unlink (struct ptlrpc_request *req)
-{
-        int rc;
-
-        spin_lock(&req->rq_lock);
-        rc = req->rq_receiving_reply || req->rq_must_unlink;
-        spin_unlock(&req->rq_lock);
-        return (rc);
-}
-
-static inline void
-ptlrpc_wake_client_req (struct ptlrpc_request *req)
-{
-        if (req->rq_set == NULL)
-                cfs_waitq_signal(&req->rq_reply_waitq);
-        else
-                cfs_waitq_signal(&req->rq_set->set_waitq);
-}
-
 int ptlrpc_queue_wait(struct ptlrpc_request *req);
 int ptlrpc_replay_req(struct ptlrpc_request *req);
-void ptlrpc_unregister_reply(struct ptlrpc_request *req);
+int ptlrpc_unregister_reply(struct ptlrpc_request *req, int async);
 void ptlrpc_restart_req(struct ptlrpc_request *req);
 void ptlrpc_abort_inflight(struct obd_import *imp);
 void ptlrpc_abort_set(struct ptlrpc_request_set *set);
@@ -1076,6 +1070,81 @@ lustre_shrink_reply(struct ptlrpc_request *req, int segment,
                                newlen, move_data);
 }
 
+static inline void
+ptlrpc_rqphase_move(struct ptlrpc_request *req, enum rq_phase new_phase)
+{
+        if (req->rq_phase == new_phase)
+                return;
+
+        if (new_phase == RQ_PHASE_UNREGISTERING) {
+                req->rq_next_phase = req->rq_phase;
+                if (req->rq_import)
+                        atomic_inc(&req->rq_import->imp_unregistering);
+        }
+
+        if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+                if (req->rq_import)
+                        atomic_dec(&req->rq_import->imp_unregistering);
+        }
+
+        DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"",
+                  ptlrpc_rqphase2str(req), ptlrpc_phase2str(new_phase));
+
+        req->rq_phase = new_phase;
+}
+
+static inline int
+ptlrpc_client_early(struct ptlrpc_request *req)
+{
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            req->rq_reply_deadline > cfs_time_current_sec())
+                return 0;
+        return req->rq_early;
+}
+
+static inline int
+ptlrpc_client_replied(struct ptlrpc_request *req)
+{
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            req->rq_reply_deadline > cfs_time_current_sec())
+                return 0;
+        return req->rq_replied;
+}
+
+static inline int
+ptlrpc_client_recv(struct ptlrpc_request *req)
+{
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            req->rq_reply_deadline > cfs_time_current_sec())
+                return 1;
+        return req->rq_receiving_reply;
+}
+
+static inline int
+ptlrpc_client_recv_or_unlink(struct ptlrpc_request *req)
+{
+        int rc;
+
+        spin_lock(&req->rq_lock);
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            req->rq_reply_deadline > cfs_time_current_sec()) {
+                spin_unlock(&req->rq_lock);
+                return 1;
+        }
+        rc = req->rq_receiving_reply || req->rq_must_unlink;
+        spin_unlock(&req->rq_lock);
+        return rc;
+}
+
+static inline void
+ptlrpc_client_wake_req(struct ptlrpc_request *req)
+{
+        if (req->rq_set == NULL)
+                cfs_waitq_signal(&req->rq_reply_waitq);
+        else
+                cfs_waitq_signal(&req->rq_set->set_waitq);
+}
+
 static inline void ptlrpc_rs_addref(struct ptlrpc_reply_state *rs)
 {
diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h
index 14bd5a8f50e08026b166c81a759d8d5e80ddebbe..2a8389dd87b8607b26263b53de19511fbf3032e2 100644
--- a/lustre/include/obd_support.h
+++ b/lustre/include/obd_support.h
@@ -288,6 +288,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_PTLRPC_PAUSE_REP        0x50c
 
 #define OBD_FAIL_PTLRPC_DUMP_LOG         0x50e
+#define OBD_FAIL_PTLRPC_LONG_UNLINK      0x50f
 
 #define OBD_FAIL_OBD_PING_NET            0x600
 #define OBD_FAIL_OBD_LOG_CANCEL_NET      0x601
diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c
index 3051655999cf81fa93e69bf333980cd941ba9eb6..5eeae543928bd1f3f7763fc4f223b9fd51f3b6a6 100644
--- a/lustre/obdclass/genops.c
+++ b/lustre/obdclass/genops.c
@@ -917,6 +917,7 @@ struct obd_import *class_new_import(struct obd_device *obd)
         cfs_waitq_init(&imp->imp_recovery_waitq);
 
         atomic_set(&imp->imp_refcount, 2);
+        atomic_set(&imp->imp_unregistering, 0);
         atomic_set(&imp->imp_inflight, 0);
         atomic_set(&imp->imp_replay_inflight, 0);
         atomic_set(&imp->imp_inval_count, 0);
diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c
index 393c008ce09ef55bd85de3ca33f679fe5c9c7f64..e13ed69b90228f3ce564c3ef1111b0cb776e9fed 100644
--- a/lustre/obdclass/lprocfs_status.c
+++ b/lustre/obdclass/lprocfs_status.c
@@ -686,6 +686,7 @@ int lprocfs_rd_import(char *page, char **start, off_t off, int count,
                       "    target: %s@%s\n"
                       "    state: %s\n"
                       "    inflight: %u\n"
+                      "    unregistering: %u\n"
                       "    conn_cnt: %u\n"
                       "    generation: %u\n"
                       "    inval_cnt: %u\n"
@@ -697,6 +698,7 @@ int lprocfs_rd_import(char *page, char **start, off_t off, int count,
                       obd2cli_tgt(obd),
                       imp->imp_connection->c_remote_uuid.uuid,
                       imp_state_name, atomic_read(&imp->imp_inflight),
+                      atomic_read(&imp->imp_unregistering),
                       imp->imp_conn_cnt, imp->imp_generation,
                       atomic_read(&imp->imp_inval_count),
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index de5f94bc74a50a112cfd226a85a5d1353ec2a875..69259bbc436501c241c86e1baa70d1217438a4c5 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -407,8 +407,9 @@ void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
         return;
 }
 
-struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int num_rq, int msgsize,
-                                                void (*populate_pool)(struct ptlrpc_request_pool *, int))
+struct ptlrpc_request_pool *
+ptlrpc_init_rq_pool(int num_rq, int msgsize,
+                    void (*populate_pool)(struct ptlrpc_request_pool *, int))
 {
         struct ptlrpc_request_pool *pool;
 
@@ -434,7 +435,8 @@ struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int num_rq, int msgsize,
         return pool;
 }
 
-static struct ptlrpc_request *ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
+static struct ptlrpc_request *
+ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
 {
         struct ptlrpc_request *request;
         struct lustre_msg *reqbuf;
@@ -476,6 +478,7 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
 
         spin_lock(&pool->prp_lock);
         LASSERT(list_empty(&request->rq_list));
+        LASSERT(!request->rq_receiving_reply);
         list_add_tail(&request->rq_list, &pool->prp_req_list);
         spin_unlock(&pool->prp_lock);
 }
@@ -517,7 +520,9 @@ static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
         request->rq_reply_cbid.cbid_arg = request;
 
+        request->rq_reply_deadline = 0;
         request->rq_phase = RQ_PHASE_NEW;
+        request->rq_next_phase = RQ_PHASE_UNDEFINED;
 
         request->rq_request_portal = imp->imp_client->cli_request_portal;
         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
@@ -878,12 +883,12 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
         /* serialise with network callback */
         spin_lock(&req->rq_lock);
 
-        if (req->rq_replied)
+        if (ptlrpc_client_replied(req))
                 GOTO(out, rc = 1);
 
         if (req->rq_net_err && !req->rq_timedout) {
                 spin_unlock(&req->rq_lock);
-                rc = ptlrpc_expire_one_request(req);
+                rc = ptlrpc_expire_one_request(req, 0);
                 spin_lock(&req->rq_lock);
                 GOTO(out, rc);
         }
@@ -897,7 +902,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
         if (req->rq_restart)
                 GOTO(out, rc = 1);
 
-        if (req->rq_early) {
+        if (ptlrpc_client_early(req)) {
                 ptlrpc_at_recv_early_reply(req);
                 GOTO(out, rc = 0); /* keep waiting */
         }
@@ -1062,7 +1067,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         if (req->rq_sent && (req->rq_sent > cfs_time_current_sec()))
                 RETURN (0);
 
-        req->rq_phase = RQ_PHASE_RPC;
+        ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
 
         imp = req->rq_import;
         spin_lock(&imp->imp_lock);
@@ -1089,7 +1094,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         if (rc != 0) {
                 spin_unlock(&imp->imp_lock);
                 req->rq_status = rc;
-                req->rq_phase = RQ_PHASE_INTERPRET;
+                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                 RETURN(rc);
         }
 
@@ -1147,6 +1152,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                     ptlrpc_send_new_req(req)) {
                         force_timer_recalc = 1;
                 }
+
                 /* delayed send - skip */
                 if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
                         continue;
@@ -1154,31 +1160,64 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                 if (!(req->rq_phase == RQ_PHASE_RPC ||
                       req->rq_phase == RQ_PHASE_BULK ||
                       req->rq_phase == RQ_PHASE_INTERPRET ||
+                      req->rq_phase == RQ_PHASE_UNREGISTERING ||
                       req->rq_phase == RQ_PHASE_COMPLETE)) {
                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
                         LBUG();
                 }
 
+                if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+                        LASSERT(req->rq_next_phase != req->rq_phase);
+                        LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+
+                        /*
+                         * Skip processing until reply is unlinked. We
+                         * can't return to pool before that and we can't
+                         * call interpret before that. We need to make
+                         * sure that all rdma transfers finished and will
+                         * not corrupt any data.
+                         */
+                        if (ptlrpc_client_recv_or_unlink(req))
+                                continue;
+
+                        /*
+                         * Turn fail_loc off to prevent it from looping
+                         * forever.
+                         */
+                        OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK |
+                                       OBD_FAIL_ONCE);
+
+                        /*
+                         * Move to next phase if reply was successfully
+                         * unlinked.
+                         */
+                        ptlrpc_rqphase_move(req, req->rq_next_phase);
+                }
+
                 if (req->rq_phase == RQ_PHASE_COMPLETE)
                         continue;
 
                 if (req->rq_phase == RQ_PHASE_INTERPRET)
                         GOTO(interpret, req->rq_status);
 
-                if (req->rq_net_err && !req->rq_timedout)
-                        ptlrpc_expire_one_request(req);
+                /*
+                 * Note that this also will start async reply unlink.
+                 */
+                if (req->rq_net_err && !req->rq_timedout) {
+                        ptlrpc_expire_one_request(req, 1);
+
+                        /*
+                         * Check if we still need to wait for unlink.
+                         */
+                        if (ptlrpc_client_recv_or_unlink(req))
+                                continue;
+                }
 
                 if (req->rq_err) {
-                        ptlrpc_unregister_reply(req);
                         req->rq_replied = 0;
                         if (req->rq_status == 0)
                                 req->rq_status = -EIO;
-                        req->rq_phase = RQ_PHASE_INTERPRET;
-
-                        spin_lock(&imp->imp_lock);
-                        list_del_init(&req->rq_list);
-                        spin_unlock(&imp->imp_lock);
-
+                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
 
@@ -1189,15 +1228,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                  * interrupted rpcs after they have timed out */
                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting ||
                                      req->rq_wait_ctx)) {
-                        /* NB could be on delayed list */
-                        ptlrpc_unregister_reply(req);
                         req->rq_status = -EINTR;
-                        req->rq_phase = RQ_PHASE_INTERPRET;
-
-                        spin_lock(&imp->imp_lock);
-                        list_del_init(&req->rq_list);
-                        spin_unlock(&imp->imp_lock);
-
+                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
 
@@ -1206,7 +1238,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                             req->rq_waiting || req->rq_wait_ctx) {
                                 int status;
 
-                                ptlrpc_unregister_reply(req);
+                                if (!ptlrpc_unregister_reply(req, 1))
+                                        continue;
 
                                 spin_lock(&imp->imp_lock);
 
@@ -1215,19 +1248,22 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                         continue;
                                 }
 
-                                list_del_init(&req->rq_list);
                                 if (status != 0)  {
                                         req->rq_status = status;
-                                        req->rq_phase = RQ_PHASE_INTERPRET;
+                                        ptlrpc_rqphase_move(req,
+                                                RQ_PHASE_INTERPRET);
                                         spin_unlock(&imp->imp_lock);
                                         GOTO(interpret, req->rq_status);
                                 }
                                 if (req->rq_no_resend && !req->rq_wait_ctx) {
                                         req->rq_status = -ENOTCONN;
-                                        req->rq_phase = RQ_PHASE_INTERPRET;
+                                        ptlrpc_rqphase_move(req,
+                                                RQ_PHASE_INTERPRET);
                                         spin_unlock(&imp->imp_lock);
                                         GOTO(interpret, req->rq_status);
                                 }
+
+                                list_del_init(&req->rq_list);
                                 list_add_tail(&req->rq_list,
                                               &imp->imp_sending_list);
 
@@ -1244,7 +1280,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                 if (req->rq_bulk) {
                                         __u64 old_xid = req->rq_xid;
 
-                                        ptlrpc_unregister_bulk (req);
+                                        ptlrpc_unregister_bulk(req);
 
                                         /* ensure previous bulk fails */
                                         req->rq_xid = ptlrpc_next_xid();
@@ -1285,36 +1321,33 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 
                         spin_lock(&req->rq_lock);
 
-                        if (req->rq_early) {
+                        if (ptlrpc_client_early(req)) {
                                 ptlrpc_at_recv_early_reply(req);
                                 spin_unlock(&req->rq_lock);
                                 continue;
                         }
 
                         /* Still waiting for a reply? */
-                        if (req->rq_receiving_reply) {
+                        if (ptlrpc_client_recv(req)) {
                                 spin_unlock(&req->rq_lock);
                                 continue;
                         }
 
                         /* Did we actually receive a reply? */
-                        if (!req->rq_replied) {
+                        if (!ptlrpc_client_replied(req)) {
                                 spin_unlock(&req->rq_lock);
                                 continue;
                         }
 
                         spin_unlock(&req->rq_lock);
 
-                        spin_lock(&imp->imp_lock);
-                        list_del_init(&req->rq_list);
-                        spin_unlock(&imp->imp_lock);
-
                         req->rq_status = after_reply(req);
                         if (req->rq_resend) {
                                 /* Add this req to the delayed list so
                                    it can be errored if the import is
                                    evicted after recovery. */
                                 spin_lock(&imp->imp_lock);
+                                list_del_init(&req->rq_list);
                                 list_add_tail(&req->rq_list,
                                               &imp->imp_delayed_list);
                                 spin_unlock(&imp->imp_lock);
@@ -1323,15 +1356,15 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 
                         /* If there is no bulk associated with this request,
                          * then we're done and should let the interpreter
-                         * process the reply. Similarly if the RPC returned
+                         * process the reply.  Similarly if the RPC returned
                          * an error, and therefore the bulk will never arrive.
                          */
                         if (req->rq_bulk == NULL || req->rq_status != 0) {
-                                req->rq_phase = RQ_PHASE_INTERPRET;
+                                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                                 GOTO(interpret, req->rq_status);
                         }
 
-                        req->rq_phase = RQ_PHASE_BULK;
+                        ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
                 }
 
                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
@@ -1347,15 +1380,22 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                         LBUG();
                 }
 
-                req->rq_phase = RQ_PHASE_INTERPRET;
+                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
         interpret:
                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
-                LASSERT(!req->rq_receiving_reply);
 
-                ptlrpc_unregister_reply(req);
+                /* This may move the request to the "unregistering" phase;
+                 * if so, we need to wait for the reply unlink. */
+                if (!ptlrpc_unregister_reply(req, 1))
+                        continue;
+
                 if (req->rq_bulk != NULL)
-                        ptlrpc_unregister_bulk (req);
+                        ptlrpc_unregister_bulk(req);
+
+                /* Receiving must already be finished by the time interpret
+                 * is called. */
+                LASSERT(!req->rq_receiving_reply);
 
                 if (req->rq_interpret_reply != NULL) {
                         ptlrpc_interpterer_t interpreter =
@@ -1364,7 +1404,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                 &req->rq_async_args, req->rq_status);
                 }
 
-                req->rq_phase = RQ_PHASE_COMPLETE;
+                ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
 
                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
                        "opc %s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
@@ -1373,7 +1413,12 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
                        lustre_msg_get_opc(req->rq_reqmsg));
 
+                spin_lock(&imp->imp_lock);
+                if (!list_empty(&req->rq_list))
+                        list_del_init(&req->rq_list);
                 atomic_dec(&imp->imp_inflight);
+                spin_unlock(&imp->imp_lock);
+
                 set->set_remaining--;
                 cfs_waitq_signal(&imp->imp_recovery_waitq);
         }
@@ -1383,7 +1428,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 }
 
 /* Return 1 if we should give up, else 0 */
-int ptlrpc_expire_one_request(struct ptlrpc_request *req)
+int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
 {
         struct obd_import *imp = req->rq_import;
         int rc = 0;
@@ -1412,7 +1457,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req)
         req->rq_timedout = 1;
         spin_unlock(&req->rq_lock);
 
-        ptlrpc_unregister_reply (req);
+        ptlrpc_unregister_reply(req, async_unlink);
 
         if (obd_dump_on_timeout)
                 libcfs_debug_dumplog();
@@ -1465,28 +1510,33 @@ int ptlrpc_expired_set(void *data)
 
         LASSERT(set != NULL);
 
-        /* A timeout expired; see which reqs it applies to... */
+        /*
+         * A timeout expired. See which reqs it applies to...
+         */
         list_for_each (tmp, &set->set_requests) {
                 struct ptlrpc_request *req =
                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
-                /* request in-flight? */
-                if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting &&
-                       !req->rq_resend) ||
+                /* Request in-flight? */
+                if (!(((req->rq_phase == RQ_PHASE_RPC ||
+                        req->rq_phase == RQ_PHASE_UNREGISTERING) &&
+                       !req->rq_waiting && !req->rq_resend) ||
                       (req->rq_phase == RQ_PHASE_BULK)))
                         continue;
 
-                if (req->rq_timedout ||           /* already dealt with */
-                    req->rq_deadline > now)       /* not expired */
+                if (req->rq_timedout ||     /* already dealt with */
+                    req->rq_deadline > now) /* not expired */
                         continue;
 
-                /* deal with this guy */
-                ptlrpc_expire_one_request (req);
+                /* Deal with this guy. Do it asynchronously so that we do
+                 * not block the ptlrpcd thread. */
+                ptlrpc_expire_one_request(req, 1);
         }
 
-        /* When waiting for a whole set, we always to break out of the
+        /*
+         * When waiting for a whole set, we always want to break out of the
          * sleep so we can recalculate the timeout, or enable interrupts
-         * iff everyone's timed out.
+         * if everyone's timed out.
          */
         RETURN(1);
 }
@@ -1510,14 +1560,17 @@ void ptlrpc_interrupted_set(void *data)
                 struct ptlrpc_request *req =
                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
-                if (req->rq_phase != RQ_PHASE_RPC)
+                if (req->rq_phase != RQ_PHASE_RPC &&
+                    req->rq_phase != RQ_PHASE_UNREGISTERING)
                         continue;
 
                 ptlrpc_mark_interrupted(req);
         }
 }
 
-/* get the smallest timeout in the set; this does NOT set a timeout. */
+/**
+ * Get the smallest timeout in the set; this does NOT set a timeout.
+ */
 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
 {
         struct list_head *tmp;
@@ -1532,16 +1585,35 @@ int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
         list_for_each(tmp, &set->set_requests) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
-                /* request in-flight? */
-                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
+                /*
+                 * Request in-flight?
+                 */
+                if (!(((req->rq_phase == RQ_PHASE_RPC ||
+                        req->rq_phase == RQ_PHASE_UNREGISTERING) &&
+                       !req->rq_waiting) ||
                       (req->rq_phase == RQ_PHASE_BULK) ||
                       (req->rq_phase == RQ_PHASE_NEW)))
                         continue;
 
-                if (req->rq_timedout)   /* already timed out */
+                /*
+                 * Check those waiting for long reply unlink every one
+                 * second.
+                 */
+                if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+                        timeout = 1;
+                        break;
+                }
+
+                /*
+                 * Already timed out.
+                 */
+                if (req->rq_timedout)
                         continue;
 
-                if (req->rq_wait_ctx)   /* waiting for ctx */
+                /*
+                 * Waiting for ctx.
+                 */
+                if (req->rq_wait_ctx)
                         continue;
 
                 if (req->rq_phase == RQ_PHASE_NEW)
@@ -1737,23 +1809,54 @@ EXPORT_SYMBOL(ptlrpc_req_xid);
  * IDEMPOTENT, but _not_ safe against concurrent callers.
  * The request owner (i.e. the thread doing the I/O) must call...
  */
-void ptlrpc_unregister_reply (struct ptlrpc_request *request)
+int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
 {
         int                rc;
         cfs_waitq_t       *wq;
         struct l_wait_info lwi;
 
-        LASSERT(!in_interrupt ());             /* might sleep */
+        /*
+         * Might sleep.
+         */
+        LASSERT(!in_interrupt());
 
+        /*
+         * Let's setup deadline for reply unlink.
+         */
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            async && request->rq_reply_deadline == 0)
+                request->rq_reply_deadline = cfs_time_current_sec() +
+                                             LONG_UNLINK;
+
+        /*
+         * Nothing left to do.
+         */
         if (!ptlrpc_client_recv_or_unlink(request))
-                /* Nothing left to do */
-                return;
+                RETURN(1);
 
-        LNetMDUnlink (request->rq_reply_md_h);
+        LNetMDUnlink(request->rq_reply_md_h);
 
-        /* We have to l_wait_event() whatever the result, to give liblustre
-         * a chance to run reply_in_callback(), and to make sure we've
-         * unlinked before returning a req to the pool */
+        /*
+         * Let's check it once again.
+         */
+        if (!ptlrpc_client_recv_or_unlink(request))
+                RETURN(1);
+
+        /*
+         * Move to "Unregistering" phase as reply was not unlinked yet.
+         */
+        ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
+
+        /*
+         * Do not wait for unlink to finish.
+         */
+        if (async)
+                RETURN(0);
+
+        /*
+         * We have to l_wait_event() whatever the result, to give liblustre
+         * a chance to run reply_in_callback(), and to make sure we've
+         * unlinked before returning a req to the pool.
+         */
         if (request->rq_set != NULL)
                 wq = &request->rq_set->set_waitq;
         else
@@ -1763,16 +1866,19 @@ void ptlrpc_unregister_reply (struct ptlrpc_request *request)
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
                 lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
-                rc = l_wait_event (*wq, !ptlrpc_client_recv_or_unlink(request),
-                                   &lwi);
-                if (rc == 0)
-                        return;
-
-                LASSERT (rc == -ETIMEDOUT);
+                rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
+                                  &lwi);
+                if (rc == 0) {
+                        ptlrpc_rqphase_move(request, request->rq_next_phase);
+                        RETURN(1);
+                }
+
+                LASSERT(rc == -ETIMEDOUT);
                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout "
                           "rvcng=%d unlnk=%d", request->rq_receiving_reply,
                           request->rq_must_unlink);
         }
+        RETURN(0);
 }
 
 /* caller must hold imp->imp_lock */
@@ -1866,7 +1972,7 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
                        old_xid, req->rq_xid);
         }
-        ptlrpc_wake_client_req(req);
+        ptlrpc_client_wake_req(req);
         spin_unlock(&req->rq_lock);
 }
 
@@ -1879,7 +1985,7 @@ void ptlrpc_restart_req(struct ptlrpc_request *req)
         spin_lock(&req->rq_lock);
         req->rq_restart = 1;
         req->rq_timedout = 0;
-        ptlrpc_wake_client_req(req);
+        ptlrpc_client_wake_req(req);
         spin_unlock(&req->rq_lock);
 }
 
@@ -1888,15 +1994,19 @@ static int expired_request(void *data)
         struct ptlrpc_request *req = data;
         ENTRY;
 
-        /* some failure can suspend regular timeouts */
+        /*
+         * Some failure can suspend regular timeouts.
+         */
         if (ptlrpc_check_suspend())
                 RETURN(1);
 
-        /* deadline may have changed with an early reply */
+        /*
+         * Deadline may have changed with an early reply.
+         */
         if (req->rq_deadline > cfs_time_current_sec())
                 RETURN(1);
 
-        RETURN(ptlrpc_expire_one_request(req));
+        RETURN(ptlrpc_expire_one_request(req, 0));
 }
 
 static void interrupted_request(void *data)
@@ -1986,7 +2096,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                lustre_msg_get_opc(req->rq_reqmsg));
 
         /* Mark phase here for a little debug help */
-        req->rq_phase = RQ_PHASE_RPC;
+        ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
 
         spin_lock(&imp->imp_lock);
         req->rq_import_generation = imp->imp_generation;
@@ -2111,8 +2221,7 @@ after_send:
         /* If the reply was received normally, this just grabs the spinlock
          * (ensuring the reply callback has returned), sees that
          * req->rq_receiving_reply is clear and returns. */
-        ptlrpc_unregister_reply (req);
-
+        ptlrpc_unregister_reply(req, 0);
 
         if (req->rq_err) {
                 DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
@@ -2141,7 +2250,7 @@ after_send:
                 GOTO(out, rc = -ETIMEDOUT);
         }
 
-        if (!req->rq_replied) {
+        if (!ptlrpc_client_replied(req)) {
                 /* How can this be?  -eeb */
                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
                 LBUG();
@@ -2182,7 +2291,7 @@ after_send:
         }
 
         LASSERT(!req->rq_receiving_reply);
-        req->rq_phase = RQ_PHASE_INTERPRET;
+        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
         atomic_dec(&imp->imp_inflight);
         cfs_waitq_signal(&imp->imp_recovery_waitq);
@@ -2204,7 +2313,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
         ENTRY;
         atomic_dec(&imp->imp_replay_inflight);
 
-        if (!req->rq_replied) {
+        if (!ptlrpc_client_replied(req)) {
                 CERROR("request replay timed out, restarting recovery\n");
                 GOTO(out, rc = -ETIMEDOUT);
         }
@@ -2224,7 +2333,7 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
         if (req->rq_replay_cb)
                 req->rq_replay_cb(req);
 
-        if (req->rq_replied &&
+        if (ptlrpc_client_replied(req) &&
             lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
                           lustre_msg_get_status(req->rq_repmsg),
@@ -2274,6 +2383,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         aa->praa_old_state = req->rq_send_state;
         req->rq_send_state = LUSTRE_IMP_REPLAY;
         req->rq_phase = RQ_PHASE_NEW;
+        req->rq_next_phase = RQ_PHASE_UNDEFINED;
         if (req->rq_repmsg)
                 aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
         req->rq_status = 0;
@@ -2315,7 +2425,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
                         req->rq_status = -EINTR;
-                        ptlrpc_wake_client_req(req);
+                        ptlrpc_client_wake_req(req);
                 }
                 spin_unlock (&req->rq_lock);
         }
@@ -2330,7 +2440,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
                         req->rq_status = -EINTR;
-                        ptlrpc_wake_client_req(req);
+                        ptlrpc_client_wake_req(req);
                 }
                 spin_unlock (&req->rq_lock);
         }
@@ -2363,7 +2473,7 @@ void ptlrpc_abort_set(struct ptlrpc_request_set *set)
 
                 req->rq_err = 1;
                 req->rq_status = -EINTR;
-                ptlrpc_wake_client_req(req);
+                ptlrpc_client_wake_req(req);
                 spin_unlock (&req->rq_lock);
         }
 }
diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c
index 5133fb6e3d6fd1ab0e85324d3553651255e1b139..a1cf611befce6f9ccca41560362e18d41bdacf54 100644
--- a/lustre/ptlrpc/events.c
+++ b/lustre/ptlrpc/events.c
@@ -77,7 +77,7 @@ void request_out_callback(lnet_event_t *ev)
                 req->rq_net_err = 1;
                 spin_unlock(&req->rq_lock);
 
-                ptlrpc_wake_client_req(req);
+                ptlrpc_client_wake_req(req);
         }
 
         ptlrpc_req_finished(req);
@@ -161,7 +161,7 @@ void reply_in_callback(lnet_event_t *ev)
 out_wake:
         /* NB don't unlock till after wakeup; req can disappear under us
          * since we don't have our own ref */
-        ptlrpc_wake_client_req(req);
+        ptlrpc_client_wake_req(req);
         spin_unlock(&req->rq_lock);
         EXIT;
 }
@@ -201,7 +201,7 @@ void client_bulk_callback (lnet_event_t *ev)
 
         /* NB don't unlock till after wakeup; desc can disappear under us
          * otherwise */
-        ptlrpc_wake_client_req(desc->bd_req);
+        ptlrpc_client_wake_req(desc->bd_req);
         spin_unlock(&desc->bd_lock);
 
         EXIT;
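ptlrpc_check_set() above now leans on a simple contract: ptlrpc_unregister_reply(req, 1) returns 1 when the reply buffer is already unlinked, and 0 after parking the request in RQ_PHASE_UNREGISTERING so the set is polled again on a later pass. Below is a hedged userspace sketch of that control flow, not the real implementation; recv_or_unlink() is a stub counter standing in for the LNet-backed test.

/* Sketch of the async-unregister contract used by ptlrpc_check_set(). */
#include <stdio.h>

enum phase { RPC, UNREGISTERING, INTERPRET, COMPLETE };

struct req { enum phase phase, next_phase; int net_busy; };

/* Stub: pretend the network stays busy for a few polls, then finishes. */
static int recv_or_unlink(struct req *r) { return r->net_busy-- > 0; }

/* Returns 1 if fully unlinked, 0 if the caller must keep polling. */
static int unregister_reply(struct req *r, int async)
{
        if (!recv_or_unlink(r))
                return 1;
        r->next_phase = r->phase;       /* remember where to resume */
        r->phase = UNREGISTERING;
        return async ? 0 : 1;           /* sync path would block here */
}

int main(void)
{
        struct req r = { RPC, RPC, 2 };
        int pass = 0;

        while (r.phase != COMPLETE) {
                pass++;
                if (r.phase == UNREGISTERING) {
                        if (recv_or_unlink(&r))
                                continue;       /* still busy: skip request */
                        r.phase = r.next_phase; /* unlinked: resume */
                }
                r.phase = INTERPRET;
                if (!unregister_reply(&r, 1))
                        continue;               /* parked; poll next pass */
                r.phase = COMPLETE;
        }
        printf("completed after %d passes\n", pass);
        return 0;
}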
diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c
index b575270f6a467b1c17da41c2946f06f089fdea2f..18fb0cfb13c5878243bd25bc9371618f3c2d5331 100644
--- a/lustre/ptlrpc/import.c
+++ b/lustre/ptlrpc/import.c
@@ -207,6 +207,47 @@ void ptlrpc_deactivate_import(struct obd_import *imp)
         ptlrpc_deactivate_and_unlock_import(imp);
 }
 
+static unsigned int
+ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
+{
+        long dl;
+
+        if (!(((req->rq_phase == RQ_PHASE_RPC ||
+                req->rq_phase == RQ_PHASE_UNREGISTERING) &&
+               !req->rq_waiting) ||
+              (req->rq_phase == RQ_PHASE_BULK) ||
+              (req->rq_phase == RQ_PHASE_NEW)))
+                return 0;
+
+        if (req->rq_timedout)
+                return 0;
+
+        if (req->rq_phase == RQ_PHASE_NEW)
+                dl = req->rq_sent;
+        else
+                dl = req->rq_deadline;
+
+        if (dl <= now)
+                return 0;
+
+        return dl - now;
+}
+
+static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
+{
+        time_t now = cfs_time_current_sec();
+        struct list_head *tmp, *n;
+        struct ptlrpc_request *req;
+        unsigned int timeout = 0;
+
+        spin_lock(&imp->imp_lock);
+        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
+        }
+        spin_unlock(&imp->imp_lock);
+        return timeout;
+}
+
 /*
  * This function will invalidate the import, if necessary, then block
  * for all the RPC completions, and finally notify the obd to
@@ -218,6 +259,7 @@ void ptlrpc_invalidate_import(struct obd_import *imp)
         struct list_head *tmp, *n;
         struct ptlrpc_request *req;
         struct l_wait_info lwi;
+        unsigned int timeout;
         int rc;
 
         atomic_inc(&imp->imp_inval_count);
@@ -234,32 +276,78 @@ void ptlrpc_invalidate_import(struct obd_import *imp)
 
         LASSERT(imp->imp_invalid);
 
-        /* wait for all requests to error out and call completion callbacks.
-           Cap it at obd_timeout -- these should all have been locally
-           cancelled by ptlrpc_abort_inflight. */
-        lwi = LWI_TIMEOUT_INTERVAL(
-                cfs_timeout_cap(cfs_time_seconds(obd_timeout)),
-                cfs_time_seconds(1), NULL, NULL);
-        rc = l_wait_event(imp->imp_recovery_waitq,
-                          (atomic_read(&imp->imp_inflight) == 0), &lwi);
+        /* Wait forever until inflight == 0. We really can't do it another
+         * way because in some cases we need to wait for a very long reply
+         * unlink. We can't do anything before that because there is really
+         * no guarantee that some rdma transfer is not in progress right now. */
+        do {
+                /* Calculate max timeout for waiting on rpcs to error
+                 * out. Use obd_timeout if calculated value is smaller
+                 * than it. */
+                timeout = ptlrpc_inflight_timeout(imp);
+                timeout += timeout / 3;
+
+                if (timeout == 0)
+                        timeout = obd_timeout;
+
+                CDEBUG(D_RPCTRACE, "Sleeping %d sec for inflight to error out\n",
+                       timeout);
+
+                /* Wait for all requests to error out and call completion
+                 * callbacks. Cap it at the computed timeout -- these should
+                 * all have been locally cancelled by ptlrpc_abort_inflight. */
+                lwi = LWI_TIMEOUT_INTERVAL(
+                        cfs_timeout_cap(cfs_time_seconds(timeout)),
+                        cfs_time_seconds(1), NULL, NULL);
+                rc = l_wait_event(imp->imp_recovery_waitq,
+                                  (atomic_read(&imp->imp_inflight) == 0), &lwi);
+                if (rc) {
+                        const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
 
-        if (rc) {
-                CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
-                       obd2cli_tgt(imp->imp_obd), rc,
-                       atomic_read(&imp->imp_inflight));
-                spin_lock(&imp->imp_lock);
-                list_for_each_safe(tmp, n, &imp->imp_sending_list) {
-                        req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                        DEBUG_REQ(D_ERROR, req, "still on sending list");
-                }
-                list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
-                        req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                        DEBUG_REQ(D_ERROR, req, "still on delayed list");
-                }
-                spin_unlock(&imp->imp_lock);
-                LASSERT(atomic_read(&imp->imp_inflight) == 0);
-        }
+                        CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
+                               cli_tgt, rc, atomic_read(&imp->imp_inflight));
+
+                        spin_lock(&imp->imp_lock);
+                        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
+                                req = list_entry(tmp, struct ptlrpc_request,
+                                                 rq_list);
+                                DEBUG_REQ(D_ERROR, req, "still on sending list");
+                        }
+                        list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
+                                req = list_entry(tmp, struct ptlrpc_request,
+                                                 rq_list);
+                                DEBUG_REQ(D_ERROR, req, "still on delayed list");
+                        }
+
+                        if (atomic_read(&imp->imp_unregistering) == 0) {
+                                /* We know that only "unregistering" rpcs may
+                                 * still survive in sending or delayed lists
+                                 * (they are waiting for long reply unlink in
+                                 * sluggish nets). Let's check this. If there
+                                 * is no unregistering and inflight != 0 this
+                                 * is a bug. */
+                                LASSERT(atomic_read(&imp->imp_inflight) == 0);
+
+                                /* Let's save one loop as soon as inflight has
+                                 * dropped to zero. No new inflights are
+                                 * possible at this point. */
+                                rc = 0;
+                        } else {
+                                CERROR("%s: RPCs in \"%s\" phase found (%d). "
+                                       "Network is sluggish? Waiting for them "
+                                       "to error out.\n", cli_tgt,
+                                       ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
+                                       atomic_read(&imp->imp_unregistering));
+                        }
+                        spin_unlock(&imp->imp_lock);
+                }
+        } while (rc != 0);
+
+        /*
+         * Let's additionally check that no new RPCs were added to the import
+         * while it was in the "invalidate" state.
+         */
+        LASSERT(atomic_read(&imp->imp_inflight) == 0);
+
 out:
         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
         sptlrpc_import_flush_all_ctx(imp);
@@ -574,6 +662,7 @@ int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
 #endif
         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_NEXT_VER);
 
+        request->rq_no_resend = request->rq_no_delay = 1;
         request->rq_send_state = LUSTRE_IMP_CONNECTING;
         /* Allow a slightly larger reply for future growth compatibility */
         req_capsule_set_size(&request->rq_pill, &RMF_CONNECT_DATA, RCL_SERVER,
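The invalidate path above stops waiting a flat obd_timeout and instead derives each wait from the longest remaining per-request deadline, padded by a third, falling back to obd_timeout only when nothing useful is pending. The following is a small standalone model of that timeout choice, under the assumption that a plain array can stand in for walking imp_sending_list; it is illustrative only.

/* Model of the ptlrpc_inflight_timeout() calculation; deadlines[] is a
 * fabricated stand-in for the requests on imp_sending_list. */
#include <stdio.h>
#include <time.h>

#define OBD_TIMEOUT 100 /* stand-in for the global obd_timeout */

static unsigned int inflight_timeout(const time_t *deadlines, int n, time_t now)
{
        unsigned int timeout = 0;
        int i;

        for (i = 0; i < n; i++) {
                time_t dl = deadlines[i];
                /* expired or unset deadlines contribute nothing */
                if (dl > now && (unsigned int)(dl - now) > timeout)
                        timeout = dl - now; /* keep the longest remainder */
        }
        return timeout;
}

int main(void)
{
        time_t now = time(NULL);
        time_t deadlines[] = { now + 5, now + 42, now - 3 /* expired */ };
        unsigned int timeout = inflight_timeout(deadlines, 3, now);

        timeout += timeout / 3;        /* 1/3 slack, as in the patch */
        if (timeout == 0)
                timeout = OBD_TIMEOUT; /* nothing pending: fall back */
        printf("would sleep up to %u sec for inflight RPCs\n", timeout);
        return 0;
}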
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index 476f6662b2f7b57655520c7c352d47f55ce125e9..fbc144d5639f0ba0aec86f45a18c9e21d3ee578d 100644
--- a/lustre/ptlrpc/niobuf.c
+++ b/lustre/ptlrpc/niobuf.c
@@ -485,7 +485,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
 
         /* If this is a re-transmit, we're required to have disengaged
          * cleanly from the previous attempt */
-        LASSERT (!request->rq_receiving_reply);
+        LASSERT(!request->rq_receiving_reply);
 
         if (request->rq_import->imp_obd &&
             request->rq_import->imp_obd->obd_fail) {
@@ -633,7 +633,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         rc2 = LNetMEUnlink(reply_me_h);
         LASSERT (rc2 == 0);
         /* UNLINKED callback called synchronously */
-        LASSERT (!request->rq_receiving_reply);
+        LASSERT(!request->rq_receiving_reply);
 
  cleanup_bulk:
         if (request->rq_bulk != NULL)
diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c
index 42790f125b17e64a1e6322ac0f422d400ead8db0..b03990c32e8461ae9f6a440846d53363e8097cb8 100644
--- a/lustre/ptlrpc/pack_generic.c
+++ b/lustre/ptlrpc/pack_generic.c
@@ -2203,7 +2203,7 @@ void _debug_req(struct ptlrpc_request *req, __u32 mask,
                   (char *)req->rq_export->exp_connection->c_remote_uuid.uuid : "<?>",
                   req->rq_request_portal, req->rq_reply_portal,
                   req->rq_reqlen, req->rq_replen,
-                  req->rq_early_count, req->rq_timeout, req->rq_deadline,
+                  req->rq_early_count, !!req->rq_timeout, req->rq_deadline,
                   atomic_read(&req->rq_refcount), DEBUG_REQ_FLAGS(req),
                   req->rq_reqmsg && req_ptlrpc_body_swabbed(req) ?
                   lustre_msg_get_flags(req->rq_reqmsg) : -1,
diff --git a/lustre/ptlrpc/pinger.c b/lustre/ptlrpc/pinger.c
index 0da3caa5ab33be5b5d62cdb73a84e917d7fef123..a1042d1716fac307b3f25d5d6bd931bdd7f74f80 100644
--- a/lustre/ptlrpc/pinger.c
+++ b/lustre/ptlrpc/pinger.c
@@ -615,7 +615,7 @@ static int pinger_check_rpcs(void *arg)
                         req->rq_no_resend = 1;
                         ptlrpc_request_set_replen(req);
                         req->rq_send_state = LUSTRE_IMP_FULL;
-                        req->rq_phase = RQ_PHASE_RPC;
+                        ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
                         req->rq_import_generation = generation;
                         ptlrpc_set_add_req(set, req);
                 } else {
@@ -661,17 +661,17 @@ do_check_set:
                 if (req->rq_phase == RQ_PHASE_COMPLETE)
                         continue;
 
-                req->rq_phase = RQ_PHASE_COMPLETE;
+                ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
                 atomic_dec(&req->rq_import->imp_inflight);
                 set->set_remaining--;
 
                 /* If it was disconnected, don't sweat it. */
                 if (list_empty(&req->rq_import->imp_pinger_chain)) {
-                        ptlrpc_unregister_reply(req);
+                        ptlrpc_unregister_reply(req, 0);
                         continue;
                 }
 
                 CDEBUG(D_RPCTRACE, "pinger initiate expire_one_request\n");
-                ptlrpc_expire_one_request(req);
+                ptlrpc_expire_one_request(req, 0);
         }
 
         mutex_up(&pinger_sem);
diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h
index bdea8bd6724fd7d5d49e253743a86301cdfb204a..03a22320eca3a1bda44b320a228f040274159441 100644
--- a/lustre/ptlrpc/ptlrpc_internal.h
+++ b/lustre/ptlrpc/ptlrpc_internal.h
@@ -81,7 +81,7 @@ void ptlrpc_lprocfs_do_request_stat (struct ptlrpc_request *req,
 
 /* recovd_thread.c */
 
-int ptlrpc_expire_one_request(struct ptlrpc_request *req);
+int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink);
 
 /* pers.c */
 void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc);
diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c
index 934e1e7c648871a4c2758ebb434c82d3d5941035..175fa50b0424f165d4e94713749c720aef560f41 100644
--- a/lustre/ptlrpc/ptlrpcd.c
+++ b/lustre/ptlrpc/ptlrpcd.c
@@ -183,10 +183,11 @@ static int ptlrpcd(void *arg)
          */
         while (1) {
                 struct l_wait_info lwi;
-                cfs_duration_t timeout;
+                int timeout;
 
-                timeout = cfs_time_seconds(ptlrpc_set_next_timeout(pc->pc_set));
-                lwi = LWI_TIMEOUT(timeout, ptlrpc_expired_set, pc->pc_set);
+                timeout = ptlrpc_set_next_timeout(pc->pc_set);
+                lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
+                                  ptlrpc_expired_set, pc->pc_set);
 
                 lu_context_enter(&pc->pc_env.le_ctx);
                 l_wait_event(pc->pc_set->set_waitq,
diff --git a/lustre/ptlrpc/recover.c b/lustre/ptlrpc/recover.c
index 03d8270d65c41dedc17a101bb7272f64cd16a9fb..3f2458d8bc919fef638746de3ad26e9977f0a245 100644
--- a/lustre/ptlrpc/recover.c
+++ b/lustre/ptlrpc/recover.c
@@ -186,7 +186,7 @@ void ptlrpc_wake_delayed(struct obd_import *imp)
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
 
                 DEBUG_REQ(D_HA, req, "waking (set %p):", req->rq_set);
-                ptlrpc_wake_client_req(req);
+                ptlrpc_client_wake_req(req);
         }
         spin_unlock(&imp->imp_lock);
 }
diff --git a/lustre/ptlrpc/sec.c b/lustre/ptlrpc/sec.c
index 977ac89d2d181b983d692e3d060d0b36a40f09ac..56de3eec4750722edefa844a35e0dd0c3bfdfe7f 100644
--- a/lustre/ptlrpc/sec.c
+++ b/lustre/ptlrpc/sec.c
@@ -289,7 +289,7 @@ void sptlrpc_cli_ctx_wakeup(struct ptlrpc_cli_ctx *ctx)
         spin_lock(&ctx->cc_lock);
         list_for_each_entry_safe(req, next, &ctx->cc_req_list, rq_ctx_chain) {
                 list_del_init(&req->rq_ctx_chain);
-                ptlrpc_wake_client_req(req);
+                ptlrpc_client_wake_req(req);
         }
         spin_unlock(&ctx->cc_lock);
 }
@@ -518,7 +518,7 @@ int ctx_refresh_timeout(void *data)
         /* conn_cnt is needed in expire_one_request */
         lustre_msg_set_conn_cnt(req->rq_reqmsg, req->rq_import->imp_conn_cnt);
 
-        rc = ptlrpc_expire_one_request(req);
+        rc = ptlrpc_expire_one_request(req, 1);
         /* if we started recovery, we should mark this ctx dead; otherwise
          * in case of lgssd died nobody would retire this ctx, following
          * connecting will still find the same ctx thus cause deadlock.
@@ -682,7 +682,7 @@ again:
  * - timedout, and we don't want recover from the failure;
 * - timedout, and waked up upon recovery finished;
 * - someone else mark this ctx dead by force;
- * - someone invalidate the req and call wake_client_req(),
+ * - someone invalidate the req and call ptlrpc_client_wake_req(),
  *   e.g. ptlrpc_abort_inflight();
  */
         if (!cli_ctx_is_refreshed(ctx)) {
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index 37d7bf9d12ca5967e4285277e40e7296f0c2e3da..76de4bcde717b771ffdd4cdcf02b4f53f884b622 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -1136,7 +1136,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                 goto put_rpc_export;
         }
 
-        request->rq_phase = RQ_PHASE_INTERPRET;
+        ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
 
         CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc "
                "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
@@ -1152,7 +1152,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         rc = svc->srv_handler(request);
 
-        request->rq_phase = RQ_PHASE_COMPLETE;
+        ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);
 
         CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
                "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
diff --git a/lustre/tests/conf-sanity.sh b/lustre/tests/conf-sanity.sh
index d6ddc298e4099f12aba7ba2d81d9738ffd6c4691..05c8e6ed9d867deb5146246fd9a8d841986d7776 100644
--- a/lustre/tests/conf-sanity.sh
+++ b/lustre/tests/conf-sanity.sh
@@ -41,7 +41,7 @@ remote_mds_nodsh && skip "remote MDS with nodsh" && exit 0
 remote_ost_nodsh && skip "remote OST with nodsh" && exit 0
 
 #
-[ "$SLOW" = "no" ] && EXCEPT_SLOW="0 1 2 3 6 7 15 18 24b 25 30 31 32 33 34a "
+[ "$SLOW" = "no" ] && EXCEPT_SLOW="0 1 2 3 6 7 15 18 24b 25 30 31 32 33 34a 45"
 
 assert_DIR
 
@@ -1417,5 +1417,26 @@ umount_client $MOUNT
 cleanup_nocli
 cleanup_gss
 
+test_45() { #17310
+        setup
+        check_mount || return 2
+        stop_mds
+        df -h $MOUNT &
+        log "sleep 60 sec"
+        sleep 60
+#define OBD_FAIL_PTLRPC_LONG_UNLINK   0x50f
+        do_facet client "lctl set_param fail_loc=0x50f"
+        log "sleep 10 sec"
+        sleep 10
+        manual_umount_client --force || return 3
+        do_facet client "lctl set_param fail_loc=0x0"
+        start_mds
+        mount_client $MOUNT || return 4
+        cleanup
+        return 0
+}
+run_test 45 "long unlink handling in ptlrpcd"
+
+
 equals_msg `basename $0`: test complete
 [ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG || true
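The ptlrpcd change closes the loop: ptlrpc_set_next_timeout() reports a one-second timeout while any request sits in RQ_PHASE_UNREGISTERING, and ptlrpcd clamps a zero timeout to one second, so a long reply unlink is re-polled every second instead of stalling the thread. Below is a sketch of that cadence under fabricated phases and deadlines; it models the selection logic only, not the real wait-queue machinery.

/* Sketch of the ptlrpcd polling cadence: unregistering requests force a
 * 1-second timeout, and a zero timeout is clamped to 1 before sleeping. */
#include <stdio.h>

enum phase { NEW, RPC, BULK, UNREGISTERING, COMPLETE };

static int set_next_timeout(const enum phase *phases, const int *deadlines,
                            int n)
{
        int timeout = 0, i;

        for (i = 0; i < n; i++) {
                if (phases[i] == COMPLETE)
                        continue;
                if (phases[i] == UNREGISTERING)
                        return 1;       /* re-check long unlinks every second */
                if (timeout == 0 || deadlines[i] < timeout)
                        timeout = deadlines[i]; /* smallest remaining deadline */
        }
        return timeout;
}

int main(void)
{
        enum phase phases[] = { RPC, UNREGISTERING, COMPLETE };
        int deadlines[]     = { 30, 300, 0 };
        int timeout = set_next_timeout(phases, deadlines, 3);

        /* ptlrpcd: LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1), ...) */
        printf("sleep %d sec before next ptlrpc_check_set()\n",
               timeout ? timeout : 1);
        return 0;
}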