From 1940e679ad3a5cddd8c8babd2e558477b3143053 Mon Sep 17 00:00:00 2001
From: ericm <ericm>
Date: Thu, 18 Sep 2008 19:18:50 +0000
Subject: [PATCH] branch: HEAD - fix race of handling early reply in sptlrpc. -
 port AT api changes from b1_8 (b16972) b=16999 r=rread r=wangdi

---
 lustre/include/lustre_sec.h |   5 +-
 lustre/ptlrpc/client.c      | 102 ++++++++++++++++++----------------
 lustre/ptlrpc/gss/sec_gss.c |  29 +++++-----
 lustre/ptlrpc/sec.c         | 106 +++++++++++++++++-------------------
 lustre/ptlrpc/sec_null.c    |   5 +-
 lustre/ptlrpc/sec_plain.c   |  10 +---
 6 files changed, 124 insertions(+), 133 deletions(-)

diff --git a/lustre/include/lustre_sec.h b/lustre/include/lustre_sec.h
index cf6cf4faf5..00c20d279e 100644
--- a/lustre/include/lustre_sec.h
+++ b/lustre/include/lustre_sec.h
@@ -671,8 +671,9 @@ int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize);
 void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req);
 int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
                                int segment, int newsize);
-int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req);
-int sptlrpc_cli_finish_early_reply(struct ptlrpc_request *req);
+int  sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req,
+                                    struct ptlrpc_request **req_ret);
+void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req);
 
 void sptlrpc_request_out_callback(struct ptlrpc_request *req);
 
diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index cbe80c696e..f185808e6a 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -218,17 +218,15 @@ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
 }
 
 /* Adjust max service estimate based on server value */
-static void ptlrpc_at_adj_service(struct ptlrpc_request *req)
+static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
+                                  unsigned int serv_est)
 {
         int idx;
-        unsigned int serv_est, oldse;
-        struct imp_at *at = &req->rq_import->imp_at;
+        unsigned int oldse;
+        struct imp_at *at;
 
         LASSERT(req->rq_import);
-
-        /* service estimate is returned in the repmsg timeout field,
-           may be 0 on err */
-        serv_est = lustre_msg_get_timeout(req->rq_repmsg);
+        at = &req->rq_import->imp_at;
 
         idx = import_at_get_index(req->rq_import, req->rq_request_portal);
         /* max service estimates are tracked on the server side,
@@ -248,21 +246,22 @@ int ptlrpc_at_get_net_latency(struct ptlrpc_request *req)
 }
 
 /* Adjust expected network latency */
-static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req)
+static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
+                                      unsigned int service_time)
 {
-        unsigned int st, nl, oldnl;
-        struct imp_at *at = &req->rq_import->imp_at;
+        unsigned int nl, oldnl;
+        struct imp_at *at;
         time_t now = cfs_time_current_sec();
 
         LASSERT(req->rq_import);
-
-        st = lustre_msg_get_service_time(req->rq_repmsg);
+        at = &req->rq_import->imp_at;
 
         /* Network latency is total time less server processing time */
-        nl = max_t(int, now - req->rq_sent - st, 0) + 1/*st rounding*/;
-        if (st > now - req->rq_sent + 3 /* bz16408 */)
+        nl = max_t(int, now - req->rq_sent - service_time, 0) +1/*st rounding*/;
+        if (service_time > now - req->rq_sent + 3 /* bz16408 */)
                 CWARN("Reported service time %u > total measured time "
-                       CFS_DURATION_T"\n", st, cfs_time_sub(now, req->rq_sent));
+                      CFS_DURATION_T"\n", service_time,
+                      cfs_time_sub(now, req->rq_sent));
 
         oldnl = at_add(&at->iat_net_latency, nl);
         if (oldnl != 0)
@@ -299,45 +298,53 @@ static int unpack_reply(struct ptlrpc_request *req)
  * Handle an early reply message, called with the rq_lock held.
  * If anything goes wrong just ignore it - same as if it never happened
  */
