diff --git a/lustre/include/lustre_import.h b/lustre/include/lustre_import.h
index 491d0d08500cfa9585ca72b00ffc29edb6f8a84a..30adbdf721c10fcf3008f7e09e424450a2678208 100644
--- a/lustre/include/lustre_import.h
+++ b/lustre/include/lustre_import.h
@@ -123,6 +123,7 @@ struct obd_import {
         cfs_waitq_t               imp_recovery_waitq;
 
         atomic_t                  imp_inflight;
+        atomic_t                  imp_unregistering;
         atomic_t                  imp_replay_inflight;
         atomic_t                  imp_inval_count;
         enum lustre_imp_state     imp_state;
diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h
index 54c43ce515ebb5b193fe872977f7b2bc4f41bfe9..cdca3f6c3dec18826dcff6595059985824a18db1 100644
--- a/lustre/include/lustre_net.h
+++ b/lustre/include/lustre_net.h
@@ -55,6 +55,8 @@
 #include <lustre_import.h>
 #include <lprocfs_status.h>
 
+#include <obd_support.h>
+
 /* MD flags we _always_ use */
 #define PTLRPC_MD_OPTIONS  0
 
@@ -260,11 +262,13 @@ struct ptlrpc_reply_state {
 struct ptlrpc_thread;
 
 enum rq_phase {
-        RQ_PHASE_NEW         = 0xebc0de00,
-        RQ_PHASE_RPC         = 0xebc0de01,
-        RQ_PHASE_BULK        = 0xebc0de02,
-        RQ_PHASE_INTERPRET   = 0xebc0de03,
-        RQ_PHASE_COMPLETE    = 0xebc0de04,
+        RQ_PHASE_NEW            = 0xebc0de00,
+        RQ_PHASE_RPC            = 0xebc0de01,
+        RQ_PHASE_BULK           = 0xebc0de02,
+        RQ_PHASE_INTERPRET      = 0xebc0de03,
+        RQ_PHASE_COMPLETE       = 0xebc0de04,
+        RQ_PHASE_UNREGISTERING  = 0xebc0de05,
+        RQ_PHASE_UNDEFINED      = 0xebc0de06
 };
 
 struct ptlrpc_request_pool {
@@ -300,7 +304,8 @@ struct ptlrpc_request {
                 /* server-side flags */
                 rq_packed_final:1,  /* packed final reply */
                 rq_sent_final:1;    /* stop sending early replies */
-        enum rq_phase rq_phase; /* one of RQ_PHASE_* */
+        enum rq_phase rq_phase;     /* one of RQ_PHASE_* */
+        enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
         atomic_t rq_refcount;   /* client-side refcount for SENT race,
                                    server-side refcounf for multiple replies */
 
@@ -363,6 +368,7 @@ struct ptlrpc_request {
         volatile time_t rq_deadline;     /* when request must finish. volatile
                so that servers' early reply updates to the deadline aren't
                kept in per-cpu cache */
+        time_t rq_reply_deadline;        /* when req reply unlink must finish. */
         int    rq_timeout;               /* service time estimate (secs) */
 
         /* Multi-rpc bits */
@@ -411,9 +417,9 @@ static inline int lustre_rep_need_swab(struct ptlrpc_request *req)
 }
 
 static inline const char *
-ptlrpc_rqphase2str(struct ptlrpc_request *req)
+ptlrpc_phase2str(enum rq_phase phase)
 {
-        switch (req->rq_phase) {
+        switch (phase) {
         case RQ_PHASE_NEW:
                 return "New";
         case RQ_PHASE_RPC:
@@ -424,11 +430,19 @@ ptlrpc_rqphase2str(struct ptlrpc_request *req)
                 return "Interpret";
         case RQ_PHASE_COMPLETE:
                 return "Complete";
+        case RQ_PHASE_UNREGISTERING:
+                return "Unregistering";
         default:
                 return "?Phase?";
         }
 }
 
+static inline const char *
+ptlrpc_rqphase2str(struct ptlrpc_request *req)
+{
+        return ptlrpc_phase2str(req->rq_phase);
+}
+
 /* Spare the preprocessor, spoil the bugs. */
 #define FLAG(field, str) (field ? str : "")
 
@@ -741,29 +755,9 @@ void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
 void ptlrpc_cleanup_client(struct obd_import *imp);
 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid);
 
-static inline int
-ptlrpc_client_recv_or_unlink (struct ptlrpc_request *req)
-{
-        int           rc;
-
-        spin_lock(&req->rq_lock);
-        rc = req->rq_receiving_reply || req->rq_must_unlink;
-        spin_unlock(&req->rq_lock);
-        return (rc);
-}
-
-static inline void
-ptlrpc_wake_client_req (struct ptlrpc_request *req)
-{
-        if (req->rq_set == NULL)
-                cfs_waitq_signal(&req->rq_reply_waitq);
-        else
-                cfs_waitq_signal(&req->rq_set->set_waitq);
-}
-
 int ptlrpc_queue_wait(struct ptlrpc_request *req);
 int ptlrpc_replay_req(struct ptlrpc_request *req);
-void ptlrpc_unregister_reply(struct ptlrpc_request *req);
+int ptlrpc_unregister_reply(struct ptlrpc_request *req, int async);
 void ptlrpc_restart_req(struct ptlrpc_request *req);
 void ptlrpc_abort_inflight(struct obd_import *imp);
 void ptlrpc_abort_set(struct ptlrpc_request_set *set);
@@ -918,6 +912,81 @@ void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout);
 void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time);
 void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum);
 
+static inline void
+ptlrpc_rqphase_move(struct ptlrpc_request *req, enum rq_phase new_phase)
+{
+        if (req->rq_phase == new_phase)
+                return;
+        
+        if (new_phase == RQ_PHASE_UNREGISTERING) {
+                req->rq_next_phase = req->rq_phase;
+                if (req->rq_import)
+                        atomic_inc(&req->rq_import->imp_unregistering);
+        }
+        
+        if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+                if (req->rq_import)
+                        atomic_dec(&req->rq_import->imp_unregistering);
+        }
+
+        DEBUG_REQ(D_RPCTRACE, req, "move req \"%s\" -> \"%s\"", 
+                  ptlrpc_rqphase2str(req), ptlrpc_phase2str(new_phase));
+
+        req->rq_phase = new_phase;
+}
+
+static inline int
+ptlrpc_client_early(struct ptlrpc_request *req)
+{
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            req->rq_reply_deadline > cfs_time_current_sec())
+                return 0;
+        return req->rq_early;
+}
+
+static inline int
+ptlrpc_client_replied(struct ptlrpc_request *req)
+{
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            req->rq_reply_deadline > cfs_time_current_sec())
+                return 0;
+        return req->rq_replied;
+}
+
+static inline int
+ptlrpc_client_recv(struct ptlrpc_request *req)
+{
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            req->rq_reply_deadline > cfs_time_current_sec())
+                return 1;
+        return req->rq_receiving_reply;
+}
+
+static inline int
+ptlrpc_client_recv_or_unlink(struct ptlrpc_request *req)
+{
+        int rc;
+
+        spin_lock(&req->rq_lock);
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) &&
+            req->rq_reply_deadline > cfs_time_current_sec()) {
+                spin_unlock(&req->rq_lock);
+                return 1;
+        }
+        rc = req->rq_receiving_reply || req->rq_must_unlink;
+        spin_unlock(&req->rq_lock);
+        return rc;
+}
+
+static inline void
+ptlrpc_client_wake_req(struct ptlrpc_request *req)
+{
+        if (req->rq_set == NULL)
+                cfs_waitq_signal(&req->rq_reply_waitq);
+        else
+                cfs_waitq_signal(&req->rq_set->set_waitq);
+}
+
 static inline void
 ptlrpc_rs_addref(struct ptlrpc_reply_state *rs)
 {
diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h
index 0a8f310e97ca1e85998957a7d6d68e797077b7d0..e040a05d79b218a151ad1b9eaca76e4a4f8c100b 100644
--- a/lustre/include/obd_support.h
+++ b/lustre/include/obd_support.h
@@ -260,6 +260,7 @@ extern unsigned int obd_alloc_fail_rate;
 #define OBD_FAIL_PTLRPC_PAUSE_REP        0x50c
 
 #define OBD_FAIL_PTLRPC_DUMP_LOG         0x50e
+#define OBD_FAIL_PTLRPC_LONG_UNLINK      0x50f
 
 #define OBD_FAIL_OBD_PING_NET            0x600
 #define OBD_FAIL_OBD_LOG_CANCEL_NET      0x601
diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c
index 975414d08d523246d02248d91fcf889145ff0dfe..a5e7297c05d852e22816fcd24e713d89eb0416bc 100644
--- a/lustre/obdclass/genops.c
+++ b/lustre/obdclass/genops.c
@@ -857,6 +857,7 @@ struct obd_import *class_new_import(struct obd_device *obd)
         cfs_waitq_init(&imp->imp_recovery_waitq);
 
         atomic_set(&imp->imp_refcount, 2);
+        atomic_set(&imp->imp_unregistering, 0);
         atomic_set(&imp->imp_inflight, 0);
         atomic_set(&imp->imp_replay_inflight, 0);
         atomic_set(&imp->imp_inval_count, 0);
diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c
index 169a1f0c8fbff040e99220d858d23f17935a6770..53a5bae81a86cb62b90f09b5db3011232b77d3ab 100644
--- a/lustre/obdclass/lprocfs_status.c
+++ b/lustre/obdclass/lprocfs_status.c
@@ -654,6 +654,7 @@ int lprocfs_rd_import(char *page, char **start, off_t off, int count,
                       "    target: %s@%s\n"
                       "    state: %s\n"
                       "    inflight: %u\n"
+                      "    unregistering: %u\n"
                       "    conn_cnt: %u\n"
                       "    generation: %u\n"
                       "    inval_cnt: %u\n"
@@ -665,6 +666,7 @@ int lprocfs_rd_import(char *page, char **start, off_t off, int count,
                       obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid,
                       imp_state_name,
                       atomic_read(&imp->imp_inflight),
+                      atomic_read(&imp->imp_unregistering),
                       imp->imp_conn_cnt,
                       imp->imp_generation,
                       atomic_read(&imp->imp_inval_count),
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index ca0b4a5b43a646952a8fbfb072bf6e9604cad5eb..4868d573e73565abb32830a69b97f24fbedc2812 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -578,7 +578,9 @@ ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
         request->rq_reply_cbid.cbid_arg = request;
 
+        request->rq_reply_deadline = 0;
         request->rq_phase = RQ_PHASE_NEW;
+        request->rq_next_phase = RQ_PHASE_UNDEFINED;
 
         request->rq_request_portal = imp->imp_client->cli_request_portal;
         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
@@ -809,12 +811,12 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
         /* serialise with network callback */
         spin_lock(&req->rq_lock);
 
-        if (req->rq_replied)
+        if (ptlrpc_client_replied(req))
                 GOTO(out, rc = 1);
 
         if (req->rq_net_err && !req->rq_timedout) {
                 spin_unlock(&req->rq_lock);
-                rc = ptlrpc_expire_one_request(req);
+                rc = ptlrpc_expire_one_request(req, 0);
                 spin_lock(&req->rq_lock);
                 GOTO(out, rc);
         }
@@ -828,7 +830,7 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
         if (req->rq_restart)
                 GOTO(out, rc = 1);
 
-        if (req->rq_early) {
+        if (ptlrpc_client_early(req)) {
                 ptlrpc_at_recv_early_reply(req);
                 GOTO(out, rc = 0); /* keep waiting */
         }
@@ -963,7 +965,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         if (req->rq_sent && (req->rq_sent > CURRENT_SECONDS))
                 RETURN (0);
 
-        req->rq_phase = RQ_PHASE_RPC;
+        ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
 
         imp = req->rq_import;
         spin_lock(&imp->imp_lock);
@@ -990,7 +992,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         if (rc != 0) {
                 spin_unlock(&imp->imp_lock);
                 req->rq_status = rc;
-                req->rq_phase = RQ_PHASE_INTERPRET;
+                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                 RETURN(rc);
         }
 
@@ -1036,6 +1038,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                     ptlrpc_send_new_req(req)) {
                         force_timer_recalc = 1;
                 }
+
                 /* delayed send - skip */
                 if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
                         continue;
@@ -1043,30 +1046,53 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 if (!(req->rq_phase == RQ_PHASE_RPC ||
                       req->rq_phase == RQ_PHASE_BULK ||
                       req->rq_phase == RQ_PHASE_INTERPRET ||
+                      req->rq_phase == RQ_PHASE_UNREGISTERING ||
                       req->rq_phase == RQ_PHASE_COMPLETE)) {
                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
                         LBUG();
                 }
 
+                if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+                        LASSERT(req->rq_next_phase != req->rq_phase);
+                        LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
+
+                        /* Skip processing until reply is unlinked. We
+                         * can't return to pool before that and we can't
+                         * call interpret before that. We need to make
+                         * sure that all rdma transfers finished and will
+                         * not corrupt any data. */
+                        if (ptlrpc_client_recv_or_unlink(req))
+                                continue;
+                        
+                        /* Turn fail_loc off to prevent it from looping
+                         * forever. */
+                        OBD_FAIL_CHECK_QUIET(OBD_FAIL_PTLRPC_LONG_UNLINK | 
+                                             OBD_FAIL_ONCE);
+
+                        /* Move to next phase if reply was successfully 
+                         * unlinked. */
+                        ptlrpc_rqphase_move(req, req->rq_next_phase);
+                }
+
                 if (req->rq_phase == RQ_PHASE_COMPLETE)
                         continue;
 
                 if (req->rq_phase == RQ_PHASE_INTERPRET)
                         GOTO(interpret, req->rq_status);
 
-                if (req->rq_net_err && !req->rq_timedout)
-                        ptlrpc_expire_one_request(req);
+                /* Note that this also will start async reply unlink */
+                if (req->rq_net_err && !req->rq_timedout) {
+                        ptlrpc_expire_one_request(req, 1);
+                        
+                        /* Check if we still need to wait for unlink. */
+                        if (ptlrpc_client_recv_or_unlink(req))
+                                continue;
+                }
 
                 if (req->rq_err) {
-                        ptlrpc_unregister_reply(req);
                         if (req->rq_status == 0)
                                 req->rq_status = -EIO;
-                        req->rq_phase = RQ_PHASE_INTERPRET;
-
-                        spin_lock(&imp->imp_lock);
-                        list_del_init(&req->rq_list);
-                        spin_unlock(&imp->imp_lock);
-
+                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
 
@@ -1076,15 +1102,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                  * seen a timeout.  our policy is to only interpret
                  * interrupted rpcs after they have timed out */
                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
-                        /* NB could be on delayed list */
-                        ptlrpc_unregister_reply(req);
                         req->rq_status = -EINTR;
-                        req->rq_phase = RQ_PHASE_INTERPRET;
-
-                        spin_lock(&imp->imp_lock);
-                        list_del_init(&req->rq_list);
-                        spin_unlock(&imp->imp_lock);
-
+                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
 
@@ -1092,7 +1111,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                         if (req->rq_timedout||req->rq_waiting||req->rq_resend) {
                                 int status;
 
-                                ptlrpc_unregister_reply(req);
+                                if (!ptlrpc_unregister_reply(req, 1))
+                                        continue;
 
                                 spin_lock(&imp->imp_lock);
 
@@ -1101,19 +1121,22 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                                         continue;
                                 }
 
-                                list_del_init(&req->rq_list);
                                 if (status != 0)  {
                                         req->rq_status = status;
-                                        req->rq_phase = RQ_PHASE_INTERPRET;
+                                        ptlrpc_rqphase_move(req, 
+                                                RQ_PHASE_INTERPRET);
                                         spin_unlock(&imp->imp_lock);
                                         GOTO(interpret, req->rq_status);
                                 }
                                 if (req->rq_no_resend) {
                                         req->rq_status = -ENOTCONN;
-                                        req->rq_phase = RQ_PHASE_INTERPRET;
+                                        ptlrpc_rqphase_move(req, 
+                                                RQ_PHASE_INTERPRET);
                                         spin_unlock(&imp->imp_lock);
                                         GOTO(interpret, req->rq_status);
                                 }
+
+                                list_del_init(&req->rq_list);
                                 list_add_tail(&req->rq_list,
                                               &imp->imp_sending_list);
 
@@ -1130,7 +1153,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                                         if (req->rq_bulk) {
                                                 __u64 old_xid = req->rq_xid;
 
-                                                ptlrpc_unregister_bulk (req);
+                                                ptlrpc_unregister_bulk(req);
 
                                                 /* ensure previous bulk fails */
                                                 req->rq_xid = ptlrpc_next_xid();
@@ -1154,36 +1177,33 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
 
                         spin_lock(&req->rq_lock);
 
-                        if (req->rq_early) {
+                        if (ptlrpc_client_early(req)) {
                                 ptlrpc_at_recv_early_reply(req);
                                 spin_unlock(&req->rq_lock);
                                 continue;
                         }
 
                         /* Still waiting for a reply? */
-                        if (req->rq_receiving_reply) {
+                        if (ptlrpc_client_recv(req)) {
                                 spin_unlock(&req->rq_lock);
                                 continue;
                         }
 
                         /* Did we actually receive a reply? */
-                        if (!req->rq_replied) {
+                        if (!ptlrpc_client_replied(req)) {
                                 spin_unlock(&req->rq_lock);
                                 continue;
                         }
 
                         spin_unlock(&req->rq_lock);
 
-                        spin_lock(&imp->imp_lock);
-                        list_del_init(&req->rq_list);
-                        spin_unlock(&imp->imp_lock);
-
                         req->rq_status = after_reply(req);
                         if (req->rq_resend) {
                                 /* Add this req to the delayed list so
                                    it can be errored if the import is
                                    evicted after recovery. */
                                 spin_lock(&imp->imp_lock);
+                                list_del_init(&req->rq_list);
                                 list_add_tail(&req->rq_list,
                                               &imp->imp_delayed_list);
                                 spin_unlock(&imp->imp_lock);
@@ -1192,15 +1212,15 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
 
                         /* If there is no bulk associated with this request,
                          * then we're done and should let the interpreter
-                         * process the reply.  Similarly if the RPC returned
+                         * process the reply. Similarly if the RPC returned
                          * an error, and therefore the bulk will never arrive.
                          */
                         if (req->rq_bulk == NULL || req->rq_status != 0) {
-                                req->rq_phase = RQ_PHASE_INTERPRET;
+                                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                                 GOTO(interpret, req->rq_status);
                         }
 
-                        req->rq_phase = RQ_PHASE_BULK;
+                        ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
                 }
 
                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
@@ -1214,19 +1234,26 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                          * the ACK for her PUT. */
                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
                         req->rq_status = -EIO;
-                        req->rq_phase = RQ_PHASE_INTERPRET;
+                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
 
-                req->rq_phase = RQ_PHASE_INTERPRET;
+                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
         interpret:
                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
-                LASSERT(!req->rq_receiving_reply);
 
-                ptlrpc_unregister_reply(req);
+                /* This moves to "unregistering" phase we need to wait for
+                 * reply unlink. */
+                if (!ptlrpc_unregister_reply(req, 1))
+                        continue;
+
                 if (req->rq_bulk != NULL)
-                        ptlrpc_unregister_bulk (req);
+                        ptlrpc_unregister_bulk(req);
+
+                /* When calling interpret receiving already should be
+                 * finished. */
+                LASSERT(!req->rq_receiving_reply);
 
                 if (req->rq_interpret_reply != NULL) {
                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
@@ -1234,7 +1261,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                         req->rq_status = interpreter(req, &req->rq_async_args,
                                                      req->rq_status);
                 }
-                req->rq_phase = RQ_PHASE_COMPLETE;
+                ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
 
                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
                        "opc %s:%s:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
@@ -1243,9 +1270,13 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
                        lustre_msg_get_opc(req->rq_reqmsg));
 
-                set->set_remaining--;
-
+                spin_lock(&imp->imp_lock);
+                if (!list_empty(&req->rq_list))
+                        list_del_init(&req->rq_list);
                 atomic_dec(&imp->imp_inflight);
+                spin_unlock(&imp->imp_lock);
+
+                set->set_remaining--;
                 cfs_waitq_signal(&imp->imp_recovery_waitq);
         }
 
@@ -1254,7 +1285,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
 }
 
 /* Return 1 if we should give up, else 0 */
-int ptlrpc_expire_one_request(struct ptlrpc_request *req)
+int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
 {
         struct obd_import *imp = req->rq_import;
         int rc = 0;
@@ -1280,7 +1311,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req)
         req->rq_timedout = 1;
         spin_unlock(&req->rq_lock);
 
-        ptlrpc_unregister_reply (req);
+        ptlrpc_unregister_reply(req, async_unlink);
 
         if (obd_dump_on_timeout)
                 libcfs_debug_dumplog();
@@ -1337,24 +1368,24 @@ int ptlrpc_expired_set(void *data)
                 struct ptlrpc_request *req =
                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
-                /* request in-flight? */
-                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting &&
-                       !req->rq_resend) ||
+                /* Request in-flight? */
+                if (!((req->rq_phase == RQ_PHASE_RPC &&
+                       !req->rq_waiting && !req->rq_resend) ||
                       (req->rq_phase == RQ_PHASE_BULK)))
                         continue;
 
-                if (req->rq_timedout ||           /* already dealt with */
+                if (req->rq_timedout ||     /* already dealt with */
                     req->rq_deadline > now) /* not expired */
                         continue;
 
-                /* deal with this guy */
-                ptlrpc_expire_one_request (req);
+                /* Deal with this guy. Do it asynchronously to not block
+                 * ptlrpcd thread. */
+                ptlrpc_expire_one_request(req, 1);
         }
 
         /* When waiting for a whole set, we always to break out of the
          * sleep so we can recalculate the timeout, or enable interrupts
-         * iff everyone's timed out.
-         */
+         * if everyone's timed out. */
         RETURN(1);
 }
 
@@ -1399,17 +1430,27 @@ int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
         list_for_each(tmp, &set->set_requests) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
-                /* request in-flight? */
-                if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
+                /* Request in-flight? */
+                if (!(((req->rq_phase & 
+                        (RQ_PHASE_RPC | RQ_PHASE_UNREGISTERING)) &&
+                      !req->rq_waiting) ||
                       (req->rq_phase == RQ_PHASE_BULK) ||
                       (req->rq_phase == RQ_PHASE_NEW)))
                         continue;
 
-                if (req->rq_timedout)   /* already timed out */
+                /* Check those waiting for long reply unlink every one 
+                 * second. */
+                if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+                        timeout = 1;
+                        break;
+                }
+
+                /* Already timed out. */
+                if (req->rq_timedout)
                         continue;
 
                 if (req->rq_phase == RQ_PHASE_NEW)
-                        deadline = req->rq_sent;        /* delayed send */
+                        deadline = req->rq_sent;    /* delayed send */
                 else
                         deadline = req->rq_deadline;
 
@@ -1417,9 +1458,8 @@ int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
                         timeout = 1;    /* ASAP */
                         break;
                 }
