From dbac3b15f9157e2b3913057d058b336e807351e6 Mon Sep 17 00:00:00 2001 From: shaver <shaver> Date: Sat, 17 Aug 2002 22:06:32 +0000 Subject: [PATCH] * l_wait_event can now do interrupts without a timeout, if we're feeling brave. * Big doc comment for l_wait_event. * Only fire the timeout once from l_wait_event. * Made timeout and the recovery-upcall path configurable via sysctl. * Added OBD_FAIL_OSC codes for simulating simple client failure. * Tentative rewiring of recovd into client connections, needs more thought and then more typing. We do fire the upcall, at least. * Use the provided cluuid instead of NULL wherever it's handy already. * Protect (feebly) against waiting for recovery that will never happen, in sync_io_timeout. * Add timeouts to bulk operations in MDS and OST -- a recovery stub is now triggered, but nothing else. * Document the unpleasant business in osc_brw_{read,write} as pertains to errors in the callbacks and cleanup of descriptors. * Remove now-unused ptlrpc_check_bulk_{sent,received}. --- lustre/include/linux/lustre_lib.h | 90 ++++++++++++++++++++---------- lustre/include/linux/lustre_mds.h | 4 +- lustre/include/linux/obd_support.h | 8 ++- lustre/lib/l_net.c | 11 ++-- lustre/lib/page.c | 18 +++--- lustre/llite/recover.c | 1 - lustre/mds/handler.c | 23 ++++++-- lustre/obdclass/class_obd.c | 6 ++ lustre/obdclass/sysctl.c | 15 +++-- lustre/osc/osc_request.c | 37 +++++++----- lustre/ost/ost_handler.c | 42 ++++++++++---- lustre/ptlrpc/client.c | 23 ++++---- lustre/ptlrpc/niobuf.c | 32 ----------- lustre/ptlrpc/recovd.c | 3 +- lustre/ptlrpc/rpc.c | 4 -- 15 files changed, 187 insertions(+), 130 deletions(-) diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 47f3fdc287..2b3ff7acc4 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -413,6 +413,42 @@ static inline int obd_ioctl_getdata(char **buf, int *len, void *arg) #define OBD_IOC_DEC_FS_USE_COUNT _IO ('f', 133 ) +/* + * l_wait_event is a flexible sleeping function, permitting simple caller + * configuration of interrupt and timeout sensitivity along with actions to + * be performed in the event of either exception. + * + * Common usage looks like this: + * + * struct l_wait_info lwi = LWI_TIMEOUT_INTR(timeout, timeout_handler, + * intr_handler, callback_data); + * rc = l_wait_event(waitq, condition, &lwi); + * + * (LWI_TIMEOUT and LWI_INTR macros are available for timeout- and + * interrupt-only variants, respectively.) + * + * If a timeout is specified, the timeout_handler will be invoked in the event + * that the timeout expires before the process is awakened. (Note that any + * waking of the process will restart the timeout, even if the condition is + * not satisfied and the process immediately returns to sleep. This might be + * considered a bug.) If the timeout_handler returns non-zero, l_wait_event + * will return -ETIMEDOUT and the caller will continue. If the handler returns + * zero instead, the process will go back to sleep until it is awakened by the + * waitq or some similar mechanism, or an interrupt occurs (if the caller has + * asked for interrupts to be detected). The timeout will only fire once, so + * callers should take care that a timeout_handler which returns zero will take + * future steps to awaken the process. N.B. that these steps must include making + * the provided condition become true. + * + * If the interrupt flag (lwi_signals) is non-zero, then the process will be + * interruptible, and will be awakened by any "killable" signal (SIGTERM, + * SIGKILL or SIGINT). If a timeout is also specified, then the process will + * only become interruptible _after_ the timeout has expired, though it can be + * awakened by a signal that was delivered before the timeout and is still + * pending when the timeout expires. If a timeout is not specified, the process + * will be interruptible at all times during l_wait_event. + */ + struct l_wait_info { long lwi_timeout; int (*lwi_on_timeout)(void *); @@ -428,18 +464,18 @@ struct l_wait_info { lwi_cb_data: data \ }) -#define LWI_INTR(signals, cb, data) \ +#define LWI_INTR(cb, data) \ ((struct l_wait_info) { \ - lwi_signals: signals, \ + lwi_signals: 1, \ lwi_on_signal: cb, \ lwi_cb_data: data \ }) -#define LWI_TIMEOUT_INTR(time, time_cb, signals, sig_cb, data) \ +#define LWI_TIMEOUT_INTR(time, time_cb, sig_cb, data) \ ((struct l_wait_info) { \ lwi_timeout: time, \ lwi_on_timeout: time_cb, \ - lwi_signals: signals, \ + lwi_signals: 1, \ lwi_on_signal: sig_cb, \ lwi_cb_data: data \ }) @@ -454,48 +490,44 @@ struct l_wait_info { do { \ wait_queue_t __wait; \ long __state; \ + int __timed_out = 0; \ init_waitqueue_entry(&__wait, current); \ \ add_wait_queue(&wq, &__wait); \ - __state = TASK_UNINTERRUPTIBLE; \ + if (info->lwi_signals && !info->lwi_timeout) \ + __state = TASK_INTERRUPTIBLE; \ + else \ + __state = TASK_UNINTERRUPTIBLE; \ for (;;) { \ set_current_state(__state); \ if (condition) \ break; \ - /* We only become INTERRUPTIBLE if a timeout has fired, and \ - * the caller has given us some signals to care about. \ - * \ - * XXXshaver we should check against info->wli_signals here, \ - * XXXshaver instead of just using l_killable_pending, perhaps. \ - */ \ - if (__state == TASK_INTERRUPTIBLE && \ - l_killable_pending(current)) { \ - CERROR("lwe: interrupt for %d\n", current->pid); \ - if (info->lwi_on_signal) \ - info->lwi_on_signal(info->lwi_cb_data); \ - ret = -EINTR; \ - break; \ + if (__state == TASK_INTERRUPTIBLE && l_killable_pending(current)) { \ + CERROR("lwe: interrupt\n"); \ + if (info->lwi_on_signal) \ + info->lwi_on_signal(info->lwi_cb_data); \ + ret = -EINTR; \ + break; \ } \ - if (info->lwi_timeout) { \ + if (info->lwi_timeout && !__timed_out) { \ if (schedule_timeout(info->lwi_timeout) == 0) { \ - CERROR("lwe: timeout for %d\n", current->pid); \ + CERROR("lwe: timeout\n"); \ + __timed_out = 1; \ if (!info->lwi_on_timeout || \ info->lwi_on_timeout(info->lwi_cb_data)) { \ ret = -ETIMEDOUT; \ break; \ } \ - /* We'll take signals only after a timeout. */ \ + /* We'll take signals after a timeout. */ \ if (info->lwi_signals) { \ __state = TASK_INTERRUPTIBLE; \ /* Check for a pending interrupt. */ \ - if (info->lwi_signals && \ - l_killable_pending(current)) { \ - CERROR("lwe: pending interrupt for %d\n", \ - current->pid); \ - if (info->lwi_on_signal) \ - info->lwi_on_signal(info->lwi_cb_data); \ - ret = -EINTR; \ - break; \ + if (info->lwi_signals && l_killable_pending(current)) { \ + CERROR("lwe: pending interrupt\n"); \ + if (info->lwi_on_signal) \ + info->lwi_on_signal(info->lwi_cb_data); \ + ret = -EINTR; \ + break; \ } \ } \ } \ diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index 3177cdf107..45ec453291 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -88,8 +88,8 @@ struct mds_export_data { /* file data for open files on MDS */ struct mds_file_data { - struct list_head mfd_list; - struct file * mfd_file; + struct list_head mfd_list; + struct file *mfd_file; __u64 mfd_clientfd; __u32 mfd_clientcookie; }; diff --git a/lustre/include/linux/obd_support.h b/lustre/include/linux/obd_support.h index 72f3a94d24..e9cd1187ff 100644 --- a/lustre/include/linux/obd_support.h +++ b/lustre/include/linux/obd_support.h @@ -30,6 +30,8 @@ /* global variables */ extern unsigned long obd_memory; extern unsigned long obd_fail_loc; +extern unsigned long obd_timeout; +extern char obd_recovery_upcall[128]; #define OBD_FAIL_MDS 0x100 #define OBD_FAIL_MDS_HANDLE_UNPACK 0x101 @@ -80,7 +82,7 @@ extern unsigned long obd_fail_loc; #define OBD_FAIL_OST_BRW_WRITE_BULK 0x20e #define OBD_FAIL_OST_BRW_READ_BULK 0x20f -#define OBB_FAIL_LDLM 0x300 +#define OBD_FAIL_LDLM 0x300 #define OBD_FAIL_LDLM_NAMESPACE_NEW 0x301 #define OBD_FAIL_LDLM_ENQUEUE 0x302 #define OBD_FAIL_LDLM_CONVERT 0x303 @@ -88,6 +90,10 @@ extern unsigned long obd_fail_loc; #define OBD_FAIL_LDLM_BL_CALLBACK 0x305 #define OBD_FAIL_LDLM_CP_CALLBACK 0x306 +#define OBD_FAIL_OSC 0x400 +#define OBD_FAIL_OSC_BRW_READ_BULK 0x401 +#define OBD_FAIL_OSC_BRW_WRITE_BULK 0x402 + /* preparation for a more advanced failure testbed (not functional yet) */ #define OBD_FAIL_MASK_SYS 0x0000FF00 #define OBD_FAIL_MASK_LOC (0x000000FF | OBD_FAIL_MASK_SYS) diff --git a/lustre/lib/l_net.c b/lustre/lib/l_net.c index a1a687c48a..239f8c0441 100644 --- a/lustre/lib/l_net.c +++ b/lustre/lib/l_net.c @@ -98,9 +98,11 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) /* XXX get recovery hooked in here again */ //ptlrpc_init_client(ptlrpc_connmgr, ll_recover,... - ptlrpc_init_client(NULL, NULL, rq_portal, rp_portal, mdc->cl_client); - ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, - mdc->cl_ldlm_client); + ptlrpc_init_client(ptlrpc_connmgr, NULL, rq_portal, rp_portal, + mdc->cl_client); + /* XXXshaver Should the LDLM have its own recover function? Probably. */ + ptlrpc_init_client(ptlrpc_connmgr, NULL, LDLM_REQUEST_PORTAL, + LDLM_REPLY_PORTAL, mdc->cl_ldlm_client); mdc->cl_client->cli_name = "mdc"; mdc->cl_ldlm_client->cli_name = "ldlm"; mdc->cl_max_mdsize = sizeof(struct lov_stripe_md); @@ -142,8 +144,7 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd, ENTRY; down(&cli->cl_sem); MOD_INC_USE_COUNT; -#warning shaver: we might need a real cluuid here - rc = class_connect(conn, obd, NULL); + rc = class_connect(conn, obd, cluuid); if (rc) { MOD_DEC_USE_COUNT; GOTO(out_sem, rc); diff --git a/lustre/lib/page.c b/lustre/lib/page.c index ddcd5de653..ac7660e681 100644 --- a/lustre/lib/page.c +++ b/lustre/lib/page.c @@ -58,16 +58,20 @@ static int sync_io_timeout(void *data) ENTRY; desc->b_connection->c_level = LUSTRE_CONN_RECOVD; desc->b_flags |= PTL_RPC_FL_TIMEOUT; - if (desc->b_client && desc->b_client->cli_recovd) { + if (desc->b_client && desc->b_client->cli_recovd && + class_signal_client_failure) { /* XXXshaver Do we need a resend strategy, or do we just * XXXshaver return -ERESTARTSYS and punt it? */ CERROR("signalling failure of client %p\n", desc->b_client); class_signal_client_failure(desc->b_client); - } - /* We go back to sleep, until we're resumed or interrupted. */ - RETURN(0); + /* We go back to sleep, until we're resumed or interrupted. */ + RETURN(0); + } + + /* If we can't be recovered, just abort the syscall with -ETIMEDOUT. */ + RETURN(1); } static int sync_io_intr(void *data) @@ -86,11 +90,9 @@ int ll_sync_io_cb(struct io_cb_data *data, int err, int phase) ENTRY; if (phase == CB_PHASE_START) { -#warning shaver hardcoded timeout (/proc/sys/lustre/timeout) struct l_wait_info lwi; - lwi = LWI_TIMEOUT_INTR(100 * HZ, sync_io_timeout, - SIGTERM | SIGKILL | SIGINT, sync_io_intr, - data); + lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, sync_io_timeout, + sync_io_intr, data); ret = l_wait_event(data->waitq, data->complete, &lwi); if (atomic_dec_and_test(&data->refcount)) OBD_FREE(data, sizeof(*data)); diff --git a/lustre/llite/recover.c b/lustre/llite/recover.c index f2e4719a7c..e282daf29e 100644 --- a/lustre/llite/recover.c +++ b/lustre/llite/recover.c @@ -57,7 +57,6 @@ static int ll_reconnect(struct ll_sb_info *sbi) return err; } - int ll_recover(struct ptlrpc_client *cli) { struct ptlrpc_request *req; diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 9a4c151537..dff14c13cb 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -45,6 +45,14 @@ inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req) return &req->rq_export->exp_obd->u.mds; } +static int mds_bulk_timeout(void *data) +{ + struct ptlrpc_bulk_desc *desc = data; + + ENTRY; + CERROR("(not yet) starting recovery of client %p\n", desc->b_client); + RETURN(1); +} /* Assumes caller has already pushed into the kernel filesystem context */ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, @@ -54,6 +62,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, struct mds_obd *mds = mds_req2mds(req); struct ptlrpc_bulk_desc *desc; struct ptlrpc_bulk_page *bulk; + struct l_wait_info lwi; char *buf; ENTRY; @@ -90,9 +99,13 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, GOTO(cleanup_buf, rc); } - wait_event(desc->b_waitq, ptlrpc_check_bulk_sent(desc)); - if (desc->b_flags & PTL_RPC_FL_INTR) - GOTO(cleanup_buf, rc = -EINTR); + lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc); + rc = l_wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_SENT, &lwi); + if (rc) { + if (rc != -ETIMEDOUT) + LBUG(); + GOTO(cleanup_buf, rc); + } EXIT; cleanup_buf: @@ -277,9 +290,7 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, RETURN(0); } } - -#warning shaver: we might need a real cluuid here - rc = class_connect(conn, obd, NULL); + rc = class_connect(conn, obd, cluuid); if (rc) GOTO(out_dec, rc); exp = class_conn2export(conn); diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index 9da3866d61..d1405118f4 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c @@ -46,7 +46,11 @@ struct semaphore obd_conf_sem; /* serialize configuration commands */ struct obd_device obd_dev[MAX_OBD_DEVICES]; struct list_head obd_types; unsigned long obd_memory = 0; + +/* The following are visible and mutable through /proc/sys/lustre/. */ unsigned long obd_fail_loc = 0; +unsigned long obd_timeout = 100; +char obd_recovery_upcall[128] = "/usr/lib/lustre/ha_assist"; extern struct obd_type *class_nm_to_type(char *nm); @@ -573,6 +577,8 @@ EXPORT_SYMBOL(obd_dev); EXPORT_SYMBOL(obdo_cachep); EXPORT_SYMBOL(obd_memory); EXPORT_SYMBOL(obd_fail_loc); +EXPORT_SYMBOL(obd_timeout); +EXPORT_SYMBOL(obd_recovery_upcall); EXPORT_SYMBOL(class_register_type); EXPORT_SYMBOL(class_unregister_type); diff --git a/lustre/obdclass/sysctl.c b/lustre/obdclass/sysctl.c index 57ae735344..8e74aab659 100644 --- a/lustre/obdclass/sysctl.c +++ b/lustre/obdclass/sysctl.c @@ -54,11 +54,12 @@ static int obd_sctl_reset( ctl_table * table, int write, struct file #define OBD_FAIL_LOC 1 /* control test failures instrumentation */ #define OBD_ENTRY 2 /* control enter/leave pattern */ -#define OBD_TIMEOUT 3 /* timeout on upcalls to become intrble */ -#define OBD_HARD 4 /* mount type "hard" or "soft" */ -#define OBD_VARS 5 -#define OBD_INDEX 6 -#define OBD_RESET 7 +#define OBD_VARS 3 +#define OBD_INDEX 4 +#define OBD_RESET 5 +#define OBD_TIMEOUT 6 /* RPC timeout before recovery/intr */ +/* XXX move to /proc/sys/lustre/recovery? */ +#define OBD_UPCALL 7 /* path to recovery upcall */ #define OBD_VARS_SLOT 2 @@ -67,6 +68,10 @@ static ctl_table obd_table[] = { {OBD_VARS, "vars", &vars[0], sizeof(int), 0644, NULL, &proc_dointvec}, {OBD_INDEX, "index", &index, sizeof(int), 0644, NULL, &obd_sctl_vars}, {OBD_RESET, "reset", NULL, 0, 0644, NULL, &obd_sctl_reset}, + {OBD_TIMEOUT, "timeout", &obd_timeout, sizeof(int), 0644, NULL, &proc_dointvec}, + /* XXX need to lock so we avoid update races with the recovery upcall! */ + {OBD_UPCALL, "recovery_upcall", obd_recovery_upcall, 128, 0644, NULL, + &proc_dostring, &sysctl_string }, { 0 } }; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 9bb2c0416e..152b1039c3 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -26,6 +26,7 @@ #include <linux/obd_lov.h> #include <linux/init.h> #include <linux/lustre_ha.h> +#include <linux/obd_support.h> /* for OBD_FAIL_CHECK */ static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *md) @@ -407,25 +408,30 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md, * * On error, we never do the brw_finish, so we handle all decrefs. */ - rc = ptlrpc_register_bulk(desc); - if (rc) - GOTO(out_unmap, rc); + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) { + CERROR("obd_fail_loc=%x, skipping register_bulk\n", + OBD_FAIL_OSC_BRW_READ_BULK); + } else { + rc = ptlrpc_register_bulk(desc); + if (rc) + GOTO(out_unmap, rc); + } request->rq_replen = lustre_msg_size(1, size); rc = ptlrpc_queue_wait(request); rc = ptlrpc_check_status(request, rc); - /* XXX: Mike, this is the only place I'm not sure of. If we have - * an error here, will we have always called brw_finish? If no, - * then out_req will not clean up and we should go to out_desc. - * If maybe, then we are screwed, and we need to set things up - * so that bulk_sink_callback is called for each bulk page, - * even on error so brw_finish is always called. It would need - * to be passed an error code as a parameter to know what to do. - * - * That would also help with the partial completion case, so - * we could say in brw_finish "these pages are done, don't - * restart them" and osc_brw callers can know this. + /* + * XXX: If there is an error during the processing of the callback, + * such as a timeout in a sleep that it performs, brw_finish + * will never get called, and we'll leak the desc, fail to kunmap + * things, cats will live with dogs. One solution would be to + * export brw_finish as osc_brw_finish, so that the timeout case and + * its kin could call it for proper cleanup. An alternative would + * be for an error return from the callback to cause us to clean up, + * but that doesn't help the truly async cases (like LOV), which + * will immediately return from their PHASE_START callback, before + * any such cleanup-requiring error condition can be detected. */ if (rc) GOTO(out_req, rc); @@ -541,6 +547,9 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md, if (desc->b_page_count != page_count) LBUG(); + if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK)) + GOTO(out_unmap, rc = 0); + /* Our reference is released when brw_finish is complete. */ rc = ptlrpc_send_bulk(desc); diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index ccbf640e36..9792e6d6ed 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -196,6 +196,15 @@ static int ost_setattr(struct ptlrpc_request *req) RETURN(0); } +static int ost_bulk_timeout(void *data) +{ + struct ptlrpc_bulk_desc *desc = data; + + ENTRY; + CERROR("(not yet) starting recovery of client %p\n", desc->b_client); + RETURN(1); +} + static int ost_brw_read(struct ptlrpc_request *req) { struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg; @@ -205,6 +214,7 @@ static int ost_brw_read(struct ptlrpc_request *req) struct niobuf_local *local_nb = NULL; struct obd_ioobj *ioo; struct ost_body *body; + struct l_wait_info lwi; int rc, cmd, i, j, objcount, niocount, size = sizeof(*body); ENTRY; @@ -216,6 +226,9 @@ static int ost_brw_read(struct ptlrpc_request *req) niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb); cmd = OBD_BRW_READ; + if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK)) + GOTO(out, rc = 0); + for (i = 0; i < objcount; i++) { ost_unpack_ioo(&tmp1, &ioo); if (tmp2 + ioo->ioo_bufcnt > end2) { @@ -226,12 +239,9 @@ static int ost_brw_read(struct ptlrpc_request *req) ost_unpack_niobuf(&tmp2, &remote_nb); } - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); - if (rc) - RETURN(rc); OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount); if (local_nb == NULL) - RETURN(-ENOMEM); + GOTO(out, rc = -ENOMEM); /* The unpackers move tmp1 and tmp2, so reset them before using */ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); @@ -240,7 +250,7 @@ static int ost_brw_read(struct ptlrpc_request *req) tmp1, niocount, tmp2, local_nb, NULL); if (req->rq_status) - GOTO(out_local, 0); + GOTO(out, 0); desc = ptlrpc_prep_bulk(req->rq_connection); if (desc == NULL) @@ -262,10 +272,12 @@ static int ost_brw_read(struct ptlrpc_request *req) if (rc) GOTO(out_bulk, rc); -#warning OST must time out here. - wait_event(desc->b_waitq, ptlrpc_check_bulk_sent(desc)); - if (desc->b_flags & PTL_RPC_FL_INTR) - rc = -EINTR; + lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc); + rc = l_wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_SENT, &lwi); + if (rc) { + LASSERT(rc == -ETIMEDOUT); + GOTO(out_bulk, rc); + } /* The unpackers move tmp1 and tmp2, so reset them before using */ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); @@ -273,6 +285,8 @@ static int ost_brw_read(struct ptlrpc_request *req) req->rq_status = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb, NULL); + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + out_bulk: ptlrpc_free_bulk(desc); out_local: @@ -298,6 +312,7 @@ static int ost_brw_write(struct ptlrpc_request *req) void *desc_priv = NULL; int reply_sent = 0; struct ptlrpc_service *srv; + struct l_wait_info lwi; __u32 xid; ENTRY; @@ -381,8 +396,13 @@ static int ost_brw_write(struct ptlrpc_request *req) reply_sent = 1; ptlrpc_reply(req->rq_svc, req); -#warning OST must time out here. - wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_RCVD); + lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc); + rc = l_wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_RCVD, &lwi); + if (rc) { + if (rc != -ETIMEDOUT) + LBUG(); + GOTO(fail_bulk, rc); + } rc = obd_commitrw(cmd, conn, objcount, tmp1, niocount, local_nb, desc->b_desc_private); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index f276238985..c775a5ec17 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -33,7 +33,6 @@ void ptlrpc_init_client(struct recovd_obd *recovd, int rep_portal, struct ptlrpc_client *cl) { memset(cl, 0, sizeof(*cl)); - cl->cli_recovd = recovd; cl->cli_recover = recover; if (recovd) recovd_cli_manage(recovd, cl); @@ -486,21 +485,23 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) list_add_tail(&req->rq_list, &cli->cli_delayed_head); spin_unlock(&cli->cli_lock); -#warning shaver: what happens when we get interrupted during this wait? - lwi = LWI_INTR(SIGTERM | SIGKILL | SIGINT, NULL, NULL); - l_wait_event(req->rq_wait_for_rep, - req->rq_level <= req->rq_connection->c_level, - &lwi); + lwi = LWI_INTR(NULL, NULL); + rc = l_wait_event(req->rq_wait_for_rep, + req->rq_level <= req->rq_connection->c_level, + &lwi); spin_lock(&cli->cli_lock); list_del_init(&req->rq_list); spin_unlock(&cli->cli_lock); + + if (rc) + RETURN(rc); CERROR("process %d resumed\n", current->pid); } resend: req->rq_time = CURRENT_TIME; - req->rq_timeout = 100; + req->rq_timeout = obd_timeout; rc = ptl_send_rpc(req); if (rc) { CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc); @@ -518,8 +519,7 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req) CDEBUG(D_OTHER, "-- sleeping\n"); lwi = LWI_TIMEOUT_INTR(req->rq_timeout * HZ, expired_request, - SIGKILL | SIGTERM | SIGINT, interrupted_request, - req); + interrupted_request,req); l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi); CDEBUG(D_OTHER, "-- done\n"); @@ -570,7 +570,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) { int rc = 0; struct ptlrpc_client *cli = req->rq_client; - struct l_wait_info lwi = LWI_INTR(SIGKILL|SIGTERM|SIGINT, NULL, NULL); + struct l_wait_info lwi; ENTRY; init_waitqueue_head(&req->rq_wait_for_rep); @@ -579,7 +579,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) req->rq_connection->c_level); req->rq_time = CURRENT_TIME; - req->rq_timeout = 100; + req->rq_timeout = obd_timeout; rc = ptl_send_rpc(req); if (rc) { CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc); @@ -589,6 +589,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) } CDEBUG(D_OTHER, "-- sleeping\n"); + lwi = LWI_INTR(NULL, NULL); l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi); CDEBUG(D_OTHER, "-- done\n"); diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 0f7c955b70..3933160285 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -30,38 +30,6 @@ extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq, bulk_source_eq, bulk_sink_eq; static ptl_process_id_t local_id = {PTL_NID_ANY, PTL_PID_ANY}; -int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *desc) -{ - ENTRY; - - if (desc->b_flags & PTL_BULK_FL_SENT) - RETURN(1); - - if (l_killable_pending(current)) { - desc->b_flags |= PTL_RPC_FL_INTR; - RETURN(1); - } - - CDEBUG(D_NET, "no event yet\n"); - RETURN(0); -} - -int ptlrpc_check_bulk_received(struct ptlrpc_bulk_desc *desc) -{ - ENTRY; - - if (desc->b_flags & PTL_BULK_FL_RCVD) - RETURN(1); - - if (l_killable_pending(current)) { - desc->b_flags |= PTL_RPC_FL_INTR; - RETURN(1); - } - - CDEBUG(D_NET, "no event yet\n"); - RETURN(0); -} - static int ptl_send_buf(struct ptlrpc_request *request, struct ptlrpc_connection *conn, int portal) { diff --git a/lustre/ptlrpc/recovd.c b/lustre/ptlrpc/recovd.c index 1c8037c13d..194e2b44f7 100644 --- a/lustre/ptlrpc/recovd.c +++ b/lustre/ptlrpc/recovd.c @@ -19,6 +19,7 @@ #include <linux/kmod.h> #include <linux/lustre_lite.h> #include <linux/lustre_ha.h> +#include <linux/obd_support.h> struct recovd_obd *ptlrpc_connmgr; @@ -60,7 +61,7 @@ static int recovd_upcall(void) char *argv[2]; char *envp[3]; - argv[0] = "/usr/src/obd/utils/ha_assist.sh"; + argv[0] = obd_recovery_upcall; argv[1] = NULL; envp [0] = "HOME=/"; diff --git a/lustre/ptlrpc/rpc.c b/lustre/ptlrpc/rpc.c index e0c9414a64..2459760b6d 100644 --- a/lustre/ptlrpc/rpc.c +++ b/lustre/ptlrpc/rpc.c @@ -146,10 +146,6 @@ static void __exit ptlrpc_exit(void) ptlrpc_cleanup_connection(); } -/* events.c */ -EXPORT_SYMBOL(ptlrpc_check_bulk_sent); -EXPORT_SYMBOL(ptlrpc_check_bulk_received); - /* connmgr.c */ EXPORT_SYMBOL(ptlrpc_connmgr); EXPORT_SYMBOL(connmgr_connect); -- GitLab