From 05245c55b36d50b1c005c67a7434376677da6686 Mon Sep 17 00:00:00 2001
From: yury <yury>
Date: Sun, 23 Nov 2008 20:38:31 +0000
Subject: [PATCH] b=17631 r=panda,shadow

- fixes a possible long synchronous bulk unlink in ptlrpcd, which could lead to an assertion failure at forced umount time. The fix is essentially the same as for bug 17310: move the request to the special UNREGISTERING phase and keep processing other RPCs until the bulk unlink completes (see the first sketch below);

- in sync bulk and reply unlink, check the wakeup condition every 1 second so we act quickly once the unlink completes, instead of every 20 seconds as before (see the second sketch below).
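
For clarity, a condensed sketch (not the literal tree code) of the pattern
the first item describes, as implemented in ptlrpc_check_set() and
ptlrpc_unregister_bulk() by this patch:

        /* Async bulk unlink: ptlrpc_unregister_bulk(req, 1) calls
         * LNetMDUnlink() and, if the unlink callback has not fired yet,
         * parks the request in RQ_PHASE_UNREGISTERING and returns 0
         * instead of blocking the ptlrpcd thread. */
        if (!ptlrpc_unregister_bulk(req, 1 /* async */))
                continue; /* service other RPCs; this req is re-checked
                           * on a later ptlrpc_check_set() pass */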
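
The faster wakeup in the second item comes from replacing the single long
timeout with an interval timer in the sync wait loops; schematically:

        /* before: one wakeup only at the end of the whole window */
        lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);

        /* after: same LONG_UNLINK deadline, but the wait condition is
         * re-evaluated every second, so a completed unlink is noticed
         * almost immediately */
        lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
                                   cfs_time_seconds(1), NULL, NULL);
        rc = l_wait_event(*wq, !ptlrpc_client_bulk_active(req), &lwi);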
---
 lustre/include/lustre_net.h  | 41 +++++++++++++----
 lustre/include/obd_support.h |  7 +--
 lustre/mds/handler.c         |  6 +--
 lustre/ost/ost_handler.c     | 14 +++---
 lustre/ptlrpc/client.c       | 55 ++++++++++++++---------
 lustre/ptlrpc/niobuf.c       | 86 ++++++++++++++++++++++--------------
 lustre/ptlrpc/service.c      |  3 +-
 lustre/tests/sanityN.sh      |  8 ++--
 8 files changed, 139 insertions(+), 81 deletions(-)

diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index 65bd60a26e..624d713482 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -387,6 +387,7 @@ struct ptlrpc_request {
                so that servers' early reply updates to the deadline aren't
                kept in per-cpu cache */
         time_t rq_reply_deadline;        /* when req reply unlink must finish. */
+        time_t rq_bulk_deadline;         /* when req bulk unlink must finish. */
         int    rq_timeout;               /* service time estimate (secs) */
 
         /* Multi-rpc bits */
@@ -751,18 +752,40 @@ extern lnet_pid_t ptl_get_pid(void);
 
 /* ptlrpc/niobuf.c */
 int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc);
-void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc);
+void ptlrpc_abort_bulk(struct ptlrpc_request *req);
 int ptlrpc_register_bulk(struct ptlrpc_request *req);
-void ptlrpc_unregister_bulk (struct ptlrpc_request *req);
+int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async);
 
-static inline int ptlrpc_bulk_active (struct ptlrpc_bulk_desc *desc)
+static inline int ptlrpc_server_bulk_active(struct ptlrpc_bulk_desc *desc)
 {
-        int           rc;
+        int rc;
+
+        LASSERT(desc != NULL);
+
+        spin_lock(&desc->bd_lock);
+        rc = desc->bd_network_rw;
+        spin_unlock(&desc->bd_lock);
+        return rc;
+}
+
+static inline int ptlrpc_client_bulk_active(struct ptlrpc_request *req)
+{
+        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+        int                      rc;
+
+        LASSERT(req != NULL);
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
+            req->rq_bulk_deadline > cfs_time_current_sec())
+                return 1;
+
+        if (!desc)
+                return 0;
 
         spin_lock(&desc->bd_lock);
         rc = desc->bd_network_rw;
         spin_unlock(&desc->bd_lock);
-        return (rc);
+        return rc;
 }
 
 #define PTLRPC_REPLY_MAYBE_DIFFICULT 0x01
@@ -966,7 +989,7 @@ ptlrpc_rqphase_move(struct ptlrpc_request *req, enum rq_phase new_phase)
 static inline int
 ptlrpc_client_early(struct ptlrpc_request *req)
 {
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
             req->rq_reply_deadline > cfs_time_current_sec())
                 return 0;
         return req->rq_early;
@@ -975,7 +998,7 @@ ptlrpc_client_early(struct ptlrpc_request *req)
 static inline int
 ptlrpc_client_replied(struct ptlrpc_request *req)
 {
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
             req->rq_reply_deadline > cfs_time_current_sec())
                 return 0;
         return req->rq_replied;
@@ -984,7 +1007,7 @@ ptlrpc_client_replied(struct ptlrpc_request *req)
 static inline int
 ptlrpc_client_recv(struct ptlrpc_request *req)
 {
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
             req->rq_reply_deadline > cfs_time_current_sec())
                 return 1;
         return req->rq_receiving_reply;
@@ -996,7 +1019,7 @@ ptlrpc_client_recv_or_unlink(struct ptlrpc_request *req)
         int rc;
 
         spin_lock(&req->rq_lock);
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
             req->rq_reply_deadline > cfs_time_current_sec()) {
                 spin_unlock(&req->rq_lock);
                 return 1;
diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h
index 364f7c7e63..cd506a60ef 100644
--- a/lustre/include/obd_support.h
+++ b/lustre/include/obd_support.h
@@ -260,9 +260,10 @@ extern unsigned int obd_alloc_fail_rate;
 #define OBD_FAIL_PTLRPC_PAUSE_REP        0x50c
 
 #define OBD_FAIL_PTLRPC_DUMP_LOG         0x50e
-#define OBD_FAIL_PTLRPC_LONG_UNLINK      0x50f
-#define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT    0x510
-#define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT  0x511
+#define OBD_FAIL_PTLRPC_LONG_REPL_UNLINK 0x50f
+#define OBD_FAIL_PTLRPC_LONG_BULK_UNLINK 0x510
+#define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT    0x511
+#define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT  0x512
 
 #define OBD_FAIL_OBD_PING_NET            0x600
 #define OBD_FAIL_OBD_LOG_CANCEL_NET      0x601
diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c
index b1df5f5979..cfed97733b 100644
--- a/lustre/mds/handler.c
+++ b/lustre/mds/handler.c
@@ -146,8 +146,8 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
                 CERROR("Req deadline already passed %lu (now: %lu)\n",
                        req->rq_deadline, cfs_time_current_sec());
         }
-        lwi = LWI_TIMEOUT(max(timeout, 1) * HZ, NULL, NULL);
-        rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
+        lwi = LWI_TIMEOUT(cfs_time_seconds(max(timeout, 1)), NULL, NULL);
+        rc = l_wait_event(desc->bd_waitq, !ptlrpc_server_bulk_active(desc), &lwi);
         LASSERT (rc == 0 || rc == -ETIMEDOUT);
 
         if (rc == 0) {
@@ -168,7 +168,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
 
         EXIT;
  abort_bulk:
-        ptlrpc_abort_bulk (desc);
+        ptlrpc_abort_bulk(req);
  cleanup_buf:
         for (i = 0; i < npages; i++)
                 if (pages[i])
diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c
index 13faf6eeb7..8b1e34c5cf 100644
--- a/lustre/ost/ost_handler.c
+++ b/lustre/ost/ost_handler.c
@@ -877,7 +877,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                                                            ost_bulk_timeout,
                                                            desc);
                                 rc = l_wait_event(desc->bd_waitq,
-                                                  !ptlrpc_bulk_active(desc) ||
+                                                  !ptlrpc_server_bulk_active(desc) ||
                                                   exp->exp_failed, &lwi);
                                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
                                 /* Wait again if we changed deadline */
@@ -890,11 +890,11 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                                           req->rq_deadline - start,
                                           cfs_time_current_sec() -
                                           req->rq_deadline);
-                                ptlrpc_abort_bulk(desc);
+                                ptlrpc_abort_bulk(req);
                         } else if (exp->exp_failed) {
                                 DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");
                                 rc = -ENOTCONN;
-                                ptlrpc_abort_bulk(desc);
+                                ptlrpc_abort_bulk(req);
                         } else if (!desc->bd_success ||
                                    desc->bd_nob_transferred != desc->bd_nob) {
                                 DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
@@ -1098,7 +1098,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (desc->bd_export->exp_failed)
                 rc = -ENOTCONN;
         else
-                rc = ptlrpc_start_bulk_transfer (desc);
+                rc = ptlrpc_start_bulk_transfer(desc);
         if (rc == 0) {
                 time_t start = cfs_time_current_sec();
                 do {
@@ -1109,7 +1109,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                         lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1),
                                                    ost_bulk_timeout, desc);
                         rc = l_wait_event(desc->bd_waitq,
-                                          !ptlrpc_bulk_active(desc) ||
+                                          !ptlrpc_server_bulk_active(desc) ||
                                           desc->bd_export->exp_failed, &lwi);
                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
                         /* Wait again if we changed deadline */
@@ -1122,11 +1122,11 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                                   req->rq_deadline - start,
                                   cfs_time_current_sec() -
                                   req->rq_deadline);
-                        ptlrpc_abort_bulk(desc);
+                        ptlrpc_abort_bulk(req);
                 } else if (desc->bd_export->exp_failed) {
                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
                         rc = -ENOTCONN;
-                        ptlrpc_abort_bulk(desc);
+                        ptlrpc_abort_bulk(req);
                 } else if (!desc->bd_success ||
                            desc->bd_nob_transferred != desc->bd_nob) {
                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index faa9b0f76e..7e1e6cd010 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -100,8 +100,8 @@ static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal
         return desc;
 }
 
-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
-                                               int npages, int type, int portal)
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
+                                              int npages, int type, int portal)
 {
         struct obd_import *imp = req->rq_import;
         struct ptlrpc_bulk_desc *desc;
@@ -125,8 +125,8 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
         return desc;
 }
 
-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
-                                               int npages, int type, int portal)
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
+                                              int npages, int type, int portal)
 {
         struct obd_export *exp = req->rq_export;
         struct ptlrpc_bulk_desc *desc;
@@ -1060,13 +1060,22 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                          * call interpret before that. We need to make
                          * sure that all rdma transfers finished and will
                          * not corrupt any data. */
-                        if (ptlrpc_client_recv_or_unlink(req))
+                        if (ptlrpc_client_recv_or_unlink(req) ||
+                            ptlrpc_client_bulk_active(req))
                                 continue;
                         
-                        /* Turn fail_loc off to prevent it from looping
+                        /* Turn repl fail_loc off to prevent it from looping
                          * forever. */
-                        OBD_FAIL_CHECK_QUIET(OBD_FAIL_PTLRPC_LONG_UNLINK | 
-                                             OBD_FAIL_ONCE);
+                        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
+                                OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK |
+                                               OBD_FAIL_ONCE);
+                        }
+
+                        /* Turn off bulk fail_loc. */
+                        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
+                                OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK |
+                                               OBD_FAIL_ONCE);
+                        }
 
                         /* Move to next phase if reply was successfully 
                          * unlinked. */
@@ -1084,7 +1093,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                         ptlrpc_expire_one_request(req, 1);
                         
                         /* Check if we still need to wait for unlink. */
-                        if (ptlrpc_client_recv_or_unlink(req))
+                        if (ptlrpc_client_recv_or_unlink(req) ||
+                            ptlrpc_client_bulk_active(req))
                                 continue;
                 }
 
@@ -1150,11 +1160,13 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                                         lustre_msg_add_flags(req->rq_reqmsg,
                                                              MSG_RESENT);
                                         if (req->rq_bulk) {
-                                                __u64 old_xid = req->rq_xid;
+                                                __u64 old_xid;
 
-                                                ptlrpc_unregister_bulk(req);
+                                                if (!ptlrpc_unregister_bulk(req, 1))
+                                                        continue;
 
                                                 /* ensure previous bulk fails */
+                                                old_xid = req->rq_xid;
                                                 req->rq_xid = ptlrpc_next_xid();
                                                 CDEBUG(D_HA, "resend bulk "
                                                        "old x"LPU64
@@ -1223,7 +1235,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 }
 
                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
-                if (ptlrpc_bulk_active(req->rq_bulk))
+                if (ptlrpc_client_bulk_active(req))
                         continue;
 
                 if (!req->rq_bulk->bd_success) {
@@ -1247,8 +1259,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 if (!ptlrpc_unregister_reply(req, 1))
                         continue;
 
-                if (req->rq_bulk != NULL)
-                        ptlrpc_unregister_bulk(req);
+                if (!ptlrpc_unregister_bulk(req, 1))
+                        continue;
 
                 /* When calling interpret receiving already should be
                  * finished. */
@@ -1316,13 +1328,11 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
         spin_unlock(&req->rq_lock);
 
         ptlrpc_unregister_reply(req, async_unlink);
+        ptlrpc_unregister_bulk(req, async_unlink);
 
         if (obd_dump_on_timeout)
                 libcfs_debug_dumplog();
 
-        if (req->rq_bulk != NULL)
-                ptlrpc_unregister_bulk (req);
-
         if (imp == NULL) {
                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
                 RETURN(1);
@@ -1661,7 +1671,7 @@ int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         LASSERT(!in_interrupt());
 
         /* Let's setup deadline for reply unlink. */
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) && 
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) && 
             async && request->rq_reply_deadline == 0)
                 request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
 
@@ -1693,7 +1703,8 @@ int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         for (;;) {
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
-                lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
+                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+                                           cfs_time_seconds(1), NULL, NULL);
                 rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
                                   &lwi);
                 if (rc == 0) {
@@ -1959,7 +1970,7 @@ restart:
                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
 
                 if (req->rq_bulk != NULL) {
-                        ptlrpc_unregister_bulk (req);
+                        ptlrpc_unregister_bulk(req, 0);
 
                         /* bulk requests are supposed to be
                          * idempotent, so we are free to bump the xid
@@ -2071,7 +2082,7 @@ restart:
                          * me. */
                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
                         brc = l_wait_event(req->rq_reply_waitq,
-                                           !ptlrpc_bulk_active(req->rq_bulk),
+                                           !ptlrpc_client_bulk_active(req),
                                            &lwi);
                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
                         if (brc != 0) {
@@ -2084,7 +2095,7 @@ restart:
                         }
                 }
                 if (rc < 0)
-                        ptlrpc_unregister_bulk (req);
+                        ptlrpc_unregister_bulk(req, 0);
         }
 
         LASSERT(!req->rq_receiving_reply);
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index eb674faae7..eedbb41749 100644
--- a/lustre/ptlrpc/niobuf.c
+++ b/lustre/ptlrpc/niobuf.c
@@ -96,7 +96,7 @@ static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len,
         RETURN (0);
 }
 
-int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
+int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
 {
         struct ptlrpc_connection *conn = desc->bd_export->exp_connection;
         int                       rc;
@@ -164,16 +164,17 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
         RETURN(0);
 }
 
-void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
+/* Server side bulk abort. Idempotent. Not thread-safe (i.e. only
+ * serialises with completion callback) */
+void ptlrpc_abort_bulk(struct ptlrpc_request *req)
 {
-        /* Server side bulk abort. Idempotent. Not thread-safe (i.e. only
-         * serialises with completion callback) */
-        struct l_wait_info lwi;
-        int                rc;
+        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+        struct l_wait_info       lwi;
+        int                      rc;
 
-        LASSERT (!in_interrupt ());             /* might sleep */
+        LASSERT(!in_interrupt());               /* might sleep */
 
-        if (!ptlrpc_bulk_active(desc))          /* completed or */
+        if (!ptlrpc_client_bulk_active(req))           /* completed or */
                 return;                         /* never started */
         
         /* Do not send any meaningful data over the wire for evicted clients */
@@ -185,14 +186,15 @@ void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
          * but we must still l_wait_event() in this case, to give liblustre
          * a chance to run server_bulk_callback()*/
 
-        LNetMDUnlink (desc->bd_md_h);
+        LNetMDUnlink(desc->bd_md_h);
 
         for (;;) {
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
-                lwi = LWI_TIMEOUT (cfs_time_seconds(300), NULL, NULL);
+                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+                                           cfs_time_seconds(1), NULL, NULL);
                 rc = l_wait_event(desc->bd_waitq, 
-                                  !ptlrpc_bulk_active(desc), &lwi);
+                                  !ptlrpc_client_bulk_active(req), &lwi);
                 if (rc == 0)
                         return;
 
@@ -201,7 +203,7 @@ void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
         }
 }
 
-int ptlrpc_register_bulk (struct ptlrpc_request *req)
+int ptlrpc_register_bulk(struct ptlrpc_request *req)
 {
         struct ptlrpc_bulk_desc *desc = req->rq_bulk;
         lnet_process_id_t peer;
@@ -275,29 +277,45 @@ int ptlrpc_register_bulk (struct ptlrpc_request *req)
         RETURN(0);
 }
 
-void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
+/* Disconnect a bulk desc from the network. Idempotent. Not
+ * thread-safe (i.e. only interlocks with completion callback). */
+int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async)
 {
-        /* Disconnect a bulk desc from the network. Idempotent. Not
-         * thread-safe (i.e. only interlocks with completion callback). */
         struct ptlrpc_bulk_desc *desc = req->rq_bulk;
         cfs_waitq_t             *wq;
         struct l_wait_info       lwi;
         int                      rc;
+        ENTRY;
+
+        LASSERT(!in_interrupt());     /* might sleep */
 
-        LASSERT (!in_interrupt ());     /* might sleep */
+        /* Let's setup deadline for bulk unlink. */
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) && 
+            async && req->rq_bulk_deadline == 0)
+                req->rq_bulk_deadline = cfs_time_current_sec() + LONG_UNLINK;
 
-        if (!ptlrpc_bulk_active(desc))  /* completed or */
-                return;                 /* never registered */
+        if (!ptlrpc_client_bulk_active(req))  /* completed or */
+                RETURN(1);                    /* never registered */
 
-        LASSERT (desc->bd_req == req);  /* bd_req NULL until registered */
+        LASSERT(desc->bd_req == req);  /* bd_req NULL until registered */
 
         /* the unlink ensures the callback happens ASAP and is the last
          * one.  If it fails, it must be because completion just happened,
          * but we must still l_wait_event() in this case to give liblustre
          * a chance to run client_bulk_callback() */
 
-        LNetMDUnlink (desc->bd_md_h);
-        
+        LNetMDUnlink(desc->bd_md_h);
+
+        if (!ptlrpc_client_bulk_active(req))  /* completed or */
+                RETURN(1);                    /* never registered */
+
+        /* Move to "Unregistering" phase as bulk was not unlinked yet. */
+        ptlrpc_rqphase_move(req, RQ_PHASE_UNREGISTERING);
+
+        /* Do not wait for unlink to finish. */
+        if (async)
+                RETURN(0);
+
         if (req->rq_set != NULL)
                 wq = &req->rq_set->set_waitq;
         else
@@ -306,18 +324,22 @@ void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
         for (;;) {
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
-                lwi = LWI_TIMEOUT (cfs_time_seconds(300), NULL, NULL);
-                rc = l_wait_event(*wq, !ptlrpc_bulk_active(desc), &lwi);
-                if (rc == 0)
-                        return;
+                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+                                           cfs_time_seconds(1), NULL, NULL);
+                rc = l_wait_event(*wq, !ptlrpc_client_bulk_active(req), &lwi);
+                if (rc == 0) {
+                        ptlrpc_rqphase_move(req, req->rq_next_phase);
+                        RETURN(1);
+                }
 
-                LASSERT (rc == -ETIMEDOUT);
-                DEBUG_REQ(D_WARNING,req,"Unexpectedly long timeout: desc %p",
+                LASSERT(rc == -ETIMEDOUT);
+                DEBUG_REQ(D_WARNING, req, "Unexpectedly long timeout: desc %p",
                           desc);
         }
+        RETURN(0);
 }
 
-int ptlrpc_send_reply (struct ptlrpc_request *req, int flags)
+int ptlrpc_send_reply(struct ptlrpc_request *req, int flags)
 {
         struct ptlrpc_service     *svc = req->rq_rqbd->rqbd_service;
         struct ptlrpc_reply_state *rs = req->rq_reply_state;
@@ -619,13 +641,13 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         request->rq_repmsg = NULL; //remove
 
  cleanup_bulk:
-        if (request->rq_bulk != NULL)
-                ptlrpc_unregister_bulk(request);
-
+        /* We do a sync unlink here as no real transfer happened, so the
+         * chance of a long unlink due to a sluggish net is smaller. */
+        ptlrpc_unregister_bulk(request, 0);
         return rc;
 }
 
-int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
+int ptlrpc_register_rqbd(struct ptlrpc_request_buffer_desc *rqbd)
 {
         struct ptlrpc_service   *service = rqbd->rqbd_service;
         static lnet_process_id_t  match_id = {LNET_NID_ANY, LNET_PID_ANY};
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index 310001787c..0b7a5db94a 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -1879,7 +1879,8 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
 
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
-                lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
+                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+                                           cfs_time_seconds(1), NULL, NULL);
                 rc = l_wait_event(service->srv_waitq,
                                   service->srv_nrqbd_receiving == 0,
                                   &lwi);
diff --git a/lustre/tests/sanityN.sh b/lustre/tests/sanityN.sh
index 727ae08e5f..037043d4c5 100644
--- a/lustre/tests/sanityN.sh
+++ b/lustre/tests/sanityN.sh
@@ -698,14 +698,14 @@ test_33() { #16129
                 done
                 if [ $OPER == "timeout" ] ; then
                         for j in `seq $OSTCOUNT`; do
-                                #define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT    0x510
-                                do_facet ost$j lctl set_param fail_loc=0x510
+                                #define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT    0x511
+                                do_facet ost$j lctl set_param fail_loc=0x511
                         done
                         echo lock should expire
                 else
                         for j in `seq $OSTCOUNT`; do
-                                #define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT  0x511
-                                do_facet ost$j lctl set_param fail_loc=0x511
+                                #define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT  0x512
+                                do_facet ost$j lctl set_param fail_loc=0x512
                         done
                         echo lock should not expire
                 fi
-- 
GitLab