diff --git a/lustre/include/linux/lustre_fsfilt.h b/lustre/include/linux/lustre_fsfilt.h index 978fbe4a4974b0fb9bff4e64dbd11d7062a3ca61..d6953fba719648451674ec9845f78f3750e8b216 100644 --- a/lustre/include/linux/lustre_fsfilt.h +++ b/lustre/include/linux/lustre_fsfilt.h @@ -215,7 +215,7 @@ fsfilt_start_ops(struct fsfilt_operations *ops, struct inode *inode, unsigned long now = jiffies; void *parent_handle = oti ? oti->oti_handle : NULL; void *handle = ops->fs_start(inode, op, parent_handle, logs); - CDEBUG(D_HA, "started handle %p (%p)\n", handle, parent_handle); + CDEBUG(D_INFO, "started handle %p (%p)\n", handle, parent_handle); if (oti != NULL) { if (parent_handle == NULL) { @@ -258,7 +258,7 @@ fsfilt_commit_ops(struct fsfilt_operations *ops, struct super_block *sb, { unsigned long now = jiffies; int rc = ops->fs_commit(sb, inode, handle, force_sync); - CDEBUG(D_HA, "committing handle %p\n", handle); + CDEBUG(D_INFO, "committing handle %p\n", handle); if (time_after(jiffies, now + 15 * HZ)) CERROR("long journal start time %lus\n", (jiffies - now) / HZ); @@ -291,7 +291,7 @@ fsfilt_brw_start_log(struct obd_device *obd, int objcount, void *parent_handle = oti ? 
oti->oti_handle : NULL; void *handle = obd->obd_fsops->fs_brw_start(objcount, fso, niocount, nb, parent_handle, logs); - CDEBUG(D_HA, "started handle %p (%p)\n", handle, parent_handle); + CDEBUG(D_INFO, "started handle %p (%p)\n", handle, parent_handle); if (oti != NULL) { if (parent_handle == NULL) { @@ -323,7 +323,7 @@ fsfilt_commit_async(struct obd_device *obd, struct inode *inode, unsigned long now = jiffies; int rc = obd->obd_fsops->fs_commit_async(inode, handle, wait_handle); - CDEBUG(D_HA, "committing handle %p (async)\n", *wait_handle); + CDEBUG(D_INFO, "committing handle %p (async)\n", *wait_handle); if (time_after(jiffies, now + 15 * HZ)) CERROR("long journal start time %lus\n", (jiffies - now) / HZ); @@ -335,7 +335,7 @@ fsfilt_commit_wait(struct obd_device *obd, struct inode *inode, void *handle) { unsigned long now = jiffies; int rc = obd->obd_fsops->fs_commit_wait(inode, handle); - CDEBUG(D_HA, "waiting for completion %p\n", handle); + CDEBUG(D_INFO, "waiting for completion %p\n", handle); if (time_after(jiffies, now + 15 * HZ)) CERROR("long journal start time %lus\n", (jiffies - now) / HZ); return rc; diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index b57146d228ed653a06f01d8fcdf2cb2af7ad3de5..070f51af20b7a318cf9de8d234dcbf4c5993bf2e 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -75,7 +75,7 @@ struct obd_export; #include <linux/lustre_compat25.h> #include <linux/lvfs.h> -int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler); +int target_handle_connect(struct ptlrpc_request *req); int target_handle_disconnect(struct ptlrpc_request *req); void target_destroy_export(struct obd_export *exp); int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp, @@ -84,8 +84,10 @@ int target_handle_ping(struct ptlrpc_request *req); void target_cancel_recovery_timer(struct obd_device *obd); #define OBD_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) 
/* *waves hands* */ -void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler); -void target_abort_recovery(void *data); +void target_start_recovery_timer(struct obd_device *obd); +int target_start_recovery_thread(struct obd_device *obd, + svc_handler_t handler); +void target_stop_recovery_thread(struct obd_device *obd); int target_queue_recovery_request(struct ptlrpc_request *req, struct obd_device *obd); int target_queue_final_reply(struct ptlrpc_request *req, int rc); diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index e74997322eb5b8e1beda5b62885b683cbb644c67..876f75dd8188577e9456f259f7fd12564244772e 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -531,6 +531,13 @@ struct obd_llogs { struct llog_ctxt *llog_ctxt[LLOG_MAX_CTXTS]; }; +struct target_recovery_data { + svc_handler_t trd_recovery_handler; + pid_t trd_processing_task; + struct completion trd_starting; + struct completion trd_finishing; +}; + /* corresponds to one of the obd's */ struct obd_device { struct obd_type *obd_type; @@ -563,13 +570,12 @@ struct obd_device { struct obd_device *obd_observer; struct obd_export *obd_self_export; - /* XXX encapsulate all this recovery data into one struct */ - svc_handler_t obd_recovery_handler; + struct target_recovery_data obd_recovery_data; + /* XXX encapsulate all this recovery data into target_recovery_data */ int obd_max_recoverable_clients; int obd_connected_clients; int obd_recoverable_clients; spinlock_t obd_processing_task_lock; - pid_t obd_processing_task; __u64 obd_next_recovery_transno; int obd_replayed_requests; int obd_requests_queued_for_recovery; diff --git a/lustre/include/linux/obd_ost.h b/lustre/include/linux/obd_ost.h index 215e8b1d9dea41e1e0cc0fedfc6cbde1114bc751..edb700902023ed37e969605f6f448d140ce79603 100644 --- a/lustre/include/linux/obd_ost.h +++ b/lustre/include/linux/obd_ost.h @@ -48,4 +48,7 @@ struct osc_getattr_async_args { struct obdo *aa_oa; }; +extern int 
ost_brw_write(struct ptlrpc_request *, struct obd_trans_info *); +extern int ost_handle(struct ptlrpc_request *req); + #endif diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 1183afe3147a04bff1c0e0d9440ab181bd2447a4..482124d3dc0a350bd64bfce639d743a46a96ed6d 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -361,7 +361,7 @@ int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp, RETURN(0); } -int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) +int target_handle_connect(struct ptlrpc_request *req) { struct obd_device *target; struct obd_export *export = NULL; @@ -372,7 +372,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) struct obd_uuid remote_uuid; struct list_head *p; char *str, *tmp; - int rc = 0, abort_recovery; + int rc = 0; unsigned long flags; int initial_conn = 0; char peer_str[PTL_NALFMT_SIZE]; @@ -423,12 +423,6 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) LBUG(); } - spin_lock_bh(&target->obd_processing_task_lock); - abort_recovery = target->obd_abort_recovery; - spin_unlock_bh(&target->obd_processing_task_lock); - if (abort_recovery) - target_abort_recovery(target); - tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn); if (tmp == NULL) GOTO(out, rc = -EPROTO); @@ -490,7 +484,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) target->obd_recovering ? 
"(recovering)" : ""); if (target->obd_recovering) { lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING); - target_start_recovery_timer(target, handler); + target_start_recovery_timer(target); } #if 0 /* Tell the client if we support replayable requests */ @@ -621,6 +615,40 @@ void target_destroy_export(struct obd_export *exp) * Recovery functions */ +struct ptlrpc_request * +ptlrpc_clone_req( struct ptlrpc_request *orig_req) +{ + struct ptlrpc_request *copy_req; + struct lustre_msg *copy_reqmsg; + + OBD_ALLOC(copy_req, sizeof *copy_req); + if (!copy_req) + return NULL; + OBD_ALLOC(copy_reqmsg, orig_req->rq_reqlen); + if (!copy_reqmsg){ + OBD_FREE(copy_req, sizeof *copy_req); + return NULL; + } + + memcpy(copy_req, orig_req, sizeof *copy_req); + memcpy(copy_reqmsg, orig_req->rq_reqmsg, orig_req->rq_reqlen); + /* the copied req takes over the reply state */ + orig_req->rq_reply_state = NULL; + + copy_req->rq_reqmsg = copy_reqmsg; + class_export_get(copy_req->rq_export); + INIT_LIST_HEAD(&copy_req->rq_list); + + return copy_req; +} +void ptlrpc_free_clone( struct ptlrpc_request *req) +{ + class_export_put(req->rq_export); + list_del(&req->rq_list); + OBD_FREE(req->rq_reqmsg, req->rq_reqlen); + OBD_FREE(req, sizeof *req); +} + static void abort_delayed_replies(struct obd_device *obd) { struct ptlrpc_request *req; @@ -663,24 +691,13 @@ static void abort_recovery_queue(struct obd_device *obd) } } -void target_abort_recovery(void *data) +static void target_abort_recovery(void *data) { struct obd_device *obd = data; int rc; CERROR("disconnecting clients and aborting recovery\n"); - spin_lock_bh(&obd->obd_processing_task_lock); - if (!obd->obd_recovering) { - spin_unlock_bh(&obd->obd_processing_task_lock); - EXIT; - return; - } - - obd->obd_recovering = obd->obd_abort_recovery = 0; - - wake_up(&obd->obd_next_transno_waitq); - target_cancel_recovery_timer(obd); - spin_unlock_bh(&obd->obd_processing_task_lock); + LASSERT(!obd->obd_recovering); 
class_disconnect_exports(obd, 0); @@ -688,7 +705,8 @@ void target_abort_recovery(void *data) if (OBT(obd) && OBP(obd, postrecov)) { rc = OBP(obd, postrecov)(obd); if (rc >= 0) - CWARN("Cleanup %d orphans after recovery was aborted\n", rc); + CWARN("Cleanup %d orphans after recovery was aborted\n", + rc); else CERROR("postrecov failed %d\n", rc); } @@ -731,21 +749,50 @@ static void reset_recovery_timer(struct obd_device *obd) /* Only start it the first time called */ -void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler) +void target_start_recovery_timer(struct obd_device *obd) { spin_lock_bh(&obd->obd_processing_task_lock); - if (obd->obd_recovery_handler) { + if (!obd->obd_recovering || timer_pending(&obd->obd_recovery_timer)) { spin_unlock_bh(&obd->obd_processing_task_lock); return; } CWARN("%s: starting recovery timer (%us)\n", obd->obd_name, OBD_RECOVERY_TIMEOUT / HZ); - obd->obd_recovery_handler = handler; obd->obd_recovery_timer.function = target_recovery_expired; obd->obd_recovery_timer.data = (unsigned long)obd; + mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT); spin_unlock_bh(&obd->obd_processing_task_lock); +} + +static void target_finish_recovery(struct obd_device *obd) +{ + struct list_head *tmp, *n; + int rc2; + + ldlm_reprocess_all_ns(obd->obd_namespace); + + CWARN("%s: all clients recovered, calling postrecov\n", + obd->obd_name); + /* when recovery finished, cleanup orphans on mds and ost */ + if (OBT(obd) && OBP(obd, postrecov)) { + rc2 = OBP(obd, postrecov)(obd); + if (rc2 >= 0) + CWARN("%s: all clients recovered, %d MDS " + "orphans deleted\n", obd->obd_name, rc2); + else + CERROR("postrecov failed %d\n", rc2); + } - reset_recovery_timer(obd); + CWARN("%s: recovery over, sending delayed replies\n", + obd->obd_name); + list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { + struct ptlrpc_request *req; + req = list_entry(tmp, struct ptlrpc_request, rq_list); + DEBUG_REQ(D_ERROR, req, "delayed:"); 
+ ptlrpc_reply(req); + ptlrpc_free_clone(req); + } + ptlrpc_run_recovery_over_upcall(obd); } static int check_for_next_transno(struct obd_device *obd) @@ -755,10 +802,15 @@ static int check_for_next_transno(struct obd_device *obd) __u64 next_transno, req_transno; spin_lock_bh(&obd->obd_processing_task_lock); - req = list_entry(obd->obd_recovery_queue.next, - struct ptlrpc_request, rq_list); + if (!list_empty(&obd->obd_recovery_queue)) { + req = list_entry(obd->obd_recovery_queue.next, + struct ptlrpc_request, rq_list); + req_transno = req->rq_reqmsg->transno; + } else { + req_transno = 0; + } + max = obd->obd_max_recoverable_clients; - req_transno = req->rq_reqmsg->transno; connected = obd->obd_connected_clients; completed = max - obd->obd_recoverable_clients; queue_len = obd->obd_requests_queued_for_recovery; @@ -770,13 +822,14 @@ static int check_for_next_transno(struct obd_device *obd) if (obd->obd_abort_recovery) { CDEBUG(D_HA, "waking for aborted recovery\n"); wake_up = 1; - } else if (!obd->obd_recovering) { - CDEBUG(D_HA, "waking for completed recovery (?)\n"); + } else if (max == completed) { + CDEBUG(D_HA, "waking for completed recovery\n"); wake_up = 1; } else if (req_transno == next_transno) { CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno); wake_up = 1; } else if (queue_len + completed == max) { + LASSERT(req->rq_reqmsg->transno >= next_transno); CDEBUG(D_ERROR, "waking for skipped transno (skip: "LPD64 ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n", @@ -785,63 +838,127 @@ static int check_for_next_transno(struct obd_device *obd) wake_up = 1; } spin_unlock_bh(&obd->obd_processing_task_lock); - LASSERT(req->rq_reqmsg->transno >= next_transno); + return wake_up; } -static void process_recovery_queue(struct obd_device *obd) +static struct ptlrpc_request * +target_next_replay_req(struct obd_device *obd) { - struct ptlrpc_request *req; - int abort_recovery = 0; struct l_wait_info lwi = { 0 }; - ENTRY; + struct ptlrpc_request *req; - for (;;) { - 
spin_lock_bh(&obd->obd_processing_task_lock); - LASSERT(obd->obd_processing_task == current->pid); + CDEBUG(D_HA, "Waiting for transno "LPD64"\n", + obd->obd_next_recovery_transno); + l_wait_event(obd->obd_next_transno_waitq, + check_for_next_transno(obd), &lwi); + + spin_lock_bh(&obd->obd_processing_task_lock); + if (obd->obd_abort_recovery) { + req = NULL; + } else if (!list_empty(&obd->obd_recovery_queue)) { req = list_entry(obd->obd_recovery_queue.next, struct ptlrpc_request, rq_list); + list_del_init(&req->rq_list); + obd->obd_requests_queued_for_recovery--; + } else { + req = NULL; + } + spin_unlock_bh(&obd->obd_processing_task_lock); + return req; +} - if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) { - spin_unlock_bh(&obd->obd_processing_task_lock); - CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is " - LPD64")\n", - obd->obd_next_recovery_transno, - req->rq_reqmsg->transno); - l_wait_event(obd->obd_next_transno_waitq, - check_for_next_transno(obd), &lwi); + +static int target_recovery_thread(void *arg) +{ + struct obd_device *obd = arg; + struct ptlrpc_request *req; + struct target_recovery_data *trd = &obd->obd_recovery_data; + unsigned long flags; + ENTRY; + + kportal_daemonize("tgt-recov"); + + SIGNAL_MASK_LOCK(current, flags); + sigfillset(&current->blocked); + RECALC_SIGPENDING; + SIGNAL_MASK_UNLOCK(current, flags); + + CERROR("%s: started recovery thread pid %d\n", obd->obd_name, + current->pid); + trd->trd_processing_task = current->pid; + + obd->obd_recovering = 1; + complete(&trd->trd_starting); + + while (obd->obd_recovering) { + LASSERT(trd->trd_processing_task == current->pid); + req = target_next_replay_req(obd); + if (req != NULL) { + DEBUG_REQ(D_HA, req, "processing t"LPD64" : ", + req->rq_reqmsg->transno); + (void)trd->trd_recovery_handler(req); + obd->obd_replayed_requests++; + reset_recovery_timer(obd); + /* bug 1580: decide how to properly sync() in recovery*/ + //mds_fsync_super(mds->mds_sb); + ptlrpc_free_clone(req); 
spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; + obd->obd_next_recovery_transno++; spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) { - target_abort_recovery(obd); - return; + } else { + /* recovery is over */ + spin_lock_bh(&obd->obd_processing_task_lock); + obd->obd_recovering = 0; + target_cancel_recovery_timer(obd); + if (obd->obd_abort_recovery) { + obd->obd_abort_recovery = 0; + spin_unlock_bh(&obd->obd_processing_task_lock); + target_abort_recovery(obd); + } else { + LASSERT(obd->obd_recoverable_clients == 0); + spin_unlock_bh(&obd->obd_processing_task_lock); + target_finish_recovery(obd); } - continue; } - list_del_init(&req->rq_list); - obd->obd_requests_queued_for_recovery--; - spin_unlock_bh(&obd->obd_processing_task_lock); + } - DEBUG_REQ(D_HA, req, "processing: "); - (void)obd->obd_recovery_handler(req); - obd->obd_replayed_requests++; - reset_recovery_timer(obd); - /* bug 1580: decide how to properly sync() in recovery */ - //mds_fsync_super(mds->mds_sb); - class_export_put(req->rq_export); - OBD_FREE(req->rq_reqmsg, req->rq_reqlen); - OBD_FREE(req, sizeof *req); - spin_lock_bh(&obd->obd_processing_task_lock); - obd->obd_next_recovery_transno++; - if (list_empty(&obd->obd_recovery_queue)) { - obd->obd_processing_task = 0; - spin_unlock_bh(&obd->obd_processing_task_lock); - break; - } + trd->trd_processing_task = 0; + complete(&trd->trd_finishing); + return 0; +} + +int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler) +{ + int rc = 0; + struct target_recovery_data *trd = &obd->obd_recovery_data; + + memset(trd, 0, sizeof(*trd)); + init_completion(&trd->trd_starting); + init_completion(&trd->trd_finishing); + trd->trd_recovery_handler = handler; + + if (kernel_thread(target_recovery_thread, obd, 0) > 0) + wait_for_completion(&trd->trd_starting); + else + rc = -ECHILD; + + return rc; +} + +void target_stop_recovery_thread(struct obd_device *obd) +{ + 
spin_lock_bh(&obd->obd_processing_task_lock); + if (obd->obd_recovery_data.trd_processing_task > 0) { + struct target_recovery_data *trd = &obd->obd_recovery_data; + CERROR("%s: aborting recovery\n", obd->obd_name); + obd->obd_abort_recovery = 1; + wake_up(&obd->obd_next_transno_waitq); + spin_unlock_bh(&obd->obd_processing_task_lock); + wait_for_completion(&trd->trd_finishing); + } else { spin_unlock_bh(&obd->obd_processing_task_lock); } - EXIT; } int target_queue_recovery_request(struct ptlrpc_request *req, @@ -850,8 +967,6 @@ int target_queue_recovery_request(struct ptlrpc_request *req, struct list_head *tmp; int inserted = 0; __u64 transno = req->rq_reqmsg->transno; - struct ptlrpc_request *saved_req; - struct lustre_msg *reqmsg; /* CAVEAT EMPTOR: The incoming request message has been swabbed * (i.e. buflens etc are in my own byte order), but type-dependent @@ -863,16 +978,6 @@ int target_queue_recovery_request(struct ptlrpc_request *req, return 1; } - /* XXX If I were a real man, these LBUGs would be sane cleanups. */ - /* XXX just like the request-dup code in queue_final_reply */ - OBD_ALLOC(saved_req, sizeof *saved_req); - if (!saved_req) - LBUG(); - OBD_ALLOC(reqmsg, req->rq_reqlen); - if (!reqmsg) - LBUG(); - - spin_lock_bh(&obd->obd_processing_task_lock); /* If we're processing the queue, we want don't want to queue this * message. @@ -884,34 +989,30 @@ int target_queue_recovery_request(struct ptlrpc_request *req, * Also, a resent, replayed request that has already been * handled will pass through here and be processed immediately. */ - if (obd->obd_processing_task == current->pid || + spin_lock_bh(&obd->obd_processing_task_lock); + if (obd->obd_recovery_data.trd_processing_task == current->pid || transno < obd->obd_next_recovery_transno) { /* Processing the queue right now, don't re-add. 
*/ lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); LASSERT(list_empty(&req->rq_list)); spin_unlock_bh(&obd->obd_processing_task_lock); - OBD_FREE(reqmsg, req->rq_reqlen); - OBD_FREE(saved_req, sizeof *saved_req); return 1; } + spin_unlock_bh(&obd->obd_processing_task_lock); /* A resent, replayed request that is still on the queue; just drop it. The queued request will handle this. */ - if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) == - (MSG_RESENT | MSG_REPLAY)) { + if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) + == (MSG_RESENT | MSG_REPLAY)) { DEBUG_REQ(D_ERROR, req, "dropping resent queued req"); - spin_unlock_bh(&obd->obd_processing_task_lock); - OBD_FREE(reqmsg, req->rq_reqlen); - OBD_FREE(saved_req, sizeof *saved_req); return 0; } - memcpy(saved_req, req, sizeof *req); - memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen); - req = saved_req; - req->rq_reqmsg = reqmsg; - class_export_get(req->rq_export); - INIT_LIST_HEAD(&req->rq_list); + req = ptlrpc_clone_req(req); + if (req == NULL) + return -ENOMEM; + + spin_lock_bh(&obd->obd_processing_task_lock); /* XXX O(n^2) */ list_for_each(tmp, &obd->obd_recovery_queue) { @@ -930,23 +1031,9 @@ int target_queue_recovery_request(struct ptlrpc_request *req, } obd->obd_requests_queued_for_recovery++; - - if (obd->obd_processing_task != 0) { - /* Someone else is processing this queue, we'll leave it to - * them. - */ - wake_up(&obd->obd_next_transno_waitq); - spin_unlock_bh(&obd->obd_processing_task_lock); - return 0; - } - - /* Nobody is processing, and we know there's (at least) one to process - * now, so we'll do the honours. 
- */ - obd->obd_processing_task = current->pid; + wake_up(&obd->obd_next_transno_waitq); spin_unlock_bh(&obd->obd_processing_task_lock); - process_recovery_queue(obd); return 0; } @@ -958,10 +1045,6 @@ struct obd_device * target_req2obd(struct ptlrpc_request *req) int target_queue_final_reply(struct ptlrpc_request *req, int rc) { struct obd_device *obd = target_req2obd(req); - struct ptlrpc_request *saved_req; - struct lustre_msg *reqmsg; - int recovery_done = 0; - int rc2; LASSERT ((rc == 0) == (req->rq_reply_state != NULL)); @@ -974,68 +1057,23 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) LASSERT (!req->rq_reply_state->rs_difficult); LASSERT(list_empty(&req->rq_list)); - /* XXX a bit like the request-dup code in queue_recovery_request */ - OBD_ALLOC(saved_req, sizeof *saved_req); - if (!saved_req) - LBUG(); - OBD_ALLOC(reqmsg, req->rq_reqlen); - if (!reqmsg) - LBUG(); - memcpy(saved_req, req, sizeof *saved_req); - memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen); - /* the copied req takes over the reply state */ - req->rq_reply_state = NULL; - req = saved_req; - req->rq_reqmsg = reqmsg; - class_export_get(req->rq_export); - list_add(&req->rq_list, &obd->obd_delayed_reply_queue); + + req = ptlrpc_clone_req(req); spin_lock_bh(&obd->obd_processing_task_lock); + + list_add(&req->rq_list, &obd->obd_delayed_reply_queue); + /* only count the first "replay over" request from each export */ if (req->rq_export->exp_replay_needed) { --obd->obd_recoverable_clients; req->rq_export->exp_replay_needed = 0; - } - recovery_done = (obd->obd_recoverable_clients == 0); - spin_unlock_bh(&obd->obd_processing_task_lock); - - if (recovery_done) { - struct list_head *tmp, *n; - ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace); - CWARN("%s: all clients recovered, sending delayed replies\n", - obd->obd_name); - spin_lock_bh(&obd->obd_processing_task_lock); - obd->obd_recovering = 0; - target_cancel_recovery_timer(obd); - 
spin_unlock_bh(&obd->obd_processing_task_lock); - - /* when recovery finished, cleanup orphans on mds and ost */ - if (OBT(obd) && OBP(obd, postrecov)) { - rc2 = OBP(obd, postrecov)(obd); - if (rc2 >= 0) - CWARN("%s: all clients recovered, %d MDS " - "orphans deleted\n", obd->obd_name, rc2); - else - CERROR("postrecov failed %d\n", rc2); - } - - list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { - req = list_entry(tmp, struct ptlrpc_request, rq_list); - DEBUG_REQ(D_ERROR, req, "delayed:"); - ptlrpc_reply(req); - class_export_put(req->rq_export); - list_del(&req->rq_list); - OBD_FREE(req->rq_reqmsg, req->rq_reqlen); - OBD_FREE(req, sizeof *req); - } - ptlrpc_run_recovery_over_upcall(obd); - } else { CWARN("%s: %d recoverable clients remain\n", - obd->obd_name, obd->obd_recoverable_clients); - wake_up(&obd->obd_next_transno_waitq); + obd->obd_name, obd->obd_recoverable_clients); } - + wake_up(&obd->obd_next_transno_waitq); + spin_unlock_bh(&obd->obd_processing_task_lock); return 1; } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 93fad5ea68858af221434af73f814889548f87ec..8bb18d20a6390a6810a1c08063c93d138d628a59 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1510,7 +1510,8 @@ EXPORT_SYMBOL(client_obd_setup); EXPORT_SYMBOL(client_obd_cleanup); EXPORT_SYMBOL(client_connect_import); EXPORT_SYMBOL(client_disconnect_export); -EXPORT_SYMBOL(target_abort_recovery); +EXPORT_SYMBOL(target_start_recovery_thread); +EXPORT_SYMBOL(target_stop_recovery_thread); EXPORT_SYMBOL(target_handle_connect); EXPORT_SYMBOL(target_destroy_export); EXPORT_SYMBOL(target_cancel_recovery_timer); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 8e80be468709394f34a10b92e6d36a1433c9f7ac..4dfa0b8d65850dae9603e3b13ecef633e6b16ad4 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -49,6 +49,7 @@ # include <linux/locks.h> #endif #include <linux/obd_lov.h> +#include <linux/obd_ost.h> #include <linux/lustre_mds.h> 
#include <linux/lustre_fsfilt.h> #include <linux/lustre_snap.h> @@ -344,19 +345,12 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, struct obd_export *exp; struct mds_export_data *med; /* */ struct mds_client_data *mcd; - int rc, abort_recovery; + int rc; ENTRY; if (!conn || !obd || !cluuid) RETURN(-EINVAL); - /* Check for aborted recovery. */ - spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; - spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) - target_abort_recovery(obd); - /* XXX There is a small race between checking the list and adding a * new connection for the same UUID, but the real threat (list * corruption when multiple different clients connect) is solved. @@ -1618,7 +1612,6 @@ static int mdt_set_info(struct ptlrpc_request *req) RETURN(-EINVAL); } -extern int ost_brw_write(struct ptlrpc_request *, struct obd_trans_info *); int mds_handle(struct ptlrpc_request *req) { int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET; @@ -1633,7 +1626,7 @@ int mds_handle(struct ptlrpc_request *req) /* XXX identical to OST */ if (req->rq_reqmsg->opc != MDS_CONNECT) { struct mds_export_data *med; - int recovering, abort_recovery; + int recovering; if (req->rq_export == NULL) { CERROR("lustre_mds: operation %d on unconnected MDS\n", @@ -1661,18 +1654,19 @@ int mds_handle(struct ptlrpc_request *req) * match the last xid, however it could for a * committed, but still retained, open. */ - /* Check for aborted recovery. 
*/ spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; recovering = obd->obd_recovering; spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) { - target_abort_recovery(obd); - } else if (recovering) { + if (recovering) { rc = mds_filter_recovery_request(req, obd, &should_process); - if (rc || !should_process) + if (rc || should_process == 0) { + RETURN(rc); + } else if (should_process < 0) { + req->rq_status = should_process; + rc = ptlrpc_error(req); RETURN(rc); + } } } @@ -1680,7 +1674,7 @@ int mds_handle(struct ptlrpc_request *req) case MDS_CONNECT: DEBUG_REQ(D_INODE, req, "connect"); OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0); - rc = target_handle_connect(req, mds_handle); + rc = target_handle_connect(req); if (!rc) /* Now that we have an export, set mds. */ mds = mds_req2mds(req); diff --git a/lustre/mds/mds_fs.c b/lustre/mds/mds_fs.c index 38654a5d1bd4b0a3c94efe60ff55fc322456d63a..0561d6faeed21bf909ec3d1bdd4a32ce9bbe7fe2 100644 --- a/lustre/mds/mds_fs.c +++ b/lustre/mds/mds_fs.c @@ -351,7 +351,7 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file) "last_transno "LPU64"\n", obd->obd_name, obd->obd_recoverable_clients, mds->mds_last_transno); obd->obd_next_recovery_transno = obd->obd_last_committed + 1; - obd->obd_recovering = 1; + target_start_recovery_thread(obd, mds_handle); } if (mcd) diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index d29398fc1d348ffbf332d3536c608af609def0a7..8adf159ac4db6a1c0cf6a527deaea1d5a31e90e0 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -111,6 +111,7 @@ int mds_obd_destroy(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *ea, struct obd_trans_info *oti); /* mds/handler.c */ +int mds_handle(struct ptlrpc_request *req); extern struct lvfs_callback_ops mds_lvfs_ops; int mds_lov_clean(struct obd_device *obd); extern int mds_iocontrol(unsigned int cmd, struct obd_export *exp, diff --git 
a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index 1f7c19e4c3bbcffaa419fb0116a026953cfb8eda..81b22b3f8ce17994caceaef07436daad3d817c05 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -511,8 +511,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len, } case OBD_IOC_ABORT_RECOVERY: - CERROR("aborting recovery for device %s\n", obd->obd_name); - target_abort_recovery(obd); + target_stop_recovery_thread(obd); RETURN(0); default: diff --git a/lustre/mgmt/mgmt_svc.c b/lustre/mgmt/mgmt_svc.c index 88fbf48de3801ae4bc50dd18899558b4788a6613..0dff68971195c80fdd7331649dedf8cd11ee46bc 100644 --- a/lustre/mgmt/mgmt_svc.c +++ b/lustre/mgmt/mgmt_svc.c @@ -61,7 +61,7 @@ static int mgmt_handler(struct ptlrpc_request *req) break; case MGMT_CONNECT: DEBUG_REQ(D_RPCTRACE, req, "connect"); - rc = target_handle_connect(req, NULL /* no recovery handler */); + rc = target_handle_connect(req); break; case MGMT_DISCONNECT: DEBUG_REQ(D_RPCTRACE, req, "disconnect"); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index b464c7b106e0bada1c5449911112bbe58b2ef999..a078bb515ae5bf5554ea3422fb7ca84e357ca409 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -48,6 +48,7 @@ #include <linux/obd_class.h> #include <linux/obd_lov.h> +#include <linux/obd_ost.h> #include <linux/lustre_dlm.h> #include <linux/lustre_fsfilt.h> #include <linux/lprocfs_status.h> @@ -509,7 +510,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) LPU64"\n", obd->obd_recoverable_clients, le64_to_cpu(fsd->fsd_last_transno)); obd->obd_next_recovery_transno = obd->obd_last_committed + 1; - obd->obd_recovering = 1; + target_start_recovery_thread(obd, ost_handle); } if (fcd) @@ -2717,8 +2718,7 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp, switch (cmd) { case OBD_IOC_ABORT_RECOVERY: - CERROR("aborting recovery for device %s\n", obd->obd_name); - target_abort_recovery(obd); + target_stop_recovery_thread(obd); 
RETURN(0); case OBD_IOC_SET_READONLY: { diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 9015771fa77e28fbdffdc169b008f82214cc9393..958c73f7f7ec41bff1392ac56421550a2724aa57 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -752,6 +752,7 @@ int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) } RETURN(rc); } +EXPORT_SYMBOL(ost_brw_write); static int ost_san_brw(struct ptlrpc_request *req, int cmd) { @@ -925,7 +926,7 @@ static int ost_filter_recovery_request(struct ptlrpc_request *req, -static int ost_handle(struct ptlrpc_request *req) +int ost_handle(struct ptlrpc_request *req) { struct obd_trans_info trans_info = { 0, }; struct obd_trans_info *oti = &trans_info; @@ -937,7 +938,7 @@ static int ost_handle(struct ptlrpc_request *req) /* XXX identical to MDS */ if (req->rq_reqmsg->opc != OST_CONNECT) { struct obd_device *obd; - int abort_recovery, recovering; + int recovering; exp = req->rq_export; @@ -952,16 +953,18 @@ static int ost_handle(struct ptlrpc_request *req) /* Check for aborted recovery. 
*/ spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; recovering = obd->obd_recovering; spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) { - target_abort_recovery(obd); - } else if (recovering) { + if (recovering) { rc = ost_filter_recovery_request(req, obd, &should_process); if (rc || !should_process) RETURN(rc); + if (should_process < 0) { + req->rq_status = should_process; + rc = ptlrpc_error(req); + RETURN(rc); + } } } @@ -971,7 +974,7 @@ static int ost_handle(struct ptlrpc_request *req) case OST_CONNECT: { CDEBUG(D_INODE, "connect\n"); OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0); - rc = target_handle_connect(req, ost_handle); + rc = target_handle_connect(req); break; } case OST_DISCONNECT: @@ -1135,6 +1138,7 @@ out: target_send_reply(req, rc, fail); return 0; } +EXPORT_SYMBOL(ost_handle); int ost_attach(struct obd_device *dev, obd_count len, void *data) { diff --git a/lustre/ptlbd/rpc.c b/lustre/ptlbd/rpc.c index 5af32494f516195c570265bca2ed04292e403f6d..d7407109681343e8c804467c5e82471daa78397e 100644 --- a/lustre/ptlbd/rpc.c +++ b/lustre/ptlbd/rpc.c @@ -354,7 +354,7 @@ int ptlbd_handle(struct ptlrpc_request *req) swab = lustre_msg_swabbed (req->rq_reqmsg); if (req->rq_reqmsg->opc == PTLBD_CONNECT) { - rc = target_handle_connect(req, ptlbd_handle); + rc = target_handle_connect(req); target_send_reply(req, rc, OBD_FAIL_PTLRPC); RETURN(0); } diff --git a/lustre/tests/replay-dual.sh b/lustre/tests/replay-dual.sh index 9954e0cba966a7d1f72ac3cf63eef8df8ec45290..73c922d48bbd17c0b3c7385f5fb051b239a41f69 100755 --- a/lustre/tests/replay-dual.sh +++ b/lustre/tests/replay-dual.sh @@ -37,6 +37,9 @@ gen_config() { build_test_filter +SETUP=${SETUP:-"setup"} +CLEANUP=${CLEANUP:-"cleanup"} + cleanup() { # make sure we are using the primary MDS, so the config log will # be able to clean up properly. 
@@ -61,29 +64,33 @@ if [ "$ONLY" == "cleanup" ]; then exit fi -gen_config -start ost --reformat $OSTLCONFARGS -PINGER=`cat /proc/fs/lustre/pinger` +setup() { + gen_config + start ost --reformat $OSTLCONFARGS + PINGER=`cat /proc/fs/lustre/pinger` -if [ "$PINGER" != "on" ]; then - echo "ERROR: Lustre must be built with --enable-pinger for replay-dual" - stop ost - exit 1 -fi + if [ "$PINGER" != "on" ]; then + echo "ERROR: Lustre must be built with --enable-pinger for replay-dual" + stop ost + exit 1 + fi -start ost2 --reformat $OSTLCONFARGS -[ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE -for mds in `mds_list`; do - start $mds --reformat $MDSLCONFARGS -done -grep " $MOUNT " /proc/mounts || zconf_mount `hostname` $MOUNT -grep " $MOUNT2 " /proc/mounts || zconf_mount `hostname` $MOUNT2 + start ost2 --reformat $OSTLCONFARGS + [ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE + for mds in `mds_list`; do + start $mds --reformat $MDSLCONFARGS + done + grep " $MOUNT " /proc/mounts || zconf_mount `hostname` $MOUNT + grep " $MOUNT2 " /proc/mounts || zconf_mount `hostname` $MOUNT2 -echo $TIMEOUT > /proc/sys/lustre/timeout -echo $UPCALL > /proc/sys/lustre/upcall + echo $TIMEOUT > /proc/sys/lustre/timeout + echo $UPCALL > /proc/sys/lustre/upcall +} +$SETUP [ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE + test_1() { touch $MOUNT1/a replay_barrier mds1 @@ -341,7 +348,25 @@ test_13() { } run_test 13 "close resend timeout" +test_20 () { + replay_barrier mds1 + multiop $MOUNT2/$tfile O_c & + pid2=$! + multiop $MOUNT1/$tfile O_c & + pid1=$! + # give multiop a chance to open + sleep 1 + kill -USR1 $pid2 + kill -USR1 $pid1 + sleep 1 + umount $MOUNT2 + facet_failover mds1 + df || df || return 1 + zconf_mount `hostname` $MOUNT2 +} +run_test 20 "replay open, Abort recovery, don't assert (3892)" + if [ "$ONLY" != "setup" ]; then equals_msg test complete, cleaning up - cleanup + $CLEANUP fi