From 13cbeef246d31c26a53a56278489b6577ad74bf0 Mon Sep 17 00:00:00 2001
From: zam <zam>
Date: Wed, 3 Dec 2008 11:32:24 +0000
Subject: [PATCH] Branch HEAD b=17461 i=adilger i=alex.zhuravlev

Ptlrpc optimizations to minimize COS overhead:
. reply state objects use their own locking instead of the ptlrpc service lock
. per-cpu dedicated threads for reply handling
. increased resource hash table size
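
Reply scheduling is now batched per service: ptlrpc_commit_replies() takes
a service's srv_lock once per batch instead of once per reply, and hands
the whole batch to one of the per-cpu reply handling threads. In outline
(an illustrative sketch of the flow implemented below, not verbatim code):

    DECLARE_RS_BATCH(batch);

    rs_batch_init(&batch);
    spin_lock(&obd->obd_uncommitted_replies_lock);
    list_for_each_safe(tmp, nxt, &obd->obd_uncommitted_replies) {
            rs = list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list);
            if (rs->rs_transno <= obd->obd_last_committed) {
                    list_del_init(&rs->rs_obd_list);
                    /* takes srv_lock only when the service changes;
                     * dispatches once MAX_SCHEDULED replies accumulate */
                    rs_batch_add(&batch, rs);
            }
    }
    spin_unlock(&obd->obd_uncommitted_replies_lock);
    rs_batch_fini(&batch);  /* dispatch leftovers, drop srv_lock */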
---
 lustre/include/lustre_dlm.h   |   2 +-
 lustre/include/lustre_net.h   |  17 +-
 lustre/ldlm/ldlm_lib.c        |   8 +-
 lustre/mdt/mdt_handler.c      |   2 +
 lustre/mdt/mdt_recovery.c     |   2 +
 lustre/mgs/mgs_handler.c      |   2 +
 lustre/obdecho/echo.c         |   2 +
 lustre/ptlrpc/events.c        |   2 +
 lustre/ptlrpc/pack_generic.c  |   2 +
 lustre/ptlrpc/ptlrpc_module.c |   7 +
 lustre/ptlrpc/service.c       | 484 +++++++++++++++++++++++++++++-----
 11 files changed, 452 insertions(+), 78 deletions(-)
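
Locking note (reviewer summary, not part of the diff): the new per-reply
rs_lock nests inside the service srv_lock, and
ptlrpc_schedule_difficult_reply() now asserts that both are held. Every
converted call site follows the same pattern:

    spin_lock(&svc->srv_lock);
    spin_lock(&rs->rs_lock);
    ptlrpc_schedule_difficult_reply(rs);
    spin_unlock(&rs->rs_lock);
    spin_unlock(&svc->srv_lock);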

diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index e11fcbb1cd..39326dced0 100644
--- a/lustre/include/lustre_dlm.h
+++ b/lustre/include/lustre_dlm.h
@@ -514,7 +514,7 @@ static inline int ns_connect_lru_resize(struct ldlm_namespace *ns)
  *
  */
 
-#define RES_HASH_BITS 10
+#define RES_HASH_BITS 12
 #define RES_HASH_SIZE (1UL << RES_HASH_BITS)
 #define RES_HASH_MASK (RES_HASH_SIZE - 1)
 
diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index 8fa08d2bb9..f3ebcd45b5 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -260,7 +260,9 @@ struct ptlrpc_reply_state {
 #if RS_DEBUG
         struct list_head       rs_debug_list;
 #endif
-        /* updates to following flag serialised by srv_request_lock */
+        /* A spinlock to protect the reply state flags */
+        spinlock_t             rs_lock;
+        /* Reply state flags */
         unsigned long          rs_difficult:1;     /* ACK/commit stuff */
         unsigned long          rs_no_ack:1;    /* no ACK, even for
                                                   difficult requests */
@@ -692,7 +694,7 @@ struct ptlrpc_service {
         int              srv_threads_max;       /* thread upper limit */
         int              srv_threads_started;   /* index of last started thread */
         int              srv_threads_running;   /* # running threads */
-        int              srv_n_difficult_replies; /* # 'difficult' replies */
+        atomic_t         srv_n_difficult_replies; /* # 'difficult' replies */
         int              srv_n_active_reqs;     /* # reqs being served */
         int              srv_n_hpreq;           /* # HPreqs being served */
         cfs_duration_t   srv_rqbd_timeout;      /* timeout before re-posting reqs, in tick */
@@ -732,8 +734,9 @@ struct ptlrpc_service {
 
         atomic_t          srv_outstanding_replies;
         struct list_head  srv_active_replies;   /* all the active replies */
+#ifndef __KERNEL__
         struct list_head  srv_reply_queue;      /* replies waiting for service */
-
+#endif
         cfs_waitq_t       srv_waitq; /* all threads sleep on this. This
                                       * wait-queue is signalled when new
                                       * incoming request arrives and when
@@ -1006,6 +1009,7 @@ struct ptlrpc_service_conf {
 void ptlrpc_save_lock (struct ptlrpc_request *req,
                        struct lustre_handle *lock, int mode, int no_ack);
 void ptlrpc_commit_replies (struct obd_device *obd);
+void ptlrpc_dispatch_difficult_reply (struct ptlrpc_reply_state *rs);
 void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs);
 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
                                             svc_handler_t h, char *name,
@@ -1033,6 +1037,13 @@ void ptlrpc_daemonize(char *name);
 int ptlrpc_service_health_check(struct ptlrpc_service *);
 void ptlrpc_hpreq_reorder(struct ptlrpc_request *req);
 
+#ifdef __KERNEL__
+int ptlrpc_hr_init(void);
+void ptlrpc_hr_fini(void);
+#else
+# define ptlrpc_hr_init() (0)
+# define ptlrpc_hr_fini() do {} while (0)
+#endif
 
 struct ptlrpc_svc_data {
         char *name;
diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c
index 7ac4c939c4..0976241023 100644
--- a/lustre/ldlm/ldlm_lib.c
+++ b/lustre/ldlm/ldlm_lib.c
@@ -2150,7 +2150,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
 
         spin_lock(&svc->srv_lock);
 
-        svc->srv_n_difficult_replies++;
+        atomic_inc(&svc->srv_n_difficult_replies);
 
         if (netrc != 0) {
                 /* error sending: reply is off the net.  Also we need +1
@@ -2163,18 +2163,18 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
                 atomic_inc (&svc->srv_outstanding_replies);
         }
 
+        spin_lock(&rs->rs_lock);
         if (rs->rs_transno <= obd->obd_last_committed ||
             (!rs->rs_on_net && !rs->rs_no_ack) ||
              list_empty(&rs->rs_exp_list) ||     /* completed already */
              list_empty(&rs->rs_obd_list)) {
                 CDEBUG(D_HA, "Schedule reply immediately\n");
-                list_add_tail (&rs->rs_list, &svc->srv_reply_queue);
-                cfs_waitq_signal (&svc->srv_waitq);
+                ptlrpc_dispatch_difficult_reply(rs);
         } else {
                 list_add (&rs->rs_list, &svc->srv_active_replies);
                 rs->rs_scheduled = 0;           /* allow notifier to schedule */
         }
-
+        spin_unlock(&rs->rs_lock);
         spin_unlock(&svc->srv_lock);
         EXIT;
 }
diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c
index 0aaea1bd7e..ea8e45a683 100644
--- a/lustre/mdt/mdt_handler.c
+++ b/lustre/mdt/mdt_handler.c
@@ -4936,7 +4936,9 @@ static int mdt_obd_disconnect(struct obd_export *exp)
 
                 spin_lock(&svc->srv_lock);
                 list_del_init(&rs->rs_exp_list);
+                spin_lock(&rs->rs_lock);
                 ptlrpc_schedule_difficult_reply(rs);
+                spin_unlock(&rs->rs_lock);
                 spin_unlock(&svc->srv_lock);
         }
         spin_unlock(&exp->exp_lock);
diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c
index 81c9dfa48f..b5e263eb51 100644
--- a/lustre/mdt/mdt_recovery.c
+++ b/lustre/mdt/mdt_recovery.c
@@ -1099,7 +1099,9 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req)
                 oldrep->rs_nlocks = 0;
 
                 DEBUG_REQ(D_HA, req, "stole locks for");
+                spin_lock(&oldrep->rs_lock);
                 ptlrpc_schedule_difficult_reply (oldrep);
+                spin_unlock(&oldrep->rs_lock);
 
                 spin_unlock (&svc->srv_lock);
                 break;
diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c
index d9b827214e..581f1e1d45 100644
--- a/lustre/mgs/mgs_handler.c
+++ b/lustre/mgs/mgs_handler.c
@@ -146,7 +146,9 @@ static int mgs_disconnect(struct obd_export *exp)
 
                 spin_lock(&svc->srv_lock);
                 list_del_init(&rs->rs_exp_list);
+                spin_lock(&rs->rs_lock);
                 ptlrpc_schedule_difficult_reply(rs);
+                spin_unlock(&rs->rs_lock);
                 spin_unlock(&svc->srv_lock);
         }
         spin_unlock(&exp->exp_lock);
diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c
index ebc3bba336..d69cf59aa9 100644
--- a/lustre/obdecho/echo.c
+++ b/lustre/obdecho/echo.c
@@ -90,7 +90,9 @@ static int echo_disconnect(struct obd_export *exp)
 
                 spin_lock(&svc->srv_lock);
                 list_del_init(&rs->rs_exp_list);
+                spin_lock(&rs->rs_lock);
                 ptlrpc_schedule_difficult_reply(rs);
+                spin_unlock(&rs->rs_lock);
                 spin_unlock(&svc->srv_lock);
         }
         spin_unlock(&exp->exp_lock);
diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c
index 3747f0788c..5e28f93be5 100644
--- a/lustre/ptlrpc/events.c
+++ b/lustre/ptlrpc/events.c
@@ -342,10 +342,12 @@ void reply_out_callback(lnet_event_t *ev)
                 /* Last network callback.  The net's ref on 'rs' stays put
                  * until ptlrpc_server_handle_reply() is done with it */
                 spin_lock(&svc->srv_lock);
+                spin_lock(&rs->rs_lock);
                 rs->rs_on_net = 0;
                 if (!rs->rs_no_ack ||
                     rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed)
                         ptlrpc_schedule_difficult_reply (rs);
+                spin_unlock(&rs->rs_lock);
                 spin_unlock(&svc->srv_lock);
         }
 
diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c
index 1cd90f0187..1f41109545 100644
--- a/lustre/ptlrpc/pack_generic.c
+++ b/lustre/ptlrpc/pack_generic.c
@@ -320,6 +320,8 @@ int lustre_pack_reply_v2(struct ptlrpc_request *req, int count,
         rs->rs_service = req->rq_rqbd->rqbd_service;
         CFS_INIT_LIST_HEAD(&rs->rs_exp_list);
         CFS_INIT_LIST_HEAD(&rs->rs_obd_list);
+        CFS_INIT_LIST_HEAD(&rs->rs_list);
+        spin_lock_init(&rs->rs_lock);
 
         req->rq_replen = msg_len;
         req->rq_reply_state = rs;
diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c
index 363f39936a..e4f0a0ed1f 100644
--- a/lustre/ptlrpc/ptlrpc_module.c
+++ b/lustre/ptlrpc/ptlrpc_module.c
@@ -71,6 +71,11 @@ __init int ptlrpc_init(void)
         rc = req_layout_init();
         if (rc)
                 RETURN(rc);
+
+        rc = ptlrpc_hr_init();
+        if (rc)
+                RETURN(rc);
+
         cleanup_phase = 1;
 
         rc = ptlrpc_init_portals();
@@ -119,6 +124,7 @@ cleanup:
         case 2:
                 ptlrpc_exit_portals();
         case 1:
+                ptlrpc_hr_fini();
                 req_layout_fini();
         default: ;
         }
@@ -134,6 +140,7 @@ static void __exit ptlrpc_exit(void)
         ldlm_exit();
         ptlrpc_stop_pinger();
         ptlrpc_exit_portals();
+        ptlrpc_hr_fini();
         ptlrpc_connection_fini();
 }
 
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index 2422ac5dca..07e3c80ec8 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -184,13 +184,162 @@ ptlrpc_save_lock (struct ptlrpc_request *req,
         rs->rs_no_ack = !!no_ack;
 }
 
+#ifdef __KERNEL__
+
+#define HRT_RUNNING 0
+#define HRT_STOPPING 1
+
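+/** Per-thread state of a dedicated reply handling thread. */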
+struct ptlrpc_hr_thread {
+        spinlock_t        hrt_lock;
+        unsigned long     hrt_flags;
+        cfs_waitq_t       hrt_wait;
+        struct list_head  hrt_queue;
+        struct completion hrt_completion;
+};
+
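+/** Reply handling service: round-robin dispatch over per-cpu handler threads. */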
+struct ptlrpc_hr_service {
+        int                     hr_index;
+        int                     hr_n_threads;
+        int                     hr_size;
+        struct ptlrpc_hr_thread hr_threads[0];
+};
+
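+/** A batch of reply states accumulated for one service and dispatched together. */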
+struct rs_batch {
+        struct list_head        rsb_replies;
+        struct ptlrpc_service  *rsb_svc;
+        unsigned int            rsb_n_replies;
+};
+
+/**
+ * A pointer to the per-node reply handling service.
+ */
+static struct ptlrpc_hr_service *ptlrpc_hr = NULL;
+
+/**
+ * Maximum number of replies scheduled in one batch.
+ */
+#define MAX_SCHEDULED 256
+
+/**
+ * Initialize a reply batch.
+ *
+ * \param b batch
+ */
+static void rs_batch_init(struct rs_batch *b)
+{
+        memset(b, 0, sizeof *b);
+        CFS_INIT_LIST_HEAD(&b->rsb_replies);
+}
+
+/**
+ * Dispatch all replies accumulated in the batch to one of the
+ * dedicated reply handling threads.
+ *
+ * \param b batch
+ */
+static void rs_batch_dispatch(struct rs_batch *b)
+{
+        if (b->rsb_n_replies != 0) {
+                struct ptlrpc_hr_service *hr = ptlrpc_hr;
+                int idx;
+
+                idx = hr->hr_index++;
+                if (hr->hr_index >= hr->hr_n_threads)
+                        hr->hr_index = 0;
+
+                spin_lock(&hr->hr_threads[idx].hrt_lock);
+                list_splice_init(&b->rsb_replies,
+                                 &hr->hr_threads[idx].hrt_queue);
+                spin_unlock(&hr->hr_threads[idx].hrt_lock);
+                cfs_waitq_signal(&hr->hr_threads[idx].hrt_wait);
+                b->rsb_n_replies = 0;
+        }
+}
+
+/**
+ * Add a reply to a batch.
+ * Add one reply object to a batch; dispatch the accumulated replies
+ * when the batch is full or the reply belongs to a different service.
+ *
+ * \param b batch
+ * \param rs reply
+ */
+static void rs_batch_add(struct rs_batch *b, struct ptlrpc_reply_state *rs)
+{
+        struct ptlrpc_service *svc = rs->rs_service;
+
+        if (svc != b->rsb_svc || b->rsb_n_replies >= MAX_SCHEDULED) {
+                if (b->rsb_svc != NULL) {
+                        rs_batch_dispatch(b);
+                        spin_unlock(&b->rsb_svc->srv_lock);
+                }
+                spin_lock(&svc->srv_lock);
+                b->rsb_svc = svc;
+        }
+        spin_lock(&rs->rs_lock);
+        rs->rs_scheduled_ever = 1;
+        if (rs->rs_scheduled == 0) {
+                list_move(&rs->rs_list, &b->rsb_replies);
+                rs->rs_scheduled = 1;
+                b->rsb_n_replies++;
+        }
+        spin_unlock(&rs->rs_lock);
+}
+
+/**
+ * Reply batch finalization.
+ * Dispatch the remaining replies from the batch
+ * and release the service spinlock if it is still held.
+ *
+ * \param b batch
+ */
+static void rs_batch_fini(struct rs_batch *b)
+{
+        if (b->rsb_svc != NULL) {
+                rs_batch_dispatch(b);
+                spin_unlock(&b->rsb_svc->srv_lock);
+        }
+}
+
+#define DECLARE_RS_BATCH(b)     struct rs_batch b
+
+#else /* __KERNEL__ */
+
+#define rs_batch_init(b)        do {} while (0)
+#define rs_batch_fini(b)        do {} while (0)
+#define rs_batch_add(b, r)      ptlrpc_schedule_difficult_reply(r)
+#define DECLARE_RS_BATCH(b)
+
+#endif /* __KERNEL__ */
+
+void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs)
+{
+#ifdef __KERNEL__
+        struct ptlrpc_hr_service *hr = ptlrpc_hr;
+        int idx;
+        ENTRY;
+
+        LASSERT(list_empty(&rs->rs_list));
+
+        idx = hr->hr_index++;
+        if (hr->hr_index >= hr->hr_n_threads)
+                hr->hr_index = 0;
+        spin_lock(&hr->hr_threads[idx].hrt_lock);
+        list_add_tail(&rs->rs_list, &hr->hr_threads[idx].hrt_queue);
+        spin_unlock(&hr->hr_threads[idx].hrt_lock);
+        cfs_waitq_signal(&hr->hr_threads[idx].hrt_wait);
+        EXIT;
+#else
+        list_add_tail(&rs->rs_list, &rs->rs_service->srv_reply_queue);
+#endif
+}
+
 void
 ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
 {
-        struct ptlrpc_service *svc = rs->rs_service;
         ENTRY;
 
-        LASSERT_SPIN_LOCKED(&svc->srv_lock);
+        LASSERT_SPIN_LOCKED(&rs->rs_service->srv_lock);
+        LASSERT_SPIN_LOCKED(&rs->rs_lock);
         LASSERT (rs->rs_difficult);
         rs->rs_scheduled_ever = 1;              /* flag any notification attempt */
 
@@ -200,9 +349,8 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
         }
 
         rs->rs_scheduled = 1;
-        list_del (&rs->rs_list);
-        list_add (&rs->rs_list, &svc->srv_reply_queue);
-        cfs_waitq_signal (&svc->srv_waitq);
+        list_del_init(&rs->rs_list);
+        ptlrpc_dispatch_difficult_reply(rs);
         EXIT;
 }
 
@@ -211,14 +359,15 @@ ptlrpc_commit_replies (struct obd_device *obd)
 {
         struct list_head   *tmp;
         struct list_head   *nxt;
+        DECLARE_RS_BATCH(batch);
         ENTRY;
 
+        rs_batch_init(&batch);
         /* Find any replies that have been committed and get their service
          * to attend to complete them. */
 
         /* CAVEAT EMPTOR: spinlock ordering!!! */
         spin_lock(&obd->obd_uncommitted_replies_lock);
-
         list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
                 struct ptlrpc_reply_state *rs =
                         list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list);
@@ -226,16 +375,12 @@ ptlrpc_commit_replies (struct obd_device *obd)
                 LASSERT (rs->rs_difficult);
 
                 if (rs->rs_transno <= obd->obd_last_committed) {
-                        struct ptlrpc_service *svc = rs->rs_service;
-
-                        spin_lock (&svc->srv_lock);
-                        list_del_init (&rs->rs_obd_list);
-                        ptlrpc_schedule_difficult_reply (rs);
-                        spin_unlock (&svc->srv_lock);
+                        list_del_init(&rs->rs_obd_list);
+                        rs_batch_add(&batch, rs);
                 }
         }
-
         spin_unlock(&obd->obd_uncommitted_replies_lock);
+        rs_batch_fini(&batch);
         EXIT;
 }
 
@@ -369,9 +514,12 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         CFS_INIT_LIST_HEAD(&service->srv_history_rqbds);
         CFS_INIT_LIST_HEAD(&service->srv_request_history);
         CFS_INIT_LIST_HEAD(&service->srv_active_replies);
+#ifndef __KERNEL__
         CFS_INIT_LIST_HEAD(&service->srv_reply_queue);
+#endif
         CFS_INIT_LIST_HEAD(&service->srv_free_rs_list);
         cfs_waitq_init(&service->srv_free_rs_waitq);
+        atomic_set(&service->srv_n_difficult_replies, 0);
 
         spin_lock_init(&service->srv_at_lock);
         CFS_INIT_LIST_HEAD(&service->srv_req_in_queue);
@@ -1240,7 +1388,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
             (
 #ifndef __KERNEL__
              /* !@%$# liblustre only has 1 thread */
-             svc->srv_n_difficult_replies != 0 &&
+             atomic_read(&svc->srv_n_difficult_replies) != 0 &&
 #endif
              svc->srv_n_active_reqs >= (svc->srv_threads_running - 1)))) {
                 /* Don't handle regular requests in the last thread, in order
                  * to handle difficult replies (which might block other threads)
                  * as well as handle any incoming reqs, early replies, etc.
                  * That means we always need at least 2 service threads. */
@@ -1425,51 +1573,44 @@ out_req:
         RETURN(1);
 }
 
+/**
+ * An internal function to process a single reply state object.
+ */
 static int
-ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
+ptlrpc_handle_rs (struct ptlrpc_reply_state *rs)
 {
-        struct ptlrpc_reply_state *rs;
+        struct ptlrpc_service     *svc = rs->rs_service;
         struct obd_export         *exp;
         struct obd_device         *obd;
         int                        nlocks;
         int                        been_handled;
         ENTRY;
 
-        spin_lock(&svc->srv_lock);
-        if (list_empty (&svc->srv_reply_queue)) {
-                spin_unlock(&svc->srv_lock);
-                RETURN(0);
-        }
-
-        rs = list_entry (svc->srv_reply_queue.next,
-                         struct ptlrpc_reply_state, rs_list);
-
         exp = rs->rs_export;
         obd = exp->exp_obd;
 
         LASSERT (rs->rs_difficult);
         LASSERT (rs->rs_scheduled);
-
-        list_del_init (&rs->rs_list);
-
-        /* Disengage from notifiers carefully (lock order - irqrestore below!)*/
-        spin_unlock(&svc->srv_lock);
-
-        spin_lock (&obd->obd_uncommitted_replies_lock);
-        /* Noop if removed already */
-        list_del_init (&rs->rs_obd_list);
-        spin_unlock (&obd->obd_uncommitted_replies_lock);
+        LASSERT (list_empty(&rs->rs_list));
 
         spin_lock (&exp->exp_lock);
         /* Noop if removed already */
         list_del_init (&rs->rs_exp_list);
         spin_unlock (&exp->exp_lock);
 
-        spin_lock(&svc->srv_lock);
+        spin_lock(&rs->rs_lock);
 
         been_handled = rs->rs_handled;
         rs->rs_handled = 1;
 
+        if (!list_empty(&rs->rs_obd_list)) {
+                spin_unlock(&rs->rs_lock);
+                spin_lock(&obd->obd_uncommitted_replies_lock);
+                spin_lock(&rs->rs_lock);
+                list_del_init(&rs->rs_obd_list);
+                spin_unlock(&obd->obd_uncommitted_replies_lock);
+        }
+
         nlocks = rs->rs_nlocks;                 /* atomic "steal", but */
         rs->rs_nlocks = 0;                      /* locks still on rs_locks! */
 
@@ -1485,7 +1626,7 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
         }
 
         if ((!been_handled && rs->rs_on_net) || nlocks > 0) {
-                spin_unlock(&svc->srv_lock);
+                spin_unlock(&rs->rs_lock);
 
                 if (!been_handled && rs->rs_on_net) {
                         LNetMDUnlink(rs->rs_md_h);
@@ -1497,34 +1638,59 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
                         ldlm_lock_decref(&rs->rs_locks[nlocks],
                                          rs->rs_modes[nlocks]);
 
-                spin_lock(&svc->srv_lock);
+                spin_lock(&rs->rs_lock);
         }
 
         rs->rs_scheduled = 0;
 
         if (!rs->rs_on_net) {
                 /* Off the net */
-                svc->srv_n_difficult_replies--;
-                if (svc->srv_n_difficult_replies == 0 && svc->srv_is_stopping)
-                        /* wake up threads that are being stopped by
-                           ptlrpc_unregister_service/ptlrpc_stop_threads
-                           and sleep waiting svr_n_difficult_replies == 0 */
-                        cfs_waitq_broadcast(&svc->srv_waitq);
-                spin_unlock(&svc->srv_lock);
+                spin_unlock(&rs->rs_lock);
 
                 class_export_put (exp);
                 rs->rs_export = NULL;
                 ptlrpc_rs_decref (rs);
                 atomic_dec (&svc->srv_outstanding_replies);
+                if (atomic_dec_and_test(&svc->srv_n_difficult_replies) &&
+                    svc->srv_is_stopping)
+                        cfs_waitq_broadcast(&svc->srv_waitq);
                 RETURN(1);
         }
 
         /* still on the net; callback will schedule */
-        spin_unlock(&svc->srv_lock);
+        spin_unlock(&rs->rs_lock);
         RETURN(1);
 }
 
 #ifndef __KERNEL__
+
+/**
+ * Check whether the given service has a reply available for processing,
+ * and process it if so.
+ *
+ * \param svc a ptlrpc service
+ * \retval 0 no replies processed
+ * \retval 1 one reply processed
+ */
+static int
+ptlrpc_server_handle_reply(struct ptlrpc_service *svc)
+{
+        struct ptlrpc_reply_state *rs = NULL;
+        ENTRY;
+
+        spin_lock(&svc->srv_lock);
+        if (!list_empty(&svc->srv_reply_queue)) {
+                rs = list_entry(svc->srv_reply_queue.prev,
+                                struct ptlrpc_reply_state,
+                                rs_list);
+                list_del_init(&rs->rs_list);
+        }
+        spin_unlock(&svc->srv_lock);
+        if (rs != NULL)
+                ptlrpc_handle_rs(rs);
+        RETURN(rs != NULL);
+}
+
 /* FIXME make use of timeout later */
 int
 liblustre_check_services (void *arg)
@@ -1701,8 +1867,7 @@ static int ptlrpc_main(void *arg)
 
         /* XXX maintain a list of all managed devices: insert here */
 
-        while ((thread->t_flags & SVC_STOPPING) == 0 ||
-               svc->srv_n_difficult_replies != 0) {
+        while ((thread->t_flags & SVC_STOPPING) == 0) {
                 /* Don't exit while there are replies to be handled */
                 struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
                                                      ptlrpc_retry_rqbds, svc);
@@ -1712,12 +1877,10 @@ static int ptlrpc_main(void *arg)
                 cond_resched();
 
                 l_wait_event_exclusive (svc->srv_waitq,
-                              ((thread->t_flags & SVC_STOPPING) != 0 &&
-                               svc->srv_n_difficult_replies == 0) ||
+                              ((thread->t_flags & SVC_STOPPING) != 0) ||
                               (!list_empty(&svc->srv_idle_rqbds) &&
                                svc->srv_rqbd_timeout == 0) ||
                               !list_empty(&svc->srv_req_in_queue) ||
-                              !list_empty(&svc->srv_reply_queue) ||
                               (ptlrpc_server_request_pending(svc, 0) &&
                                (svc->srv_n_active_reqs <
                                 (svc->srv_threads_running - 1))) ||
@@ -1737,9 +1900,6 @@ static int ptlrpc_main(void *arg)
                         ptlrpc_start_thread(dev, svc);
                 }
 
-                if (!list_empty(&svc->srv_reply_queue))
-                        ptlrpc_server_handle_reply(svc);
-
                 if (!list_empty(&svc->srv_req_in_queue)) {
                         /* Process all incoming reqs before handling any */
                         ptlrpc_server_handle_req_in(svc);
@@ -1796,6 +1956,144 @@ out:
         return rc;
 }
 
+struct ptlrpc_hr_args {
+        int                       thread_index;
+        int                       cpu_index;
+        struct ptlrpc_hr_service *hrs;
+};
+
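+/**
+ * Move any queued replies from \a t to \a replies and decide whether the
+ * handler thread may sleep.
+ *
+ * \retval 1 do not sleep: the thread is stopping or replies are pending
+ * \retval 0 nothing to do, the thread may sleep
+ */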
+static int hrt_dont_sleep(struct ptlrpc_hr_thread *t,
+                          struct list_head *replies)
+{
+        int result;
+
+        spin_lock(&t->hrt_lock);
+        list_splice_init(&t->hrt_queue, replies);
+        result = test_bit(HRT_STOPPING, &t->hrt_flags) ||
+                !list_empty(replies);
+        spin_unlock(&t->hrt_lock);
+        return result;
+}
+
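+/**
+ * Main loop of a dedicated reply handling thread.
+ * Processes reply states queued for this thread until HRT_STOPPING is set.
+ */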
+static int ptlrpc_hr_main(void *arg)
+{
+        struct ptlrpc_hr_args *hr_args = arg;
+        struct ptlrpc_hr_service *hr = hr_args->hrs;
+        struct ptlrpc_hr_thread *t = &hr->hr_threads[hr_args->thread_index];
+        char threadname[20];
+        CFS_LIST_HEAD(replies);
+
+        snprintf(threadname, sizeof(threadname),
+                 "ptlrpc_hr_%d", hr_args->thread_index);
+
+        ptlrpc_daemonize(threadname);
+#if defined(HAVE_NODE_TO_CPUMASK)
+        set_cpus_allowed(cfs_current(),
+                         node_to_cpumask(cpu_to_node(hr_args->cpu_index)));
+#endif
+        set_bit(HRT_RUNNING, &t->hrt_flags);
+        cfs_waitq_signal(&t->hrt_wait);
+
+        while (!test_bit(HRT_STOPPING, &t->hrt_flags)) {
+
+                cfs_wait_event(t->hrt_wait, hrt_dont_sleep(t, &replies));
+                while (!list_empty(&replies)) {
+                        struct ptlrpc_reply_state *rs;
+
+                        rs = list_entry(replies.prev,
+                                        struct ptlrpc_reply_state,
+                                        rs_list);
+                        list_del_init(&rs->rs_list);
+                        ptlrpc_handle_rs(rs);
+                }
+        }
+
+        clear_bit(HRT_RUNNING, &t->hrt_flags);
+        complete(&t->hrt_completion);
+
+        return 0;
+}
+
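+/**
+ * Start reply handling thread \a n, bound to the node of CPU \a cpu where
+ * supported, and wait until it is running.
+ */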
+static int ptlrpc_start_hr_thread(struct ptlrpc_hr_service *hr, int n, int cpu)
+{
+        struct ptlrpc_hr_thread *t = &hr->hr_threads[n];
+        struct ptlrpc_hr_args args;
+        int rc;
+        ENTRY;
+
+        args.thread_index = n;
+        args.cpu_index = cpu;
+        args.hrs = hr;
+
+        rc = cfs_kernel_thread(ptlrpc_hr_main, (void*)&args,
+                               CLONE_VM|CLONE_FILES);
+        if (rc < 0) {
+                complete(&t->hrt_completion);
+                GOTO(out, rc);
+        }
+        cfs_wait_event(t->hrt_wait, test_bit(HRT_RUNNING, &t->hrt_flags));
+        RETURN(0);
+ out:
+        return rc;
+}
+
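+/**
+ * Ask one reply handling thread to stop and wait until it has exited.
+ */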
+static void ptlrpc_stop_hr_thread(struct ptlrpc_hr_thread *t)
+{
+        ENTRY;
+
+        set_bit(HRT_STOPPING, &t->hrt_flags);
+        cfs_waitq_signal(&t->hrt_wait);
+        wait_for_completion(&t->hrt_completion);
+
+        EXIT;
+}
+
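+/**
+ * Stop all reply handling threads.
+ */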
+static void ptlrpc_stop_hr_threads(struct ptlrpc_hr_service *hrs)
+{
+        int n;
+        ENTRY;
+
+        for (n = 0; n < hrs->hr_n_threads; n++)
+                ptlrpc_stop_hr_thread(&hrs->hr_threads[n]);
+
+        EXIT;
+}
+
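+/**
+ * Start all reply handling threads, spreading them across online CPUs.
+ *
+ * \retval 0 at least one thread started
+ * \retval -ESRCH no threads could be started
+ */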
+static int ptlrpc_start_hr_threads(struct ptlrpc_hr_service *hr)
+{
+        int rc = -ENOMEM;
+        int n, cpu, threads_started = 0;
+        ENTRY;
+
+        LASSERT(hr != NULL);
+        LASSERT(hr->hr_n_threads > 0);
+
+        for (n = 0, cpu = 0; n < hr->hr_n_threads; n++) {
+#if defined(HAVE_NODE_TO_CPUMASK)
+                while (!cpu_online(cpu)) {
+                        cpu++;
+                        if (cpu >= num_possible_cpus())
+                                cpu = 0;
+                }
+#endif
+                rc = ptlrpc_start_hr_thread(hr, n, cpu);
+                if (rc != 0)
+                        break;
+                threads_started++;
+                cpu++;
+        }
+        if (threads_started == 0) {
+                CERROR("No reply handling threads started\n");
+                RETURN(-ESRCH);
+        }
+        if (threads_started < hr->hr_n_threads) {
+                CWARN("Started only %d reply handling threads from %d\n",
+                      threads_started, hr->hr_n_threads);
+                hr->hr_n_threads = threads_started;
+        }
+        RETURN(0);
+}
+
 static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
                                struct ptlrpc_thread *thread)
 {
@@ -1922,7 +2220,65 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
         rc = (thread->t_flags & SVC_STOPPED) ? thread->t_id : 0;
         RETURN(rc);
 }
-#endif
+
+
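+/**
+ * Allocate the per-node reply handling service with one handler thread
+ * per online CPU and start the threads.
+ */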
+int ptlrpc_hr_init(void)
+{
+        int i;
+        int n_cpus = num_online_cpus();
+        struct ptlrpc_hr_service *hr;
+        int size;
+        ENTRY;
+
+        LASSERT(ptlrpc_hr == NULL);
+
+        size = offsetof(struct ptlrpc_hr_service, hr_threads[n_cpus]);
+        OBD_ALLOC(hr, size);
+        if (hr == NULL)
+                RETURN(-ENOMEM);
+        for (i = 0; i < n_cpus; i++) {
+                struct ptlrpc_hr_thread *t = &hr->hr_threads[i];
+
+                spin_lock_init(&t->hrt_lock);
+                cfs_waitq_init(&t->hrt_wait);
+                CFS_INIT_LIST_HEAD(&t->hrt_queue);
+                init_completion(&t->hrt_completion);
+        }
+        hr->hr_n_threads = n_cpus;
+        hr->hr_size = size;
+        ptlrpc_hr = hr;
+
+        RETURN(ptlrpc_start_hr_threads(hr));
+}
+
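+/**
+ * Stop the reply handling threads and release the service.
+ */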
+void ptlrpc_hr_fini(void)
+{
+        if (ptlrpc_hr != NULL) {
+                ptlrpc_stop_hr_threads(ptlrpc_hr);
+                OBD_FREE(ptlrpc_hr, ptlrpc_hr->hr_size);
+                ptlrpc_hr = NULL;
+        }
+}
+
+#endif /* __KERNEL__ */
+
+/**
+ * Wait until all already scheduled replies are processed.
+ */
+static void ptlrpc_wait_replies(struct ptlrpc_service *svc)
+{
+        while (1) {
+                int rc;
+                struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(10),
+                                                     NULL, NULL);
+                rc = l_wait_event(svc->srv_waitq,
+                                  atomic_read(&svc->srv_n_difficult_replies) == 0,
+                                  &lwi);
+                if (rc == 0)
+                        break;
+                CWARN("Unexpectedly long timeout %p\n", svc);
+        }
+}
 
 int ptlrpc_unregister_service(struct ptlrpc_service *service)
 {
@@ -1992,7 +2348,9 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 struct ptlrpc_reply_state *rs =
                         list_entry(service->srv_active_replies.next,
                                    struct ptlrpc_reply_state, rs_list);
+                spin_lock(&rs->rs_lock);
                 ptlrpc_schedule_difficult_reply(rs);
+                spin_unlock(&rs->rs_lock);
         }
         spin_unlock(&service->srv_lock);
 
@@ -2036,21 +2394,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 ptlrpc_free_rqbd(rqbd);
         }
 
-        /* wait for all outstanding replies to complete (they were
-         * scheduled having been flagged to abort above) */
-        while (atomic_read(&service->srv_outstanding_replies) != 0) {
-                struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(10), NULL, NULL);
-
-                rc = l_wait_event(service->srv_waitq,
-                                  !list_empty(&service->srv_reply_queue), &lwi);
-                LASSERT(rc == 0 || rc == -ETIMEDOUT);
-
-                if (rc == 0) {
-                        ptlrpc_server_handle_reply(service);
-                        continue;
-                }
-                CWARN("Unexpectedly long timeout %p\n", service);
-        }
+        ptlrpc_wait_replies(service);
 
         list_for_each_entry_safe(rs, t, &service->srv_free_rs_list, rs_list) {
                 list_del(&rs->rs_list);
-- 
GitLab