From bce5aa48c1084073a8ccf9b7b8b96935a01a0a4c Mon Sep 17 00:00:00 2001
From: johann <johann>
Date: Mon, 28 Jan 2008 10:46:18 +0000
Subject: [PATCH] Branch b1_6 b=13843 i=adilger i=shadow

A lot of unlink operations with concurrent I/O can lead to a
deadlock causing evictions. To address the problem, the number of
outstanding OST_DESTROY requests is now throttled to
max_rpcs_in_flight per OSC.
---
 lustre/ChangeLog         |  9 +++++++++
 lustre/include/obd.h     |  4 ++++
 lustre/ldlm/ldlm_lib.c   |  2 ++
 lustre/osc/osc_request.c | 42 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 57 insertions(+)

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index 1189c6a9d1..611ac96cf8 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -277,6 +277,15 @@ Description: 35% write performance drop with ldiskfs2 when quotas are on
 Details    : Enable ext3 journalled quota by default to improve performance
 	     when quotas are turned on.
 
+Severity   : normal
+Bugzilla   : 13843
+Description: Client eviction while running blogbench
+Details    : A lot of unlink operations with concurrent I/O can lead to a
+	     deadlock causing evictions. To address the problem, the number of
+	     outstanding OST_DESTROY requests is now throttled to
+	     max_rpcs_in_flight per OSC and LDLM_FL_DISCARD_DATA blocking
+	     callbacks are processed in priority.
+
 --------------------------------------------------------------------------------
 
 2007-12-07         Cluster File Systems, Inc. <info@clusterfs.com>
diff --git a/lustre/include/obd.h b/lustre/include/obd.h
index 660137357b..a846bdb923 100644
--- a/lustre/include/obd.h
+++ b/lustre/include/obd.h
@@ -390,6 +390,10 @@ struct client_obd {
         struct obd_histogram     cl_read_offset_hist;
         struct obd_histogram     cl_write_offset_hist;
 
+        /* number of in flight destroy rpcs is limited to max_rpcs_in_flight */
+        atomic_t                 cl_destroy_in_flight;
+        cfs_waitq_t              cl_destroy_waitq;
+
         struct mdc_rpc_lock     *cl_rpc_lock;
         struct mdc_rpc_lock     *cl_setattr_lock;
         struct mdc_rpc_lock     *cl_close_lock;
diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c
index fc318262c9..4a48b01c56 100644
--- a/lustre/ldlm/ldlm_lib.c
+++ b/lustre/ldlm/ldlm_lib.c
@@ -265,6 +265,8 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         spin_lock_init(&cli->cl_write_page_hist.oh_lock);
         spin_lock_init(&cli->cl_read_offset_hist.oh_lock);
         spin_lock_init(&cli->cl_write_offset_hist.oh_lock);
+        cfs_waitq_init(&cli->cl_destroy_waitq);
+        atomic_set(&cli->cl_destroy_in_flight, 0);
 #ifdef ENABLE_CHECKSUM
         cli->cl_checksum = 1;
 #endif
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 2d428bc3b9..823eaca682 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -570,6 +570,34 @@ static int osc_resource_get_unused(struct obd_export *exp, __u64 objid,
         RETURN(count);
 }
 
+static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
+                                 int rc)
+{
+        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+
+        atomic_dec(&cli->cl_destroy_in_flight); /* free this RPC's slot */
+        cfs_waitq_signal(&cli->cl_destroy_waitq); /* let a waiter re-check */
+        return 0; /* data and rc are not examined */
+}
+
+static int osc_can_send_destroy(struct client_obd *cli)
+{
+        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
+            cli->cl_max_rpcs_in_flight) {
+                /* Slot reserved by the increment above; caller may send */
+                return 1;
+        }
+        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
+            cli->cl_max_rpcs_in_flight) {
+                /*
+                 * The counter was lowered by someone else between the two
+                 * atomic operations; signal so that a waiter re-checks it.
+                 */
+                cfs_waitq_signal(&cli->cl_destroy_waitq);
+        }
+        return 0;
+}
+
 /* Destroy requests can be async always on the client, and we don't even really
  * care about the return code since the client cannot do anything at all about
  * a destroy failure.
@@ -589,6 +617,7 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         struct ost_body *body;
         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
         int count, bufcount = 2;
+        struct client_obd *cli = &exp->exp_obd->u.cli;
         ENTRY;
 
         if (!oa) {
@@ -606,6 +635,7 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                 RETURN(-ENOMEM);
 
         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
+        req->rq_interpret_reply = osc_destroy_interpret;
         ptlrpc_at_set_req_timeout(req);
 
         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
@@ -618,6 +648,18 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         memcpy(&body->oa, oa, sizeof(*oa));
         ptlrpc_req_set_repsize(req, 2, size);
 
+        if (!osc_can_send_destroy(cli)) {
+                struct l_wait_info lwi = { 0 };
+
+                /*
+                 * Wait until the number of on-going destroy RPCs drops
+                 * under max_rpcs_in_flight
+                 */
+                l_wait_event_exclusive(cli->cl_destroy_waitq,
+                                       osc_can_send_destroy(cli), &lwi);
+        }
+
+        /* Do not wait for response */
         ptlrpcd_add_req(req);
         RETURN(0);
 }
-- 
GitLab