From f7ae5bb5398d5a2a9236ee461a4fee2cbc93d2b8 Mon Sep 17 00:00:00 2001
From: bobijam <bobijam>
Date: Tue, 19 Aug 2008 02:23:13 +0000
Subject: [PATCH] Branch b1_6 b=16566 o=Jonathan Li(jli@cray.com) i=shadow,
 bobijam

Description: Upcall once a Lustre log has been dumped
Details    : Allow for a user mode script to be called once a Lustre log has
             been dumped. It passes the filename of the dumped log to the
             script, the location of the script can be specified via
             /proc/sys/lnet/debug_log_upcall.
---
 lustre/ChangeLog             | 18 +++++--
 lustre/include/obd_support.h | 14 ++---
 lustre/ptlrpc/service.c      | 99 +++++++++++++++++++-----------------
 3 files changed, 72 insertions(+), 59 deletions(-)

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index 1d37a4a8f3..b30b4bb040 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -38,17 +38,25 @@ tbd Sun Microsystems, Inc.
 	* Output of lfs quota has been made less detailed by default,
 	  old (verbose) output can be obtained by using -v option.
 
+Severity   : enhancement
+Bugzilla   : 16566
+Description: Upcall once a Lustre log has been dumped
+Details    : Allow for a user mode script to be called once a Lustre log has
+             been dumped. It passes the filename of the dumped log to the
+             script, the location of the script can be specified via
+             /proc/sys/lnet/debug_log_upcall.
+
 Severity   : minor
 Bugzilla   : 16583
 Frequency  : rare
-Description: avoid messages about idr_remove called for id  which is not allocated. 
-Details    : Move assigment s_dev for clustered nfs to end of initialization, for avoid
-             problem with error handling.
+Description: avoid messages about idr_remove called for id which is not allocated.
+Details    : Move assignment of s_dev for clustered nfs to end of initialization,
+             to avoid problems with error handling.
 
 Severity   : minor
 Bugzilla   : 16583
 Frequency  : rare
-Description: avoid messages about idr_remove called for id  which is not allocated. 
+Description: avoid messages about idr_remove called for id which is not allocated.
 Details    : Move assigment s_dev for clustered nfs to end of initialization, for avoid
              problem with error handling.
 
@@ -123,7 +131,7 @@ Details    : release reference to stats when client disconnected, not
 Severity   : normal
 Bugzilla   : 16679
 Description: more cleanup in mds_lov
-Details    : add workaround for get valid ost count for avoid warnings about 
+Details    : add workaround for get valid ost count for avoid warnings about
              drop too big messages, not init llog cat under semphore which
              can be blocked on reconnect and break normal replay, fix access
              to wrong pointer.
diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h
index 84c0790545..56c048173b 100644
--- a/lustre/include/obd_support.h
+++ b/lustre/include/obd_support.h
@@ -55,7 +55,7 @@ extern unsigned int obd_fail_val;
 extern unsigned int obd_debug_peer_on_timeout;
 extern unsigned int obd_dump_on_timeout;
 extern unsigned int obd_dump_on_eviction;
-/* obd_timeout should only be used for recovery, not for 
+/* obd_timeout should only be used for recovery, not for
    networking / disk / timings affected by load (use Adaptive Timeouts) */
 extern unsigned int obd_timeout;          /* seconds */
 extern unsigned int ldlm_timeout;         /* seconds */
@@ -92,12 +92,12 @@ extern unsigned int obd_alloc_fail_rate;
 #define CONNECTION_SWITCH_MAX min(50U, max(CONNECTION_SWITCH_MIN,obd_timeout))
 #define CONNECTION_SWITCH_INC 5  /* Connection timeout backoff */
 #ifndef CRAY_XT3
-/* In general this should be low to have quick detection of a system 
+/* In general this should be low to have quick detection of a system
    running on a backup server. (If it's too low, import_select_connection
    will increase the timeout anyhow.)  */
 #define INITIAL_CONNECT_TIMEOUT max(CONNECTION_SWITCH_MIN,obd_timeout/20)
 #else
