Skip to content
Snippets Groups Projects
Commit f0ea8ec6 authored by tianzy's avatar tianzy
Browse files

Branch HEAD

let dqacq_in_flight() hold lock for qunit
b=16890
i=johann
i=panda
parent 2ba6fcc3
No related branches found
No related tags found
No related merge requests found
...@@ -417,19 +417,6 @@ out: ...@@ -417,19 +417,6 @@ out:
return ret; return ret;
} }
/* caller must hold qunit_hash_lock */
static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
struct qunit_data *qdata)
{
unsigned int hashent = qunit_hashfn(qctxt, qdata);
struct lustre_qunit *qunit;
ENTRY;
LASSERT_SPIN_LOCKED(&qunit_hash_lock);
qunit = find_qunit(hashent, qctxt, qdata);
RETURN(qunit);
}
static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt, static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
struct qunit_data *qdata, int opc) struct qunit_data *qdata, int opc)
{ {
...@@ -468,6 +455,21 @@ static void qunit_put(struct lustre_qunit *qunit) ...@@ -468,6 +455,21 @@ static void qunit_put(struct lustre_qunit *qunit)
free_qunit(qunit); free_qunit(qunit);
} }
/* caller must hold qunit_hash_lock and release ref of qunit after using it */
static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
struct qunit_data *qdata)
{
unsigned int hashent = qunit_hashfn(qctxt, qdata);
struct lustre_qunit *qunit;
ENTRY;
LASSERT_SPIN_LOCKED(&qunit_hash_lock);
qunit = find_qunit(hashent, qctxt, qdata);
if (qunit)
qunit_get(qunit);
RETURN(qunit);
}
static void static void
insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit) insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit)
{ {
...@@ -638,6 +640,9 @@ out: ...@@ -638,6 +640,9 @@ out:
QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc); QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc);
wake_up_all(&qunit->lq_waitq); wake_up_all(&qunit->lq_waitq);
/* this is for dqacq_in_flight() */
qunit_put(qunit);
/* this is for alloc_qunit() */
qunit_put(qunit); qunit_put(qunit);
if (rc < 0 && rc != -EDQUOT) if (rc < 0 && rc != -EDQUOT)
RETURN(err); RETURN(err);
...@@ -816,19 +821,16 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, ...@@ -816,19 +821,16 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
spin_lock(&qunit_hash_lock); spin_lock(&qunit_hash_lock);
qunit = dqacq_in_flight(qctxt, qdata); qunit = dqacq_in_flight(qctxt, qdata);
if (qunit) { if (qunit) {
if (wait)
qunit_get(qunit);
spin_unlock(&qunit_hash_lock); spin_unlock(&qunit_hash_lock);
qunit_put(empty); qunit_put(empty);
goto wait_completion; goto wait_completion;
} }
qunit = empty; qunit = empty;
qunit_get(qunit);
insert_qunit_nolock(qctxt, qunit); insert_qunit_nolock(qctxt, qunit);
spin_unlock(&qunit_hash_lock); spin_unlock(&qunit_hash_lock);
LASSERT(qunit);
quota_search_lqs(qdata, NULL, qctxt, &lqs); quota_search_lqs(qdata, NULL, qctxt, &lqs);
if (lqs) { if (lqs) {
spin_lock(&lqs->lqs_lock); spin_lock(&lqs->lqs_lock);
...@@ -852,6 +854,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, ...@@ -852,6 +854,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
QDATA_SET_CHANGE_QS(qdata); QDATA_SET_CHANGE_QS(qdata);
rc = qctxt->lqc_handler(obd, qdata, opc); rc = qctxt->lqc_handler(obd, qdata, opc);
rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc); rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc);
/* this is for qunit_get() */
qunit_put(qunit);
do_gettimeofday(&work_end); do_gettimeofday(&work_end);
timediff = cfs_timeval_sub(&work_end, &work_start, NULL); timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
if (opc == QUOTA_DQACQ) if (opc == QUOTA_DQACQ)
...@@ -879,6 +884,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, ...@@ -879,6 +884,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN); QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN);
wake_up_all(&qunit->lq_waitq); wake_up_all(&qunit->lq_waitq);
/* this is for qunit_get() */
qunit_put(qunit);
/* this for alloc_qunit() */
qunit_put(qunit); qunit_put(qunit);
spin_lock(&qctxt->lqc_lock); spin_lock(&qctxt->lqc_lock);
if (wait && !qctxt->lqc_import) { if (wait && !qctxt->lqc_import) {
...@@ -911,6 +919,8 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, ...@@ -911,6 +919,8 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
if (req == NULL) { if (req == NULL) {
CDEBUG(D_ERROR, "Can't alloc request\n"); CDEBUG(D_ERROR, "Can't alloc request\n");
dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc); dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
/* this is for qunit_get() */
qunit_put(qunit);
RETURN(-ENOMEM); RETURN(-ENOMEM);
} }
...@@ -921,12 +931,11 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, ...@@ -921,12 +931,11 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc); CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc);
ptlrpc_req_finished(req); ptlrpc_req_finished(req);
dqacq_completion(obd, qctxt, qdata, -EPROTO, opc); dqacq_completion(obd, qctxt, qdata, -EPROTO, opc);
/* this is for qunit_get() */
qunit_put(qunit);
RETURN(rc); RETURN(rc);
} }
if (wait && qunit)
qunit_get(qunit);
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
aa = ptlrpc_req_async_args(req); aa = ptlrpc_req_async_args(req);
aa->aa_ctxt = qctxt; aa->aa_ctxt = qctxt;
...@@ -956,9 +965,9 @@ wait_completion: ...@@ -956,9 +965,9 @@ wait_completion:
spin_unlock(&qunit->lq_lock); spin_unlock(&qunit->lq_lock);
CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n", CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
qunit, rc); qunit, rc);
qunit_put(qunit);
} }
qunit_put(qunit);
do_gettimeofday(&work_end); do_gettimeofday(&work_end);
timediff = cfs_timeval_sub(&work_end, &work_start, NULL); timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
if (opc == QUOTA_DQACQ) if (opc == QUOTA_DQACQ)
...@@ -1037,11 +1046,6 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id, ...@@ -1037,11 +1046,6 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
spin_lock(&qunit_hash_lock); spin_lock(&qunit_hash_lock);
qunit = dqacq_in_flight(qctxt, &qdata); qunit = dqacq_in_flight(qctxt, &qdata);
if (qunit)
/* grab reference on this qunit to handle races with
* dqacq_completion(). Otherwise, this qunit could be freed just
* after we release the qunit_hash_lock */
qunit_get(qunit);
spin_unlock(&qunit_hash_lock); spin_unlock(&qunit_hash_lock);
if (qunit) { if (qunit) {
...@@ -1058,6 +1062,7 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id, ...@@ -1058,6 +1062,7 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
else else
rc = qunit->lq_rc; rc = qunit->lq_rc;
spin_unlock(&qunit->lq_lock); spin_unlock(&qunit->lq_lock);
/* this is for dqacq_in_flight() */
qunit_put(qunit); qunit_put(qunit);
do_gettimeofday(&work_end); do_gettimeofday(&work_end);
timediff = cfs_timeval_sub(&work_end, &work_start, NULL); timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment