diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 40603b3416b1d5cc170c75e5bf0d0395bc35d5fc..4122112ecd2fd001e4e6a3bf3aa9ca382471b859 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -707,8 +707,6 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set, list_add_tail(&req->rq_set_chain, &set->set_requests); req->rq_set = set; set->set_remaining++; - - atomic_inc(&req->rq_import->imp_inflight); } /** @@ -979,13 +977,12 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) spin_unlock (&req->rq_lock); DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: " - "(%s != %s)", - lustre_msg_get_status(req->rq_reqmsg) , + "(%s != %s)", lustre_msg_get_status(req->rq_reqmsg), ptlrpc_import_state_name(req->rq_send_state), ptlrpc_import_state_name(imp->imp_state)); - LASSERT(list_empty (&req->rq_list)); - + LASSERT(list_empty(&req->rq_list)); list_add_tail(&req->rq_list, &imp->imp_delayed_list); + atomic_inc(&req->rq_import->imp_inflight); spin_unlock(&imp->imp_lock); RETURN(0); } @@ -997,9 +994,9 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) RETURN(rc); } - /* XXX this is the same as ptlrpc_queue_wait */ LASSERT(list_empty(&req->rq_list)); list_add_tail(&req->rq_list, &imp->imp_sending_list); + atomic_inc(&req->rq_import->imp_inflight); spin_unlock(&imp->imp_lock); lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid()); @@ -1272,9 +1269,14 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set) lustre_msg_get_opc(req->rq_reqmsg)); spin_lock(&imp->imp_lock); - if (!list_empty(&req->rq_list)) + /* Request already may be not on sending or delaying list. This + * may happen in the case of marking it errorneous for the case + * ptlrpc_import_delay_req(req, status) find it impossible to + * allow sending this rpc and returns *status != 0. */ + if (!list_empty(&req->rq_list)) { list_del_init(&req->rq_list); - atomic_dec(&imp->imp_inflight); + atomic_dec(&imp->imp_inflight); + } spin_unlock(&imp->imp_lock); set->set_remaining--; @@ -1898,7 +1900,6 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) LASSERT(req->rq_set == NULL); LASSERT(!req->rq_receiving_reply); - atomic_inc(&imp->imp_inflight); /* for distributed debugging */ lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid()); @@ -1918,8 +1919,8 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) restart: if (ptlrpc_import_delay_req(imp, req, &rc)) { list_del(&req->rq_list); - list_add_tail(&req->rq_list, &imp->imp_delayed_list); + atomic_inc(&imp->imp_inflight); spin_unlock(&imp->imp_lock); DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)", @@ -1939,6 +1940,7 @@ restart: spin_lock(&imp->imp_lock); list_del_init(&req->rq_list); + atomic_dec(&imp->imp_inflight); if (req->rq_err) { /* rq_status was set locally */ @@ -1957,7 +1959,6 @@ restart: } if (rc != 0) { - list_del_init(&req->rq_list); spin_unlock(&imp->imp_lock); req->rq_status = rc; // XXX this ok? GOTO(out, rc); @@ -1985,6 +1986,7 @@ restart: /* XXX this is the same as ptlrpc_set_wait */ LASSERT(list_empty(&req->rq_list)); list_add_tail(&req->rq_list, &imp->imp_sending_list); + atomic_inc(&imp->imp_inflight); spin_unlock(&imp->imp_lock); rc = ptl_send_rpc(req, 0); @@ -2017,15 +2019,16 @@ restart: libcfs_nid2str(imp->imp_connection->c_peer.nid), lustre_msg_get_opc(req->rq_reqmsg)); - spin_lock(&imp->imp_lock); - list_del_init(&req->rq_list); - spin_unlock(&imp->imp_lock); - /* If the reply was received normally, this just grabs the spinlock * (ensuring the reply callback has returned), sees that * req->rq_receiving_reply is clear and returns. */ ptlrpc_unregister_reply(req, 0); + spin_lock(&imp->imp_lock); + list_del_init(&req->rq_list); + atomic_dec(&imp->imp_inflight); + spin_unlock(&imp->imp_lock); + if (req->rq_err) { DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d", rc, req->rq_status); @@ -2095,8 +2098,6 @@ restart: LASSERT(!req->rq_receiving_reply); ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET); - - atomic_dec(&imp->imp_inflight); cfs_waitq_signal(&imp->imp_recovery_waitq); RETURN(rc); } diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c index 9d2e13c7787f23c4cbe5dba889b476a855385da8..92d422ed35b3e8f17a6bdbe43294242cfe6ee4fa 100644 --- a/lustre/ptlrpc/import.c +++ b/lustre/ptlrpc/import.c @@ -325,7 +325,6 @@ void ptlrpc_invalidate_import(struct obd_import *imp) * this point. */ rc = 0; } else { - CERROR("%s: RPCs in \"%s\" phase found (%d). " "Network is sluggish? Waiting them " "to error out.\n", cli_tgt, diff --git a/lustre/ptlrpc/pinger.c b/lustre/ptlrpc/pinger.c index 1315d6c84d482e8d2042c226851630e7f05c635d..95feaf0c4dda5a2b5cdac930bfff51dd279d9790 100644 --- a/lustre/ptlrpc/pinger.c +++ b/lustre/ptlrpc/pinger.c @@ -479,6 +479,7 @@ static int pinger_check_rpcs(void *arg) struct ptlrpc_request *req; struct ptlrpc_request_set *set; struct list_head *iter; + struct obd_import *imp; struct pinger_data *pd = &pinger_args; int rc; @@ -586,17 +587,23 @@ do_check_set: if (req->rq_phase == RQ_PHASE_COMPLETE) continue; - ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE); - atomic_dec(&req->rq_import->imp_inflight); - set->set_remaining--; - /* If it was disconnected, don't sweat it. */ - if (list_empty(&req->rq_import->imp_pinger_chain)) { - ptlrpc_unregister_reply(req, 0); - continue; - } + CDEBUG(D_RPCTRACE, "Pinger initiate expire request(%p)\n", + req); - CDEBUG(D_RPCTRACE, "pinger initiate expire_one_request\n"); + /* This will also unregister reply. */ ptlrpc_expire_one_request(req, 0); + + /* We're done with this req, let's finally move it to complete + * phase and take care of inflights. */ + ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE); + imp = req->rq_import; + spin_lock(&imp->imp_lock); + if (!list_empty(&req->rq_list)) { + list_del_init(&req->rq_list); + atomic_dec(&imp->imp_inflight); + } + spin_unlock(&imp->imp_lock); + set->set_remaining--; } mutex_up(&pinger_sem);