-static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) {
-        time_t          olddl;
-        int             rc;
+static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req)
+{
+        struct ptlrpc_request *early_req;
+        time_t                 olddl;
+        int                    rc;
         ENTRY;
 
         req->rq_early = 0;
         spin_unlock(&req->rq_lock);
 
-        rc = sptlrpc_cli_unwrap_early_reply(req);
-        if (rc)
-                GOTO(out, rc);
-
-        rc = unpack_reply(req);
-        if (rc)
-                GOTO(out_cleanup, rc);
+        rc = sptlrpc_cli_unwrap_early_reply(req, &early_req);
+        if (rc) {
+                spin_lock(&req->rq_lock);
+                RETURN(rc);
+        }
 
-        /* Expecting to increase the service time estimate here */
-        ptlrpc_at_adj_service(req);
-        ptlrpc_at_adj_net_latency(req);
+        rc = unpack_reply(early_req);
+        if (rc == 0) {
+                /* Expecting to increase the service time estimate here */
+                ptlrpc_at_adj_service(req,
+                        lustre_msg_get_timeout(early_req->rq_repmsg));
+                ptlrpc_at_adj_net_latency(req,
+                        lustre_msg_get_service_time(early_req->rq_repmsg));
+        }
 
-        /* Adjust the local timeout for this req */
-        ptlrpc_at_set_req_timeout(req);
+        sptlrpc_cli_finish_early_reply(early_req);
 
-        olddl = req->rq_deadline;
-        /* server assumes it now has rq_timeout from when it sent the
-           early reply, so client should give it at least that long. */
-        req->rq_deadline = cfs_time_current_sec() + req->rq_timeout +
-                    ptlrpc_at_get_net_latency(req);
-
-        DEBUG_REQ(D_ADAPTTO, req,
-                  "Early reply #%d, new deadline in "CFS_DURATION_T"s ("
-                  CFS_DURATION_T"s)", req->rq_early_count,
-                  cfs_time_sub(req->rq_deadline, cfs_time_current_sec()),
-                  cfs_time_sub(req->rq_deadline, olddl));
-
-out_cleanup:
-        sptlrpc_cli_finish_early_reply(req);
-out:
         spin_lock(&req->rq_lock);
+
+        if (rc == 0) {
+                /* Adjust the local timeout for this req */
+                ptlrpc_at_set_req_timeout(req);
+
+                olddl = req->rq_deadline;
+                /* server assumes it now has rq_timeout from when it sent the
+                   early reply, so client should give it at least that long. */
+                req->rq_deadline = cfs_time_current_sec() + req->rq_timeout +
+                            ptlrpc_at_get_net_latency(req);
+
+                DEBUG_REQ(D_ADAPTTO, req,
+                          "Early reply #%d, new deadline in "CFS_DURATION_T"s "
+                          "("CFS_DURATION_T"s)", req->rq_early_count,
+                          cfs_time_sub(req->rq_deadline,
+                                       cfs_time_current_sec()),
+                          cfs_time_sub(req->rq_deadline, olddl));
+        }
+
         RETURN(rc);
 }
 
@@ -976,8 +983,9 @@ static int after_reply(struct ptlrpc_request *req)
         }
 
         OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, obd_fail_val);
-        ptlrpc_at_adj_service(req);
-        ptlrpc_at_adj_net_latency(req);
+        ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
+        ptlrpc_at_adj_net_latency(req,
+                                  lustre_msg_get_service_time(req->rq_repmsg));
 
         rc = ptlrpc_check_status(req);
         imp->imp_connect_error = rc;
diff --git a/lustre/ptlrpc/gss/sec_gss.c b/lustre/ptlrpc/gss/sec_gss.c
index 528ea1706e..3312239e2c 100644
--- a/lustre/ptlrpc/gss/sec_gss.c
+++ b/lustre/ptlrpc/gss/sec_gss.c
@@ -735,7 +735,7 @@ int gss_cli_ctx_verify(struct ptlrpc_cli_ctx *ctx,
         struct gss_header      *ghdr, *reqhdr;
         struct lustre_msg      *msg = req->rq_repdata;
         __u32                   major;
-        int                     pack_bulk, early = 0, rc = 0;
+        int                     pack_bulk, rc = 0;
         ENTRY;
 
         LASSERT(req->rq_cli_ctx == ctx);
@@ -743,13 +743,9 @@ int gss_cli_ctx_verify(struct ptlrpc_cli_ctx *ctx,
 
         gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
 
-        if ((char *) msg < req->rq_repbuf ||
-            (char *) msg >= req->rq_repbuf + req->rq_repbuf_len)
-                early = 1;
-
         /* special case for context negotiation, rq_repmsg/rq_replen actually
          * are not used currently. but early reply always be treated normally */
-        if (req->rq_ctx_init && !early) {
+        if (req->rq_ctx_init && !req->rq_early) {
                 req->rq_repmsg = lustre_msg_buf(msg, 1, 0);
                 req->rq_replen = msg->lm_buflens[1];
                 RETURN(0);
@@ -780,7 +776,7 @@ int gss_cli_ctx_verify(struct ptlrpc_cli_ctx *ctx,
         case PTLRPC_GSS_PROC_DATA:
                 pack_bulk = ghdr->gh_flags & LUSTRE_GSS_PACK_BULK;
 
-                if (!early && !equi(req->rq_pack_bulk == 1, pack_bulk)) {
+                if (!req->rq_early && !equi(req->rq_pack_bulk == 1, pack_bulk)){
                         CERROR("%s bulk flag in reply\n",
                                req->rq_pack_bulk ? "missing" : "unexpected");
                         RETURN(-EPROTO);
@@ -805,7 +801,7 @@ int gss_cli_ctx_verify(struct ptlrpc_cli_ctx *ctx,
                 if (major != GSS_S_COMPLETE)
                         RETURN(-EPERM);
 
-                if (early && reqhdr->gh_svc == SPTLRPC_SVC_NULL) {
+                if (req->rq_early && reqhdr->gh_svc == SPTLRPC_SVC_NULL) {
                         __u32 cksum;
 
                         cksum = crc32_le(!(__u32) 0,
@@ -837,7 +833,7 @@ int gss_cli_ctx_verify(struct ptlrpc_cli_ctx *ctx,
                 req->rq_replen = msg->lm_buflens[1];
                 break;
         case PTLRPC_GSS_PROC_ERR:
-                if (early) {
+                if (req->rq_early) {
                         CERROR("server return error with early reply\n");
                         rc = -EPROTO;
                 } else {
@@ -957,7 +953,7 @@ int gss_cli_ctx_unseal(struct ptlrpc_cli_ctx *ctx,
         struct gss_cli_ctx      *gctx;
         struct gss_header       *ghdr;
         struct lustre_msg       *msg = req->rq_repdata;
-        int                      msglen, pack_bulk, early = 0, rc;
+        int                      msglen, pack_bulk, rc;
         __u32                    major;
         ENTRY;
 
@@ -967,10 +963,6 @@ int gss_cli_ctx_unseal(struct ptlrpc_cli_ctx *ctx,
 
         gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
 
-        if ((char *) msg < req->rq_repbuf ||
-            (char *) msg >= req->rq_repbuf + req->rq_repbuf_len)
-                early = 1;
-
         ghdr = gss_swab_header(msg, 0);
         if (ghdr == NULL) {
                 CERROR("can't decode gss header\n");
@@ -988,7 +980,7 @@ int gss_cli_ctx_unseal(struct ptlrpc_cli_ctx *ctx,
         case PTLRPC_GSS_PROC_DATA:
                 pack_bulk = ghdr->gh_flags & LUSTRE_GSS_PACK_BULK;
 
-                if (!early && !equi(req->rq_pack_bulk == 1, pack_bulk)) {
+                if (!req->rq_early && !equi(req->rq_pack_bulk == 1, pack_bulk)){
                         CERROR("%s bulk flag in reply\n",
                                req->rq_pack_bulk ? "missing" : "unexpected");
                         RETURN(-EPROTO);
@@ -1036,7 +1028,12 @@ int gss_cli_ctx_unseal(struct ptlrpc_cli_ctx *ctx,
                 rc = 0;
                 break;
         case PTLRPC_GSS_PROC_ERR:
-                rc = gss_cli_ctx_handle_err_notify(ctx, req, ghdr);
+                if (req->rq_early) {
+                        CERROR("server return error with early reply\n");
+                        rc = -EPROTO;
+                } else {
+                        rc = gss_cli_ctx_handle_err_notify(ctx, req, ghdr);
+                }
                 break;
         default:
                 CERROR("unexpected proc %d\n", ghdr->gh_proc);
diff --git a/lustre/ptlrpc/sec.c b/lustre/ptlrpc/sec.c
index 59fedf7e20..d053bca0d4 100644
--- a/lustre/ptlrpc/sec.c
+++ b/lustre/ptlrpc/sec.c
@@ -950,114 +950,108 @@ int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
         return do_cli_unwrap_reply(req);
 }
 
-/*
+/**
  * Upon called, the receive buffer might be still posted, so the reply data
  * might be changed at any time, no matter we're holding rq_lock or not. we
  * expect the rq_reply_off be 0, rq_nob_received is the early reply size.
  *
- * we allocate a separate buffer to hold early reply data, pointed by
- * rq_repdata, rq_repdata_len is the early reply size, and round up to power2
- * is the actual buffer size.
- *
- * caller _must_ call sptlrpc_cli_finish_early_reply() after this, before
- * process another early reply or real reply, to restore ptlrpc_request
- * to normal status.
+ * we allocate separate ptlrpc_request and reply buffer for early reply
+ * processing, return 0 and @req_ret is a duplicated ptlrpc_request. caller
+ * must call sptlrpc_cli_finish_early_reply() on the returned request to
+ * release it. if anything goes wrong @req_ret will not be set.
  */
-int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req)
+int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req,
+                                   struct ptlrpc_request **req_ret)
 {
-        struct lustre_msg      *early_buf;
+        struct ptlrpc_request  *early_req;
+        char                   *early_buf;
         int                     early_bufsz, early_size;
         int                     rc;
         ENTRY;
 
-        LASSERT(req->rq_repbuf);
-        LASSERT(req->rq_repdata == NULL);
-        LASSERT(req->rq_repmsg == NULL);
+        OBD_ALLOC_PTR(early_req);
+        if (early_req == NULL)
+                RETURN(-ENOMEM);
 
         early_size = req->rq_nob_received;
-        if (early_size < sizeof(struct lustre_msg)) {
-                CERROR("early reply length %d too small\n", early_size);
-                RETURN(-EPROTO);
-        }
-
         early_bufsz = size_roundup_power2(early_size);
         OBD_ALLOC(early_buf, early_bufsz);
         if (early_buf == NULL)
-                RETURN(-ENOMEM);
+                GOTO(err_req, rc = -ENOMEM);
 
-        /* copy data out, do it inside spinlock */
+        /* sanity checkings and copy data out, do it inside spinlock */
         spin_lock(&req->rq_lock);
 
         if (req->rq_replied) {
                 spin_unlock(&req->rq_lock);
-                GOTO(err_free, rc = -EALREADY);
+                GOTO(err_buf, rc = -EALREADY);
         }
 
+        LASSERT(req->rq_repbuf);
+        LASSERT(req->rq_repdata == NULL);
+        LASSERT(req->rq_repmsg == NULL);
+
         if (req->rq_reply_off != 0) {
                 CERROR("early reply with offset %u\n", req->rq_reply_off);
-                GOTO(err_free, rc = -EPROTO);
+                spin_unlock(&req->rq_lock);
+                GOTO(err_buf, rc = -EPROTO);
         }
 
         if (req->rq_nob_received != early_size) {
                 /* even another early arrived the size should be the same */
-                CWARN("data size has changed from %u to %u\n",
-                      early_size, req->rq_nob_received);
+                CERROR("data size has changed from %u to %u\n",
+                       early_size, req->rq_nob_received);
                 spin_unlock(&req->rq_lock);
-                GOTO(err_free, rc = -EINVAL);
+                GOTO(err_buf, rc = -EINVAL);
         }
 
         if (req->rq_nob_received < sizeof(struct lustre_msg)) {
                 CERROR("early reply length %d too small\n",
                        req->rq_nob_received);
                 spin_unlock(&req->rq_lock);
-                GOTO(err_free, rc = -EALREADY);
+                GOTO(err_buf, rc = -EALREADY);
         }
 
         memcpy(early_buf, req->rq_repbuf, early_size);
         spin_unlock(&req->rq_lock);
 
-        req->rq_repdata = early_buf;
-        req->rq_repdata_len = early_size;
-
-        rc = do_cli_unwrap_reply(req);
-
-        /* treate resend as an error case. in fact server should never ask
-         * resend via early reply. */
-        if (req->rq_resend) {
-                req->rq_resend = 0;
-                rc = -EPROTO;
-        }
+        early_req->rq_cli_ctx = sptlrpc_cli_ctx_get(req->rq_cli_ctx);
+        early_req->rq_flvr = req->rq_flvr;
+        early_req->rq_repbuf = early_buf;
+        early_req->rq_repbuf_len = early_bufsz;
+        early_req->rq_repdata = (struct lustre_msg *) early_buf;
+        early_req->rq_repdata_len = early_size;
+        early_req->rq_early = 1;
 
+        rc = do_cli_unwrap_reply(early_req);
         if (rc) {
-                LASSERT(req->rq_repmsg == NULL);
-                req->rq_repdata = NULL;
-                req->rq_repdata_len = 0;
-                GOTO(err_free, rc);
+                DEBUG_REQ(D_ADAPTTO, early_req,
+                          "error %d unwrap early reply", rc);
+                GOTO(err_ctx, rc);
         }
 
-        LASSERT(req->rq_repmsg);
+        LASSERT(early_req->rq_repmsg);
+        *req_ret = early_req;
         RETURN(0);
 
-err_free:
+err_ctx:
+        sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
+err_buf:
         OBD_FREE(early_buf, early_bufsz);
+err_req:
+        OBD_FREE_PTR(early_req);
         RETURN(rc);
 }
 
-int sptlrpc_cli_finish_early_reply(struct ptlrpc_request *req)
+void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req)
 {
-        int     early_bufsz;
-
-        LASSERT(req->rq_repdata);
-        LASSERT(req->rq_repdata_len);
-        LASSERT(req->rq_repmsg);
+        LASSERT(early_req->rq_repbuf);
+        LASSERT(early_req->rq_repdata);
+        LASSERT(early_req->rq_repmsg);
 
-        early_bufsz = size_roundup_power2(req->rq_repdata_len);
-        OBD_FREE(req->rq_repdata, early_bufsz);
-
-        req->rq_repdata = NULL;
-        req->rq_repdata_len = 0;
-        req->rq_repmsg = NULL;
-        return 0;
+        sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
+        OBD_FREE(early_req->rq_repbuf, early_req->rq_repbuf_len);
+        OBD_FREE_PTR(early_req);
 }
 
 /**************************************************
diff --git a/lustre/ptlrpc/sec_null.c b/lustre/ptlrpc/sec_null.c
index 813adf0baf..ea7d5aefb9 100644
--- a/lustre/ptlrpc/sec_null.c
+++ b/lustre/ptlrpc/sec_null.c
@@ -109,10 +109,7 @@ int null_ctx_verify(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
 
         LASSERT(req->rq_repdata);
 
-        /* real reply rq_repdata point inside of rq_reqbuf; early reply
-         * rq_repdata point to a separate allocated space */
-        if ((char *) req->rq_repdata < req->rq_repbuf ||
-            (char *) req->rq_repdata >= req->rq_repbuf + req->rq_repbuf_len) {
+        if (req->rq_early) {
                 cksums = req->rq_repdata->lm_cksum;
                 req->rq_repdata->lm_cksum = 0;
 
diff --git a/lustre/ptlrpc/sec_plain.c b/lustre/ptlrpc/sec_plain.c
index 024e29b0c3..291a9fb526 100644
--- a/lustre/ptlrpc/sec_plain.c
+++ b/lustre/ptlrpc/sec_plain.c
@@ -151,7 +151,6 @@ static
 int plain_ctx_verify(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
 {
         struct lustre_msg *msg = req->rq_repdata;
-        int                early = 0;
         __u32              cksum;
         ENTRY;
 
@@ -160,18 +159,13 @@ int plain_ctx_verify(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
                 RETURN(-EPROTO);
         }
 
-        /* find out if it's an early reply */
-        if ((char *) msg < req->rq_repbuf ||
-            (char *) msg >= req->rq_repbuf + req->rq_repbuf_len)
-                early = 1;
-
         /* expect no user desc in reply */
         if (PLAIN_WFLVR_HAS_USER(msg->lm_secflvr)) {
                 CERROR("Unexpected udesc flag in reply\n");
                 RETURN(-EPROTO);
         }
 
-        if (unlikely(early)) {
+        if (unlikely(req->rq_early)) {
                 cksum = crc32_le(!(__u32) 0,
                                  lustre_msg_buf(msg, PLAIN_PACK_MSG_OFF, 0),
                                  lustre_msg_buflen(msg, PLAIN_PACK_MSG_OFF));
@@ -183,7 +177,7 @@ int plain_ctx_verify(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
         } else {
                 /* whether we sent with bulk or not, we expect the same
                  * in reply, except for early reply */
-                if (!early &&
+                if (!req->rq_early &&
                     !equi(req->rq_pack_bulk == 1,
                           PLAIN_WFLVR_HAS_BULK(msg->lm_secflvr))) {
                         CERROR("%s bulk checksum in reply\n",
-- 
GitLab