-                if ((timeout == 0) || (timeout > (deadline - now))) {
+                if ((timeout == 0) || (timeout > (deadline - now)))
                         timeout = deadline - now;
-                }
         }
         RETURN(timeout);
 }
@@ -1502,6 +1542,8 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
         struct ptlrpc_request_pool *pool = request->rq_pool;
 
         spin_lock(&pool->prp_lock);
+        LASSERT(list_empty(&request->rq_list));
+        LASSERT(!request->rq_receiving_reply);
         list_add_tail(&request->rq_list, &pool->prp_req_list);
         spin_unlock(&pool->prp_lock);
 }
@@ -1612,24 +1654,41 @@ EXPORT_SYMBOL(ptlrpc_req_xid);
  * IDEMPOTENT, but _not_ safe against concurrent callers.
  * The request owner (i.e. the thread doing the I/O) must call...
  */
-void ptlrpc_unregister_reply (struct ptlrpc_request *request)
+int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
 {
         int                rc;
         cfs_waitq_t       *wq;
         struct l_wait_info lwi;
         ENTRY;
 
-        LASSERT(!in_interrupt ());             /* might sleep */
+        /* Might sleep. */
+        LASSERT(!in_interrupt());
+
+        /* Let's setup deadline for reply unlink. */
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) && 
+            async && request->rq_reply_deadline == 0)
+                request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
+
+        /* Nothing left to do. */
         if (!ptlrpc_client_recv_or_unlink(request))
-                /* Nothing left to do */
-                return;
+                RETURN(1);
+
+        LNetMDUnlink(request->rq_reply_md_h);
+
+        /* Let's check it once again. */        
+        if (!ptlrpc_client_recv_or_unlink(request))
+                RETURN(1);
+
+        /* Move to "Unregistering" phase as reply was not unlinked yet. */
+        ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
 
-        LNetMDUnlink (request->rq_reply_md_h);
+        /* Do not wait for unlink to finish. */
+        if (async)
+                RETURN(0);
 
         /* We have to l_wait_event() whatever the result, to give liblustre
          * a chance to run reply_in_callback(), and to make sure we've
          * unlinked before returning a req to the pool */
-
         if (request->rq_set != NULL)
                 wq = &request->rq_set->set_waitq;
         else
@@ -1639,17 +1698,19 @@ void ptlrpc_unregister_reply (struct ptlrpc_request *request)
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
                 lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
-                rc = l_wait_event (*wq, !ptlrpc_client_recv_or_unlink(request),
-                                   &lwi);
-                if (rc == 0)
-                        return;
+                rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
+                                  &lwi);
+                if (rc == 0) {
+                        ptlrpc_rqphase_move(request, request->rq_next_phase);
+                        RETURN(1);
+                }
 
-                LASSERT (rc == -ETIMEDOUT);
+                LASSERT(rc == -ETIMEDOUT);
                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout "
                           "rvcng=%d unlnk=%d", request->rq_receiving_reply,
                           request->rq_must_unlink);
         }
-        EXIT;
+        RETURN(0);
 }
 
 /* caller must hold imp->imp_lock */
@@ -1743,7 +1804,7 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
                        old_xid, req->rq_xid);
         }
-        ptlrpc_wake_client_req(req);
+        ptlrpc_client_wake_req(req);
         spin_unlock(&req->rq_lock);
 }
 
@@ -1756,7 +1817,7 @@ void ptlrpc_restart_req(struct ptlrpc_request *req)
         spin_lock(&req->rq_lock);
         req->rq_restart = 1;
         req->rq_timedout = 0;
-        ptlrpc_wake_client_req(req);
+        ptlrpc_client_wake_req(req);
         spin_unlock(&req->rq_lock);
 }
 
@@ -1847,7 +1908,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
                lustre_msg_get_opc(req->rq_reqmsg));
 
         /* Mark phase here for a little debug help */
-        req->rq_phase = RQ_PHASE_RPC;
+        ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
 
         spin_lock(&imp->imp_lock);
         req->rq_import_generation = imp->imp_generation;
@@ -1939,7 +2000,7 @@ restart:
         } while ((brc == -ETIMEDOUT) &&
                  (req->rq_deadline > cfs_time_current_sec()));
 
-        if ((brc == -ETIMEDOUT) && !ptlrpc_expire_one_request(req)) {
+        if ((brc == -ETIMEDOUT) && !ptlrpc_expire_one_request(req, 0)) {
                 /* Wait forever for reconnect / replay or failure */
                 lwi = LWI_INTR(interrupted_request, req);
                 brc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req),
@@ -1960,8 +2021,7 @@ restart:
         /* If the reply was received normally, this just grabs the spinlock
          * (ensuring the reply callback has returned), sees that
          * req->rq_receiving_reply is clear and returns. */
-        ptlrpc_unregister_reply (req);
-
+        ptlrpc_unregister_reply(req, 0);
 
         if (req->rq_err) {
                 DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
@@ -1990,7 +2050,7 @@ restart:
                 GOTO(out, rc = -ETIMEDOUT);
         }
 
-        if (!req->rq_replied) {
+        if (!ptlrpc_client_replied(req)) {
                 /* How can this be? -eeb */
                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
                 LBUG();
@@ -2031,7 +2091,7 @@ restart:
         }
 
         LASSERT(!req->rq_receiving_reply);
-        req->rq_phase = RQ_PHASE_INTERPRET;
+        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
         atomic_dec(&imp->imp_inflight);
         cfs_waitq_signal(&imp->imp_recovery_waitq);
@@ -2052,7 +2112,7 @@ static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
         ENTRY;
         atomic_dec(&imp->imp_replay_inflight);
 
-        if (!req->rq_replied) {
+        if (!ptlrpc_client_replied(req)) {
                 CERROR("request replay timed out, restarting recovery\n");
                 GOTO(out, rc = -ETIMEDOUT);
         }
@@ -2072,7 +2132,7 @@ static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
         if (req->rq_replay_cb)
                 req->rq_replay_cb(req);
 
-        if (req->rq_replied &&
+        if (ptlrpc_client_replied(req) &&
             lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
                           lustre_msg_get_status(req->rq_repmsg),
@@ -2116,6 +2176,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         aa->praa_old_state = req->rq_send_state;
         req->rq_send_state = LUSTRE_IMP_REPLAY;
         req->rq_phase = RQ_PHASE_NEW;
+        req->rq_next_phase = RQ_PHASE_UNDEFINED;
         if (req->rq_repmsg)
                 aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
         req->rq_status = 0;
@@ -2157,7 +2218,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
                         req->rq_status = -EINTR;
-                        ptlrpc_wake_client_req(req);
+                        ptlrpc_client_wake_req(req);
                 }
                 spin_unlock (&req->rq_lock);
         }
@@ -2172,7 +2233,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
                         req->rq_status = -EINTR;
-                        ptlrpc_wake_client_req(req);
+                        ptlrpc_client_wake_req(req);
                 }
                 spin_unlock (&req->rq_lock);
         }
@@ -2205,7 +2266,7 @@ void ptlrpc_abort_set(struct ptlrpc_request_set *set)
 
                 req->rq_err = 1;
                 req->rq_status = -EINTR;
-                ptlrpc_wake_client_req(req);
+                ptlrpc_client_wake_req(req);
                 spin_unlock (&req->rq_lock);
         }
 }
diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c
index d77a78b107a4da9ffa40e64e069fffd32a5a38fa..e765d5dd279ac17317698bbd29d3eef307694973 100644
--- a/lustre/ptlrpc/events.c
+++ b/lustre/ptlrpc/events.c
@@ -74,7 +74,7 @@ void request_out_callback(lnet_event_t *ev)
                 req->rq_net_err = 1;
                 spin_unlock(&req->rq_lock);
 
-                ptlrpc_wake_client_req(req);
+                ptlrpc_client_wake_req(req);
         }
 
         ptlrpc_req_finished(req);
@@ -165,7 +165,7 @@ void reply_in_callback(lnet_event_t *ev)
 out_wake:
         /* NB don't unlock till after wakeup; req can disappear under us
          * since we don't have our own ref */
-        ptlrpc_wake_client_req(req);
+        ptlrpc_client_wake_req(req);
         spin_unlock(&req->rq_lock);
         EXIT;
 }
@@ -206,7 +206,7 @@ void client_bulk_callback (lnet_event_t *ev)
 
         /* NB don't unlock till after wakeup; desc can disappear under us
          * otherwise */
-        ptlrpc_wake_client_req(desc->bd_req);
+        ptlrpc_client_wake_req(desc->bd_req);
 
         spin_unlock(&desc->bd_lock);
         EXIT;
diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c
index a032cbca174119eeb95008c84f6180aa84a0a5f5..a869a811b64c555c06f12ffe539ec1a893dbdf29 100644
--- a/lustre/ptlrpc/import.c
+++ b/lustre/ptlrpc/import.c
@@ -199,6 +199,47 @@ void ptlrpc_deactivate_import(struct obd_import *imp)
         ptlrpc_deactivate_and_unlock_import(imp);
 }
 
+static unsigned int 
+ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
+{
+        long dl;
+
+        if (!(((req->rq_phase & (RQ_PHASE_RPC | RQ_PHASE_UNREGISTERING)) && 
+              !req->rq_waiting) ||
+              (req->rq_phase == RQ_PHASE_BULK) ||
+              (req->rq_phase == RQ_PHASE_NEW)))
+                return 0;
+
+        if (req->rq_timedout)
+                return 0;
+
+        if (req->rq_phase == RQ_PHASE_NEW)
+                dl = req->rq_sent;
+        else
+                dl = req->rq_deadline;
+
+        if (dl <= now)
+                return 0;
+
+        return dl - now;
+}
+
+static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
+{
+        time_t now = cfs_time_current_sec();
+        struct list_head *tmp, *n;
+        struct ptlrpc_request *req;
+        unsigned int timeout = 0;
+
+        spin_lock(&imp->imp_lock);
+        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
+        }
+        spin_unlock(&imp->imp_lock);
+        return timeout;
+}
+
 /*
  * This function will invalidate the import, if necessary, then block
  * for all the RPC completions, and finally notify the obd to
@@ -210,6 +251,7 @@ void ptlrpc_invalidate_import(struct obd_import *imp)
         struct list_head *tmp, *n;
         struct ptlrpc_request *req;
         struct l_wait_info lwi;
+        unsigned int timeout;
         int rc;
 
         atomic_inc(&imp->imp_inval_count);
@@ -226,31 +268,77 @@ void ptlrpc_invalidate_import(struct obd_import *imp)
 
         LASSERT(imp->imp_invalid);
 
-        /* wait for all requests to error out and call completion callbacks.
-           Cap it at obd_timeout -- these should all have been locally
-           cancelled by ptlrpc_abort_inflight. */
-        lwi = LWI_TIMEOUT_INTERVAL(
-                cfs_timeout_cap(cfs_time_seconds(obd_timeout)),
-                cfs_time_seconds(1), NULL, NULL);
-        rc = l_wait_event(imp->imp_recovery_waitq,
-                          (atomic_read(&imp->imp_inflight) == 0), &lwi);
-
-        if (rc) {
-                CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
-                       obd2cli_tgt(imp->imp_obd), rc,
-                       atomic_read(&imp->imp_inflight));
-                spin_lock(&imp->imp_lock);
-                list_for_each_safe(tmp, n, &imp->imp_sending_list) {
-                        req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                        DEBUG_REQ(D_ERROR, req, "still on sending list");
-                }
-                list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
-                        req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                        DEBUG_REQ(D_ERROR, req, "still on delayed list");
+        /* Wait forever until inflight == 0. We really can't do it another
+         * way because in some cases we need to wait for very long reply 
+         * unlink. We can't do anything before that because there is really
+         * no guarantee that some rdma transfer is not in progress right now. */
+        do {
+                /* Calculate max timeout for waiting on rpcs to error 
+                 * out. Use obd_timeout if calculated value is smaller
+                 * than it. */
+                timeout = ptlrpc_inflight_timeout(imp);
+                timeout += timeout / 3;
+                
+                if (timeout == 0)
+                        timeout = obd_timeout;
+                
+                CDEBUG(D_RPCTRACE, "Sleeping %d sec for inflight to error out\n",
+                       timeout);
+
+                /* Wait for all requests to error out and call completion
+                 * callbacks. Cap it at obd_timeout -- these should all
+                 * have been locally cancelled by ptlrpc_abort_inflight. */
+                lwi = LWI_TIMEOUT_INTERVAL(
+                        cfs_timeout_cap(cfs_time_seconds(timeout)),
+                        cfs_time_seconds(1), NULL, NULL);
+                rc = l_wait_event(imp->imp_recovery_waitq,
+                                (atomic_read(&imp->imp_inflight) == 0), &lwi);
+                if (rc) {
+                        const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
+
+                        CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
+                               cli_tgt, rc, atomic_read(&imp->imp_inflight));
+
+                        spin_lock(&imp->imp_lock);
+                        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
+                                req = list_entry(tmp, struct ptlrpc_request, 
+                                        rq_list);
+                                DEBUG_REQ(D_ERROR, req, "still on sending list");
+                        }
+                        list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
+                                req = list_entry(tmp, struct ptlrpc_request, 
+                                        rq_list);
+                                DEBUG_REQ(D_ERROR, req, "still on delayed list");
+                        }
+                        
+                        if (atomic_read(&imp->imp_unregistering) == 0) {
+                                /* We know that only "unregistering" rpcs may
+                                 * still survive in sending or delaying lists
+                                 * (They are waiting for long reply unlink in
+                                 * sluggish nets). Let's check this. If there
+                                 * is no unregistering and inflight != 0 this
+                                 * is bug. */
+                                LASSERT(atomic_read(&imp->imp_inflight) == 0);
+                                
+                                /* Let's save one loop as soon as inflight have
+                                 * dropped to zero. No new inflights possible at
+                                 * this point. */
+                                rc = 0;
+                        } else {
+                                
+                                CERROR("%s: RPCs in \"%s\" phase found (%d). "
+                                       "Network is sluggish? Waiting them "
+                                       "to error out.\n", cli_tgt,
+                                       ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
+                                       atomic_read(&imp->imp_unregistering));
+                        }
+                        spin_unlock(&imp->imp_lock);
                 }
