From b3b05d5fac68cf2ddee9f7eb5239793e1df226da Mon Sep 17 00:00:00 2001 From: ericm <ericm> Date: Tue, 14 Oct 2008 18:59:51 +0000 Subject: [PATCH] branch: HEAD do not repost buffer before all requests are finished. b=17228 r=wangdi r=nathan --- lustre/ptlrpc/service.c | 100 ++++++++++++++++++++++++---------------- 1 file changed, 59 insertions(+), 41 deletions(-) diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 20fa81f16c..1f08a77c14 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -400,41 +400,34 @@ failed: return NULL; } -static void ptlrpc_server_req_decref(struct ptlrpc_request *req) +/** + * to actually free the request, must be called without holding svc_lock. + * note it's caller's responsibility to unlink req->rq_list. + */ +static void ptlrpc_server_free_request(struct ptlrpc_request *req) { - struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd; + LASSERT(atomic_read(&req->rq_refcount) == 0); + LASSERT(list_empty(&req->rq_timed_list)); - if (!atomic_dec_and_test(&req->rq_refcount)) - return; + /* DEBUG_REQ() assumes the reply state of a request with a valid + * ref will not be destroyed until that reference is dropped. */ + ptlrpc_req_drop_rs(req); sptlrpc_svc_ctx_decref(req); - LASSERT(list_empty(&req->rq_timed_list)); - if (req != &rqbd->rqbd_req) { + if (req != &req->rq_rqbd->rqbd_req) { /* NB request buffers use an embedded * req if the incoming req unlinked the * MD; this isn't one of them! */ OBD_FREE(req, sizeof(*req)); - } else { - struct ptlrpc_service *svc = rqbd->rqbd_service; - /* schedule request buffer for re-use. - * NB I can only do this after I've disposed of their - * reqs; particularly the embedded req */ - spin_lock(&svc->srv_lock); - list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds); - spin_unlock(&svc->srv_lock); } } -static void __ptlrpc_server_free_request(struct ptlrpc_request *req) -{ - list_del(&req->rq_list); - ptlrpc_req_drop_rs(req); - ptlrpc_server_req_decref(req); -} - -static void -ptlrpc_server_free_request(struct ptlrpc_request *req) +/** + * drop a reference count of the request. if it reaches 0, we either + * put it into history list, or free it immediately. + */ +static void ptlrpc_server_drop_request(struct ptlrpc_request *req) { struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd; struct ptlrpc_service *svc = rqbd->rqbd_service; @@ -442,12 +435,8 @@ ptlrpc_server_free_request(struct ptlrpc_request *req) struct list_head *tmp; struct list_head *nxt; - if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */ - DEBUG_REQ(D_INFO, req, "free req"); - spin_lock(&svc->srv_at_lock); - req->rq_sent_final = 1; - list_del_init(&req->rq_timed_list); - spin_unlock(&svc->srv_at_lock); + if (!atomic_dec_and_test(&req->rq_refcount)) + return; spin_lock(&svc->srv_lock); @@ -490,20 +479,49 @@ ptlrpc_server_free_request(struct ptlrpc_request *req) req = list_entry(rqbd->rqbd_reqs.next, struct ptlrpc_request, rq_list); - __ptlrpc_server_free_request(req); + list_del(&req->rq_list); + ptlrpc_server_free_request(req); } spin_lock(&svc->srv_lock); + /* + * now all reqs including the embedded req has been + * disposed, schedule request buffer for re-use. + */ + LASSERT(atomic_read(&rqbd->rqbd_req.rq_refcount) == 0); + list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds); } + + spin_unlock(&svc->srv_lock); } else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) { - /* If we are low on memory, we are not interested in - history */ + /* If we are low on memory, we are not interested in history */ + list_del(&req->rq_list); list_del_init(&req->rq_history_list); - __ptlrpc_server_free_request(req); + spin_unlock(&svc->srv_lock); + + ptlrpc_server_free_request(req); + } else { + spin_unlock(&svc->srv_lock); } +} - spin_unlock(&svc->srv_lock); +/** + * to finish a request: stop sending more early replies, and release + * the request. should be called after we finished handling the request. + */ +static void ptlrpc_server_finish_request(struct ptlrpc_request *req) +{ + struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service; + + if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */ + DEBUG_REQ(D_INFO, req, "free req"); + spin_lock(&svc->srv_at_lock); + req->rq_sent_final = 1; + list_del_init(&req->rq_timed_list); + spin_unlock(&svc->srv_at_lock); + + ptlrpc_server_drop_request(req); } /* This function makes sure dead exports are evicted in a timely manner. @@ -889,8 +907,8 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc) at_get(&svc->srv_at_estimate), delay); } - /* ptlrpc_server_free_request may delete an entry out of the work - list */ + /* ptlrpc_server_finish_request may delete an entry out of + * the work list */ spin_lock(&svc->srv_at_lock); while (!list_empty(&work_list)) { rq = list_entry(work_list.next, struct ptlrpc_request, @@ -904,7 +922,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc) if (ptlrpc_at_send_early_reply(rq, at_extra) == 0) ptlrpc_at_add_timed(rq); - ptlrpc_server_req_decref(rq); + ptlrpc_server_drop_request(rq); spin_lock(&svc->srv_at_lock); } spin_unlock(&svc->srv_at_lock); @@ -1028,7 +1046,7 @@ err_req: svc->srv_n_queued_reqs--; svc->srv_n_active_reqs++; spin_unlock(&svc->srv_lock); - ptlrpc_server_free_request(req); + ptlrpc_server_finish_request(req); RETURN(1); } @@ -1203,7 +1221,7 @@ put_conn: } out_req: - ptlrpc_server_free_request(request); + ptlrpc_server_finish_request(request); RETURN(1); } @@ -1775,7 +1793,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) list_del(&req->rq_list); service->srv_n_queued_reqs--; service->srv_n_active_reqs++; - ptlrpc_server_free_request(req); + ptlrpc_server_finish_request(req); } while (!list_empty(&service->srv_request_queue)) { struct ptlrpc_request *req = @@ -1787,7 +1805,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) service->srv_n_queued_reqs--; service->srv_n_active_reqs++; - ptlrpc_server_free_request(req); + ptlrpc_server_finish_request(req); } LASSERT(service->srv_n_queued_reqs == 0); LASSERT(service->srv_n_active_reqs == 0); -- GitLab