-/* ...but for very large systems (e.g. CRAY) we need to keep the initial 
+/* ...but for very large systems (e.g. CRAY) we need to keep the initial
    connect t.o. high (bz 10803), because they will nearly ALWAYS be doing the
    connects for the first time (clients "reboot" after every process, so no
    chance to generate adaptive timeout data. */
@@ -258,6 +258,8 @@ extern unsigned int obd_alloc_fail_rate;
 #define OBD_FAIL_PTLRPC_PAUSE_REQ        0x50a
 #define OBD_FAIL_PTLRPC_PAUSE_REP        0x50c
 
+#define OBD_FAIL_PTLRPC_DUMP_LOG         0x50e
+
 #define OBD_FAIL_OBD_PING_NET            0x600
 #define OBD_FAIL_OBD_LOG_CANCEL_NET      0x601
 #define OBD_FAIL_OBD_LOGD_NET            0x602
@@ -404,7 +406,7 @@ extern atomic_t libcfs_kmemory;
 #define OBD_ALLOC_FAIL_MASK ((1 << OBD_ALLOC_FAIL_BITS) - 1)
 #define OBD_ALLOC_FAIL_MULT (OBD_ALLOC_FAIL_MASK / 100)
 
-#ifdef LPROCFS 
+#ifdef LPROCFS
 #define obd_memory_add(size)                                                  \
         lprocfs_counter_add(obd_memory, OBD_MEMORY_STAT, (long)(size))
 #define obd_memory_sub(size)                                                  \
@@ -446,7 +448,7 @@ static inline void obd_memory_sub(long size)
         obd_alloc -= size;
 }
 
-static inline void obd_pages_add(int order) 
+static inline void obd_pages_add(int order)
 {
         obd_pages += 1<< order;
         if (obd_pages > obd_max_pages)
@@ -606,7 +608,7 @@ do {                                                                          \
         cfs_mem_cache_free((slab), (ptr));                                    \
         (ptr) = NULL;                                                         \
         0;                                                                    \
-}) 
+})
 #define OBD_SLAB_ALLOC(ptr, slab, type, size)                                 \
 do {                                                                          \
         LASSERT(!in_interrupt());                                             \
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index 54ab2a306c..3d66064d32 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -301,7 +301,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
                 int req_portal, int rep_portal, int watchdog_factor,
                 svc_handler_t handler, char *name,
                 cfs_proc_dir_entry_t *proc_entry,
-                svcreq_printfn_t svcreq_printfn, 
+                svcreq_printfn_t svcreq_printfn,
                 int min_threads, int max_threads, char *threadname)
 {
         int                    rc;
@@ -310,7 +310,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
 
         LASSERT (nbufs > 0);
         LASSERT (bufsize >= max_req_size);
-        
+
         OBD_ALLOC(service, sizeof(*service));
         if (service == NULL)
                 RETURN(NULL);
@@ -353,14 +353,14 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         CFS_INIT_LIST_HEAD(&service->srv_req_in_queue);
         CFS_INIT_LIST_HEAD(&service->srv_at_list);
         cfs_timer_init(&service->srv_at_timer, ptlrpc_at_timer, service);
-        /* At SOW, service time should be quick; 10s seems generous. If client 
+        /* At SOW, service time should be quick; 10s seems generous. If client
            timeout is less than this, we'll be sending an early reply. */
         at_init(&service->srv_at_estimate, 10, 0);
 
         spin_lock (&ptlrpc_all_services_lock);
         list_add (&service->srv_list, &ptlrpc_all_services);
         spin_unlock (&ptlrpc_all_services_lock);
-        
+
         /* Now allocate the request buffers */
         rc = ptlrpc_grow_req_bufs(service);
         /* We shouldn't be under memory pressure at startup, so
@@ -554,7 +554,7 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
                                oldest_time);
                 }
         } else {
-                if (cfs_time_current_sec() > 
+                if (cfs_time_current_sec() >
                     (exp->exp_obd->obd_eviction_timer + extra_delay)) {
                         /* The evictor won't evict anyone who we've heard from
                          * recently, so we don't have to check before we start
@@ -569,7 +569,7 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
 
 static int ptlrpc_check_req(struct ptlrpc_request *req)
 {
-        if (lustre_msg_get_conn_cnt(req->rq_reqmsg) < 
+        if (lustre_msg_get_conn_cnt(req->rq_reqmsg) <
             req->rq_export->exp_conn_cnt) {
                 DEBUG_REQ(D_ERROR, req,
                           "DROPPING req from old connection %d < %d",
@@ -602,11 +602,11 @@ static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
         }
 
         /* Set timer for closest deadline */
-        rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request, 
+        rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
                         rq_timed_list);
         next = (__s32)(rq->rq_deadline - cfs_time_current_sec() -
                        at_early_margin);
-        if (next <= 0) 
+        if (next <= 0)
                 ptlrpc_at_timer((unsigned long)svc);
         else
                 cfs_timer_arm(&svc->srv_at_timer, cfs_time_shift(next));
@@ -621,12 +621,12 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
         struct ptlrpc_request *rq;
         int found = 0;
 
-        if (AT_OFF) 
+        if (AT_OFF)
                 return(0);
 
         if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0)
                 return(-ENOSYS);
-        
+
         spin_lock(&svc->srv_at_lock);
 
         if (unlikely(req->rq_sent_final)) {
@@ -657,9 +657,9 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
                 ptlrpc_at_set_timer(svc);
 
         return 0;
-}            
+}
 
-static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req, 
+static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
                                       int extra_time)
 {
         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
@@ -669,18 +669,18 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
         time_t newdl;
         int rc;
         ENTRY;
-                            
-        /* deadline is when the client expects us to reply, margin is the 
+
+        /* deadline is when the client expects us to reply, margin is the
            difference between clients' and servers' expectations */
-        DEBUG_REQ(D_ADAPTTO, req, 
+        DEBUG_REQ(D_ADAPTTO, req,
                   "%ssending early reply (deadline %+lds, margin %+lds) for "
                   "%d+%d", AT_OFF ? "AT off - not " : "",
                   olddl, olddl - at_get(&svc->srv_at_estimate),
                   at_get(&svc->srv_at_estimate), extra_time);
 
-        if (AT_OFF) 
+        if (AT_OFF)
                 RETURN(0);
-        
+
         if (olddl < 0) {
                 DEBUG_REQ(D_WARNING, req, "Already past deadline (%+lds), "
                           "not sending early reply. Consider increasing "
@@ -722,7 +722,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
                 OBD_FREE(reqcopy, sizeof *reqcopy);
                 RETURN(-ENOMEM);
         }
-        
+
         *reqcopy = *req;
         reqcopy->rq_reply_state = NULL;
         reqcopy->rq_rep_swab_mask = 0;
@@ -744,12 +744,12 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
 
         /* RPC ref */
         class_export_rpc_get(reqcopy->rq_export);
-        if (reqcopy->rq_export->exp_obd && 
+        if (reqcopy->rq_export->exp_obd &&
             reqcopy->rq_export->exp_obd->obd_fail)
                 GOTO(out_put, rc = -ENODEV);
 
         rc = lustre_pack_reply_flags(reqcopy, 1, NULL, NULL, LPRFL_EARLY_REPLY);
-        if (rc) 
+        if (rc)
                 GOTO(out_put, rc);
 
         rc = ptlrpc_send_reply(reqcopy, PTLRPC_REPLY_EARLY);
@@ -762,7 +762,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
                 DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
         }
 
-        /* Free the (early) reply state from lustre_pack_reply. 
+        /* Free the (early) reply state from lustre_pack_reply.
            (ptlrpc_send_reply takes it's own rs ref, so this is safe here) */
         ptlrpc_req_drop_rs(reqcopy);
 
@@ -793,10 +793,10 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
         }
         delay = cfs_time_sub(cfs_time_current(), svc->srv_at_checktime);
         svc->srv_at_check = 0;
-        
+
         if (list_empty(&svc->srv_at_list)) {
                 spin_unlock(&svc->srv_at_lock);
-                RETURN(0);      
+                RETURN(0);
         }
 
         /* The timer went off, but maybe the nearest rpc already completed. */
@@ -807,10 +807,10 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                 /* We've still got plenty of time.  Reset the timer. */
                 spin_unlock(&svc->srv_at_lock);
                 ptlrpc_at_set_timer(svc);
-                RETURN(0);      
+                RETURN(0);
         }
 
-        /* We're close to a timeout, and we don't know how much longer the 
+        /* We're close to a timeout, and we don't know how much longer the
            server will take. Send early replies to everyone expiring soon. */
         CFS_INIT_LIST_HEAD(&work_list);
         list_for_each_entry_safe(rq, n, &svc->srv_at_list, rq_timed_list) {
@@ -829,9 +829,9 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
 
         CDEBUG(D_ADAPTTO, "timeout in %+ds, asking for %d secs on %d early "
                "replies\n", first, at_extra, counter);
-        
+
         if (first < 0) {
-                /* We're already past request deadlines before we even get a 
+                /* We're already past request deadlines before we even get a
                    chance to send early replies */
                 LCONSOLE_WARN("%s: This server is not able to keep up with "
                               "request traffic (cpu-bound).\n",  svc->srv_name);
@@ -927,18 +927,18 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
                 rc = ptlrpc_check_req(req);
                 class_export_put(req->rq_export);
                 req->rq_export = NULL;
-                if (rc) 
+                if (rc)
                         goto err_req;
         }
 
         /* req_in handling should/must be fast */
-        if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5) 
+        if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
                 DEBUG_REQ(D_WARNING, req, "Slow req_in handling %lus",
                           cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
 
         /* Set rpc server deadline and add it to the timed list */
         deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
-                    MSGHDR_AT_SUPPORT) ? 
+                    MSGHDR_AT_SUPPORT) ?
                    /* The max time the client expects us to take */
                    lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
         req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
@@ -946,7 +946,7 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
                 DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
                 goto err_req;
         }
-        
+
         ptlrpc_at_add_timed(req);
 
         /* Move it over to the request processing queue */
@@ -990,7 +990,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
              svc->srv_n_active_reqs >= (svc->srv_threads_running - 1))) {
                /* Don't handle regular requests in the last thread, in order
                  * to handle difficult replies (which might block other threads)
-                 * as well as handle any incoming reqs, early replies, etc. 
+                 * as well as handle any incoming reqs, early replies, etc.
                  * That means we always need at least 2 service threads. */
                 spin_unlock(&svc->srv_lock);
                 RETURN(0);
@@ -1004,6 +1004,9 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         spin_unlock(&svc->srv_lock);
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
+                libcfs_debug_dumplog();
+
         do_gettimeofday(&work_start);
         timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time,NULL);
         if (svc->srv_stats != NULL) {
@@ -1016,9 +1019,9 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                 lprocfs_counter_add(svc->srv_stats, PTLRPC_TIMEOUT,
                                     at_get(&svc->srv_at_estimate));
         }
-        
+
         CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
-        
+
         request->rq_svc_thread = thread;
         request->rq_export = class_conn2export(
                                      lustre_msg_get_handle(request->rq_reqmsg));
@@ -1030,7 +1033,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                 export = class_export_rpc_get(request->rq_export);
         }
 
-        /* Discard requests queued for longer than the deadline.  
+        /* Discard requests queued for longer than the deadline.
            The deadline is increased if we send an early reply. */
         if (cfs_time_current_sec() > request->rq_deadline) {
                 DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s"
@@ -1057,7 +1060,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, obd_fail_val);
 
         rc = svc->srv_handler(request);
-        
+
         request->rq_phase = RQ_PHASE_COMPLETE;
 
         CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
@@ -1095,7 +1098,7 @@ put_conn:
                cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL),
                request->rq_repmsg ? lustre_msg_get_transno(request->rq_repmsg) :
                request->rq_transno, request->rq_status,
-               request->rq_repmsg ? lustre_msg_get_status(request->rq_repmsg): 
+               request->rq_repmsg ? lustre_msg_get_status(request->rq_repmsg):
                -999);
         if (svc->srv_stats != NULL) {
                 __u32 op = lustre_msg_get_opc(request->rq_reqmsg);
@@ -1108,7 +1111,7 @@ put_conn:
                 }
         }
         if (request->rq_early_count) {
-                DEBUG_REQ(D_ADAPTTO, request, 
+                DEBUG_REQ(D_ADAPTTO, request,
                           "sent %d early replies before finishing in %lds",
                           request->rq_early_count,
                           work_end.tv_sec - request->rq_arrival_time.tv_sec);
@@ -1365,8 +1368,8 @@ static int ptlrpc_main(void *arg)
          */
         cfs_waitq_signal(&thread->t_ctl_waitq);
 
-        watchdog = lc_watchdog_add(max_t(int, obd_timeout, AT_OFF ? 0 : 
-                                   at_get(&svc->srv_at_estimate)) * 
+        watchdog = lc_watchdog_add(max_t(int, obd_timeout, AT_OFF ? 0 :
+                                   at_get(&svc->srv_at_estimate)) *
                                    svc->srv_watchdog_factor, NULL, NULL);
 
         spin_lock(&svc->srv_lock);
@@ -1404,8 +1407,8 @@ static int ptlrpc_main(void *arg)
                               &lwi);
 
                 lc_watchdog_touch_ms(watchdog, max_t(int, obd_timeout,
-                                     AT_OFF ? 0 : 
-                                     at_get(&svc->srv_at_estimate)) * 
+                                     AT_OFF ? 0 :
+                                     at_get(&svc->srv_at_estimate)) *
                                      svc->srv_watchdog_factor);
 
                 ptlrpc_check_rqbd_pool(svc);
@@ -1422,13 +1425,13 @@ static int ptlrpc_main(void *arg)
                 if (!list_empty(&svc->srv_req_in_queue)) {
                         /* Process all incoming reqs before handling any */
                         ptlrpc_server_handle_req_in(svc);
-                        /* but limit ourselves in case of flood */ 
+                        /* but limit ourselves in case of flood */
                         if (counter++ < 1000)
                                 continue;
                         counter = 0;
                 }
 
-                if (svc->srv_at_check) 
+                if (svc->srv_at_check)
                         ptlrpc_at_check_timed(svc);
 
                 /* don't handle requests in the last thread */
@@ -1569,7 +1572,7 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
         d.thread = thread;
 
         CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-        
+
         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
          * just drop the VM and FILES in ptlrpc_daemonize() right away.
          */
@@ -1624,7 +1627,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
          * its 'unlink' flag set for each posted rqbd */
         list_for_each(tmp, &service->srv_active_rqbds) {
                 struct ptlrpc_request_buffer_desc *rqbd =
-                        list_entry(tmp, struct ptlrpc_request_buffer_desc, 
+                        list_entry(tmp, struct ptlrpc_request_buffer_desc,
                                    rqbd_list);
 
                 rc = LNetMDUnlink(rqbd->rqbd_md_h);
@@ -1752,14 +1755,14 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
                 spin_unlock(&svc->srv_lock);
                 return 0;
         }
-        
+
         /* How long has the next entry been waiting? */
         request = list_entry(svc->srv_request_queue.next,
                              struct ptlrpc_request, rq_list);
         timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
         spin_unlock(&svc->srv_lock);
 
-        if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 : 
+        if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 :
                                         at_max)) {
                 CERROR("%s: unhealthy - request has been waiting %lds\n",
                        svc->srv_name, timediff / ONE_MILLION);
-- 
GitLab