-                spin_unlock(&imp->imp_lock);
-                LASSERT(atomic_read(&imp->imp_inflight) == 0);
-        }
+        } while (rc != 0);
+
+        /* Let's additionally check that no new rpcs added to import in
+         * "invalidate" state. */
+        LASSERT(atomic_read(&imp->imp_inflight) == 0);
 
   out:
         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
@@ -545,6 +633,7 @@ int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
                 lustre_msg_add_op_flags(request->rq_reqmsg,
                                         MSG_CONNECT_NEXT_VER);
 
+        request->rq_no_resend = request->rq_no_delay = 1;
         request->rq_send_state = LUSTRE_IMP_CONNECTING;
         /* Allow a slightly larger reply for future growth compatibility */
         size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index a7b03720710b116ffdaadfe6960f942b40043767..eb674faae7151f3dab1c98f60be7159563406766 100644
--- a/lustre/ptlrpc/niobuf.c
+++ b/lustre/ptlrpc/niobuf.c
@@ -474,7 +474,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
 
         /* If this is a re-transmit, we're required to have disengaged
          * cleanly from the previous attempt */
-        LASSERT (!request->rq_receiving_reply);
+        LASSERT(!request->rq_receiving_reply);
 
         if (request->rq_import->imp_obd &&
             request->rq_import->imp_obd->obd_fail) {
@@ -611,7 +611,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         rc2 = LNetMEUnlink(reply_me_h);
         LASSERT (rc2 == 0);
         /* UNLINKED callback called synchronously */
-        LASSERT (!request->rq_receiving_reply);
+        LASSERT(!request->rq_receiving_reply);
 
  cleanup_repmsg:
         OBD_FREE(request->rq_repbuf, request->rq_replen);
diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c
index 2bafe82ac9248019778ba11c533c9065dbf871b3..cc08ecadda84185f08c8d8db3121fc33d8ed42cb 100644
--- a/lustre/ptlrpc/pack_generic.c
+++ b/lustre/ptlrpc/pack_generic.c
@@ -2740,7 +2740,7 @@ void _debug_req(struct ptlrpc_request *req, __u32 mask,
                 (char *)req->rq_export->exp_connection->c_remote_uuid.uuid :
                 "<?>", req->rq_request_portal,  req->rq_reply_portal,
                 req->rq_reqlen, req->rq_replen,
-                req->rq_early_count, req->rq_timeout, req->rq_deadline,
+                req->rq_early_count, !!req->rq_timeout, req->rq_deadline,
                 atomic_read(&req->rq_refcount), DEBUG_REQ_FLAGS(req),
                 req_fl, rep_fl, req->rq_status, rep_status);
         va_end(args);
diff --git a/lustre/ptlrpc/pinger.c b/lustre/ptlrpc/pinger.c
index 594638acf9b8f53297875cc2b8245ee887ac5b27..1315d6c84d482e8d2042c226851630e7f05c635d 100644
--- a/lustre/ptlrpc/pinger.c
+++ b/lustre/ptlrpc/pinger.c
@@ -540,7 +540,7 @@ static int pinger_check_rpcs(void *arg)
                         req->rq_no_resend = 1;
                         ptlrpc_req_set_repsize(req, 1, NULL);
                         req->rq_send_state = LUSTRE_IMP_FULL;
-                        req->rq_phase = RQ_PHASE_RPC;
+                        ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
                         req->rq_import_generation = generation;
                         ptlrpc_set_add_req(set, req);
                 } else {
@@ -586,17 +586,17 @@ do_check_set:
                 if (req->rq_phase == RQ_PHASE_COMPLETE)
                         continue;
 
-                req->rq_phase = RQ_PHASE_COMPLETE;
+                ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
                 atomic_dec(&req->rq_import->imp_inflight);
                 set->set_remaining--;
                 /* If it was disconnected, don't sweat it. */
                 if (list_empty(&req->rq_import->imp_pinger_chain)) {
-                        ptlrpc_unregister_reply(req);
+                        ptlrpc_unregister_reply(req, 0);
                         continue;
                 }
 
                 CDEBUG(D_RPCTRACE, "pinger initiate expire_one_request\n");
-                ptlrpc_expire_one_request(req);
+                ptlrpc_expire_one_request(req, 0);
         }
         mutex_up(&pinger_sem);
 
diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h
index a10f4b06f025bf1f215e911ac0663c4368f7d638..c491094817e8686453735731aed0bba5c6cf3527 100644
--- a/lustre/ptlrpc/ptlrpc_internal.h
+++ b/lustre/ptlrpc/ptlrpc_internal.h
@@ -76,7 +76,7 @@ void ptlrpc_lprocfs_do_request_stat (struct ptlrpc_request *req,
 #endif /* LPROCFS */
 
 /* recovd_thread.c */
-int ptlrpc_expire_one_request(struct ptlrpc_request *req);
+int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink);
 
 /* pers.c */
 void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc);
diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c
index 94f4925cad71ffb79d84acbb8ff73f1226056d95..154db42301da0e14521f3d12845cafd0d238f423 100644
--- a/lustre/ptlrpc/ptlrpcd.c
+++ b/lustre/ptlrpc/ptlrpcd.c
@@ -185,10 +185,11 @@ static int ptlrpcd(void *arg)
          */
         while (1) {
                 struct l_wait_info lwi;
-                cfs_duration_t timeout;
+                int timeout;
 
-                timeout = cfs_time_seconds(ptlrpc_set_next_timeout(pc->pc_set));
-                lwi = LWI_TIMEOUT(timeout, ptlrpc_expired_set, pc->pc_set);
+                timeout = ptlrpc_set_next_timeout(pc->pc_set);
+                lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1), 
+                                  ptlrpc_expired_set, pc->pc_set);
 
                 l_wait_event(pc->pc_set->set_waitq, ptlrpcd_check(pc), &lwi);
 
diff --git a/lustre/ptlrpc/recover.c b/lustre/ptlrpc/recover.c
index ed8917bc0e8866ee8ee52ad51bb2cbd5e6998ba6..0045ddd141decea2883f56a167d467ed581816e6 100644
--- a/lustre/ptlrpc/recover.c
+++ b/lustre/ptlrpc/recover.c
@@ -186,7 +186,7 @@ void ptlrpc_wake_delayed(struct obd_import *imp)
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
 
                 DEBUG_REQ(D_HA, req, "waking (set %p):", req->rq_set);
-                ptlrpc_wake_client_req(req);
+                ptlrpc_client_wake_req(req);
         }
         spin_unlock(&imp->imp_lock);
 }
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index 0514398b65c495cba72fdc0467341fd375269933..72f80f9acb21cad8b550bfe0b8391320869575e1 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -1071,7 +1071,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                 goto put_rpc_export;
         }
 
-        request->rq_phase = RQ_PHASE_INTERPRET;
+        ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
 
         CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc "
                "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
@@ -1087,7 +1087,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         rc = svc->srv_handler(request);
 
-        request->rq_phase = RQ_PHASE_COMPLETE;
+        ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);
 
         CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
                "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
diff --git a/lustre/tests/conf-sanity.sh b/lustre/tests/conf-sanity.sh
index 9e38a0c0c9ce6db5de405c3615247e9d8874172a..bf1c99381183b7ed7990f2c9d9a30968b3efab0c 100644
--- a/lustre/tests/conf-sanity.sh
+++ b/lustre/tests/conf-sanity.sh
@@ -38,7 +38,7 @@ remote_mds_nodsh && skip "remote MDS with nodsh" && exit 0
 remote_ost_nodsh && skip "remote OST with nodsh" && exit 0
 
 #
-[ "$SLOW" = "no" ] && EXCEPT_SLOW="0 1 2 3 6 7 15 18 24b 25 30 31 32 33 34a "
+[ "$SLOW" = "no" ] && EXCEPT_SLOW="0 1 2 3 6 7 15 18 24b 25 30 31 32 33 34a 45"
 
 assert_DIR
 
@@ -1412,5 +1412,26 @@ test_44() { # 16317
 }
 run_test 44 "mounted client proc entry exists"
 
+test_45() { #17310
+        setup
+        check_mount || return 2
+        stop_mds
+        df -h $MOUNT &
+        log "sleep 60 sec"
+        sleep 60
+#define OBD_FAIL_PTLRPC_LONG_UNLINK   0x50f
+        do_facet client "lctl set_param fail_loc=0x50f"
+        log "sleep 10 sec"
+        sleep 10
+        manual_umount_client --force || return 3
+        do_facet client "lctl set_param fail_loc=0x0"
+        start_mds
+        mount_client $MOUNT || return 4
+        cleanup
+        return 0
+}
+run_test 45 "long unlink handling in ptlrpcd"
+
+
 equals_msg `basename $0`: test complete
 [ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG || true