diff --git a/lustre/include/Makefile.am b/lustre/include/Makefile.am index d9787e5ba30ff4447ee6bf6c2085671a73762688..226e23d005d9ea0cd7576af265184eb4e051d0f3 100644 --- a/lustre/include/Makefile.am +++ b/lustre/include/Makefile.am @@ -37,11 +37,10 @@ SUBDIRS = linux lustre EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h \ - lustre_commit_confd.h lustre_debug.h lustre_disk.h \ - lustre_dlm.h lustre_export.h lustre_fsfilt.h lustre_ha.h \ - lustre_handles.h lustre_import.h lustre_lib.h class_hash.h \ - lustre_lite.h lustre_log.h lustre_mds.h lustre_net.h \ - lustre_param.h lustre_quota.h lustre_ucache.h lvfs.h \ - obd_cache.h obd_class.h obd_echo.h obd.h obd_lov.h \ - obd_ost.h obd_support.h lustre_cache.h lustre_ver.h \ - interval_tree.h + lustre_debug.h lustre_disk.h lustre_dlm.h lustre_export.h \ + lustre_fsfilt.h lustre_ha.h lustre_handles.h lustre_import.h \ + lustre_lib.h lustre_sec.h lustre_lite.h lustre_log.h lustre_mds.h \ + lustre_mdc.h lustre_net.h lustre_quota.h lustre_ucache.h lvfs.h \ + class_hash.h obd_cache.h obd_class.h obd_echo.h obd.h obd_lov.h \ + obd_ost.h obd_support.h lustre_cache.h lustre_ver.h \ + interval_tree.h diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h index 91e1f9343fd464707d57c279215695cfcaefdcc5..c5afafb8ce6a99fabbba9624074a96ee64064b02 100644 --- a/lustre/include/lustre_log.h +++ b/lustre/include/lustre_log.h @@ -112,16 +112,44 @@ extern int llog_cancel_rec(struct llog_handle *loghandle, int index); extern int llog_close(struct llog_handle *cathandle); extern int llog_get_size(struct llog_handle *loghandle); -/* llog_cat.c - catalog api */ +/* llog_cat.c - catalog api */ struct llog_process_data { - void *lpd_data; - llog_cb_t lpd_cb; + /** + * Any useful data needed while processing catalog. This is + * passed later to process callback. + */ + void *lpd_data; + /** + * Catalog process callback function, called for each record + * in catalog. 
+ */ + llog_cb_t lpd_cb; }; struct llog_process_cat_data { - int first_idx; - int last_idx; - /* to process catalog across zero record */ + /** + * Temporarily stored first_idx while scanning log. + */ + int lpcd_first_idx; + /** + * Temporarily stored last_idx while scanning log. + */ + int lpcd_last_idx; +}; + +struct llog_process_cat_args { + /** + * Llog context used in recovery thread on OST (recov_thread.c) + */ + struct llog_ctxt *lpca_ctxt; + /** + * Llog callback used in recovery thread on OST (recov_thread.c) + */ + void *lpca_cb; + /** + * Data pointer for llog callback. + */ + void *lpca_arg; }; int llog_cat_put(struct llog_handle *cathandle); @@ -130,6 +158,7 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec, int llog_cat_cancel_records(struct llog_handle *cathandle, int count, struct llog_cookie *cookies); int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data); +int llog_cat_process_thread(void *data); int llog_cat_reverse_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data); int llog_cat_set_first_idx(struct llog_handle *cathandle, int index); @@ -178,9 +207,9 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm, int count, struct llog_cookie *cookies, int flags); int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp); -int llog_repl_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_gen *gen, - struct obd_uuid *uuid); +int llog_obd_repl_connect(struct llog_ctxt *ctxt, int count, + struct llog_logid *logid, struct llog_gen *gen, + struct obd_uuid *uuid); struct llog_operations { int (*lop_write_rec)(struct llog_handle *loghandle, @@ -234,6 +263,73 @@ struct llog_ctxt { void *llog_proc_cb; }; +#define LCM_NAME_SIZE 64 + +struct llog_commit_master { + /** + * Thread control flags (start, stop, etc.) + */ + long lcm_flags; + /** + * Number of llcds on this lcm. + */ + atomic_t lcm_count; + /** + * Ptlrpc requests set. 
All cancel rpcs go via this request set. + */ + struct ptlrpc_request_set *lcm_set; + /** + * Thread control structure. Used to control the commit thread. + */ + struct ptlrpcd_ctl lcm_pc; + /** + * Busy resources waitq + */ + cfs_waitq_t lcm_waitq; + /** + * Commit thread name buffer. Only used for thread start. + */ + char lcm_name[LCM_NAME_SIZE]; +}; + +struct llog_canceld_ctxt { + /** + * Llog context this llcd is attached to. Used for accessing + * ->loc_import and others in process of canceling cookies + * gathered in this llcd. + */ + struct llog_ctxt *llcd_ctxt; + /** + * Cancel thread control structure pointer. Used for accessing + * it to see if should stop processing and other needs. + */ + struct llog_commit_master *llcd_lcm; + /** + * Maximal llcd size. Used in calculations of how much room + * is left in llcd for incoming cookies. + */ + int llcd_size; + /** + * Current llcd size while gathering cookies. This should not be + * more than ->llcd_size. Used for determining if we need to + * send this llcd (if full) and allocate new one. This is also + * used for copying new cookie at the end of buffer. + */ + int llcd_cookiebytes; + /** + * Pointer to the start of cookies buffer. 
+ */ + struct llog_cookie llcd_cookies[0]; +}; + +/* ptlrpc/recov_thread.c */ +extern struct llog_commit_master *llog_recov_thread_init(char *name); +extern void llog_recov_thread_fini(struct llog_commit_master *lcm, + int force); +extern int llog_recov_thread_start(struct llog_commit_master *lcm); +extern void llog_recov_thread_stop(struct llog_commit_master *lcm, + int force); + #ifndef __KERNEL__ #define cap_raise(c, flag) do {} while(0) @@ -308,10 +404,10 @@ static inline int llog_data_len(int len) do { \ if ((ctxt) == NULL) \ break; \ - CDEBUG(D_INFO, "PUTting ctxt %p : new refcount %d\n", (ctxt), \ - atomic_read(&(ctxt)->loc_refcount) - 1); \ LASSERT(atomic_read(&(ctxt)->loc_refcount) > 0); \ LASSERT(atomic_read(&(ctxt)->loc_refcount) < 0x5a5a5a); \ + CDEBUG(D_INFO, "PUTting ctxt %p : new refcount %d\n", (ctxt), \ + atomic_read(&(ctxt)->loc_refcount) - 1); \ __llog_ctxt_put(ctxt); \ } while (0) diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index d848763a95c1fa7cb0cce8c00ea02703fd3e2e85..3bf9d5461defbdb7f284f47851503097fe8d0ce2 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -620,6 +620,69 @@ struct ptlrpc_service { //struct ptlrpc_srv_ni srv_interfaces[0]; }; +struct ptlrpcd_ctl { + /** + * Ptlrpc thread control flags (LIOD_START, LIOD_STOP, LIOD_STOP_FORCE) + */ + unsigned long pc_flags; + /** + * Thread lock protecting structure fields. + */ + spinlock_t pc_lock; + /** + * Start completion. + */ + struct completion pc_starting; + /** + * Stop completion. + */ + struct completion pc_finishing; + /** + * Thread requests set. + */ + struct ptlrpc_request_set *pc_set; + /** + * Thread name used in cfs_daemonize() + */ + char pc_name[16]; +#ifndef __KERNEL__ + /** + * Async rpcs flag to make sure that ptlrpcd_check() is called only + * once. + */ + int pc_recurred; + /** + * Currently not used. + */ + void *pc_callback; + /** + * User-space async rpcs callback. 
+ */ + void *pc_wait_callback; + /** + * User-space check idle rpcs callback. + */ + void *pc_idle_callback; +#endif +}; + +/* Bits for pc_flags */ +enum ptlrpcd_ctl_flags { + /** + * Ptlrpc thread start flag. + */ + LIOD_START = 1 << 0, + /** + * Ptlrpc thread stop flag. + */ + LIOD_STOP = 1 << 1, + /** + * Ptlrpc thread stop force flag. This will cause also + * aborting any inflight rpcs handled by thread. + */ + LIOD_STOP_FORCE = 1 << 2 +}; + /* ptlrpc/events.c */ extern lnet_handle_eq_t ptlrpc_eq_h; extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid, @@ -700,6 +763,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req); void ptlrpc_unregister_reply(struct ptlrpc_request *req); void ptlrpc_restart_req(struct ptlrpc_request *req); void ptlrpc_abort_inflight(struct obd_import *imp); +void ptlrpc_abort_set(struct ptlrpc_request_set *set); struct ptlrpc_request_set *ptlrpc_prep_set(void); int ptlrpc_set_add_cb(struct ptlrpc_request_set *set, @@ -712,13 +776,16 @@ void ptlrpc_interrupted_set(void *data); void ptlrpc_mark_interrupted(struct ptlrpc_request *req); void ptlrpc_set_destroy(struct ptlrpc_request_set *); void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *); -void ptlrpc_set_add_new_req(struct ptlrpc_request_set *, - struct ptlrpc_request *); +int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc, + struct ptlrpc_request *req); void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool); void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq); -struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int, int, - void (*populate_pool)(struct ptlrpc_request_pool *, int)); + +struct ptlrpc_request_pool * +ptlrpc_init_rq_pool(int, int, + void (*populate_pool)(struct ptlrpc_request_pool *, int)); + void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req); struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count, __u32 *lengths, @@ -928,6 +995,8 @@ void ping_evictor_stop(void); #endif /* 
ptlrpc/ptlrpcd.c */ +int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc); +void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force); void ptlrpcd_wake(struct ptlrpc_request *req); void ptlrpcd_add_req(struct ptlrpc_request *req); int ptlrpcd_addref(void); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 3477fefbe6f0a9855b72775c44a8d73f09b8b540..1a231e9c41ec78a0ef6c7be806d3cb76689cc873 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -354,7 +354,7 @@ struct filter_obd { int fo_fmd_max_num; /* per exp filter_mod_data */ int fo_fmd_max_age; /* jiffies to fmd expiry */ - void *fo_lcm; + struct llog_commit_master *fo_lcm; }; #define OSC_MAX_RIF_DEFAULT 8 diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 722a669c75f489543e3a27c981e44735f38d9807..7dfe16dd4bd478ed6bcf203bbc23ceb80afadc84 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -66,7 +66,6 @@ #include <obd_lov.h> #include <lustre_fsfilt.h> #include <lprocfs_status.h> -#include <lustre_commit_confd.h> #include <lustre_quota.h> #include <lustre_disk.h> #include <lustre_param.h> diff --git a/lustre/mds/mds_log.c b/lustre/mds/mds_log.c index 96cf600d383c87400ba868ae8e6ecd948b2854d7..a6ba622d6d7066a211fae411f15002ef741a2137 100644 --- a/lustre/mds/mds_log.c +++ b/lustre/mds/mds_log.c @@ -52,9 +52,7 @@ #include <obd_class.h> #include <lustre_fsfilt.h> #include <lustre_mds.h> -#include <lustre_commit_confd.h> #include <lustre_log.h> - #include "mds_internal.h" static int mds_llog_origin_add(struct llog_ctxt *ctxt, diff --git a/lustre/mds/mds_unlink_open.c b/lustre/mds/mds_unlink_open.c index 8c9d55ad75f0cb96aba86669adf6c3571ab74e4c..f24a9dc4d2a43f98af0da6f18d00187db95b1664 100644 --- a/lustre/mds/mds_unlink_open.c +++ b/lustre/mds/mds_unlink_open.c @@ -54,7 +54,6 @@ #include <obd_class.h> #include <lustre_fsfilt.h> #include <lustre_mds.h> -#include <lustre_commit_confd.h> #include <lvfs.h> #include "mds_internal.h" diff --git a/lustre/mgs/mgs_handler.c 
b/lustre/mgs/mgs_handler.c index 62d333e46a6e4c0f8903ed185f874d69570f469c..a3890846737fcd4d09f04ed7970e233832bf0a6a 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -57,7 +57,6 @@ #include <lustre_dlm.h> #include <lprocfs_status.h> #include <lustre_fsfilt.h> -#include <lustre_commit_confd.h> #include <lustre_disk.h> #include "mgs_internal.h" diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index de5a7c5aa9c0033c6d3e736b955401cf69155929..14cd8fd9c518aacbbfc5c8a319b6b55bcb0c8194 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -781,13 +781,13 @@ void class_import_put(struct obd_import *import) { ENTRY; - CDEBUG(D_INFO, "import %p refcount=%d\n", import, - atomic_read(&import->imp_refcount) - 1); - LASSERT(atomic_read(&import->imp_refcount) > 0); LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a); LASSERT(list_empty(&import->imp_zombie_chain)); + CDEBUG(D_INFO, "import %p refcount=%d\n", import, + atomic_read(&import->imp_refcount) - 1); + if (atomic_dec_and_test(&import->imp_refcount)) { CDEBUG(D_INFO, "final put import %p\n", import); diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index 2a36b27e84dfafa0b56c2563a25c2851fb5f9f5d..62105b3c0a94fad4ad236dddea3c4c7ce89e3c25 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -245,11 +245,11 @@ static int llog_process_thread(void *arg) cfs_daemonize_ctxt("llog_process_thread"); if (cd != NULL) { - last_called_index = cd->first_idx; - index = cd->first_idx + 1; + last_called_index = cd->lpcd_first_idx; + index = cd->lpcd_first_idx + 1; } - if (cd != NULL && cd->last_idx) - last_index = cd->last_idx; + if (cd != NULL && cd->lpcd_last_idx) + last_index = cd->lpcd_last_idx; else last_index = LLOG_BITMAP_BYTES * 8 - 1; @@ -347,7 +347,7 @@ static int llog_process_thread(void *arg) out: if (cd != NULL) - cd->last_idx = last_called_index; + cd->lpcd_last_idx = last_called_index; if (buf) OBD_FREE(buf, LLOG_CHUNK_SIZE); lpi->lpi_rc = rc; 
@@ -414,9 +414,9 @@ int llog_reverse_process(struct llog_handle *loghandle, llog_cb_t cb, RETURN(-ENOMEM); if (cd != NULL) - first_index = cd->first_idx + 1; - if (cd != NULL && cd->last_idx) - index = cd->last_idx; + first_index = cd->lpcd_first_idx + 1; + if (cd != NULL && cd->lpcd_last_idx) + index = cd->lpcd_last_idx; else index = LLOG_BITMAP_BYTES * 8 - 1; diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index 4678209b8bb48800df3dd71b51f701a645bb4c6f..3470611d78393952afcc6235735e1c925e65f25c 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -394,14 +394,14 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data) CWARN("catlog "LPX64" crosses index zero\n", cat_llh->lgh_id.lgl_oid); - cd.first_idx = llh->llh_cat_idx; - cd.last_idx = 0; + cd.lpcd_first_idx = llh->llh_cat_idx; + cd.lpcd_last_idx = 0; rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd); if (rc != 0) RETURN(rc); - cd.first_idx = 0; - cd.last_idx = cat_llh->lgh_last_idx; + cd.lpcd_first_idx = 0; + cd.lpcd_last_idx = cat_llh->lgh_last_idx; rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd); } else { rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL); @@ -411,6 +411,56 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data) } EXPORT_SYMBOL(llog_cat_process); +#ifdef __KERNEL__ +int llog_cat_process_thread(void *data) +{ + struct llog_process_cat_args *args = data; + struct llog_ctxt *ctxt = args->lpca_ctxt; + struct llog_handle *llh = NULL; + void *cb = args->lpca_cb; + struct llog_logid logid; + int rc; + ENTRY; + + cfs_daemonize_ctxt("ll_log_process"); + + logid = *(struct llog_logid *)(args->lpca_arg); + rc = llog_create(ctxt, &llh, &logid, NULL); + if (rc) { + CERROR("llog_create() failed %d\n", rc); + GOTO(out, rc); + } + rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL); + if (rc) { + CERROR("llog_init_handle failed %d\n", rc); + GOTO(release_llh, rc); + } + + if (cb) { + rc = 
llog_cat_process(llh, (llog_cb_t)cb, NULL); + if (rc != LLOG_PROC_BREAK) + CERROR("llog_cat_process() failed %d\n", rc); + } else { + CWARN("No callback function for recovery\n"); + } + + /* + * Make sure that all cached data is sent. + */ + llog_sync(ctxt, NULL); + EXIT; +release_llh: + rc = llog_cat_put(llh); + if (rc) + CERROR("llog_cat_put() failed %d\n", rc); +out: + llog_ctxt_put(ctxt); + OBD_FREE(args, sizeof(*args)); + return rc; +} +EXPORT_SYMBOL(llog_cat_process_thread); +#endif + static int llog_cat_reverse_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec, void *data) { @@ -455,15 +505,15 @@ int llog_cat_reverse_process(struct llog_handle *cat_llh, CWARN("catalog "LPX64" crosses index zero\n", cat_llh->lgh_id.lgl_oid); - cd.first_idx = 0; - cd.last_idx = cat_llh->lgh_last_idx; + cd.lpcd_first_idx = 0; + cd.lpcd_last_idx = cat_llh->lgh_last_idx; rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb, &d, &cd); if (rc != 0) RETURN(rc); - cd.first_idx = le32_to_cpu(llh->llh_cat_idx); - cd.last_idx = 0; + cd.lpcd_first_idx = le32_to_cpu(llh->llh_cat_idx); + cd.lpcd_last_idx = 0; rc = llog_reverse_process(cat_llh, llog_cat_reverse_process_cb, &d, &cd); } else { diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c index 95d102d15a6ed15380f9c4285edb30bc3187e3c5..7d371c25c4aa24eba849f2eb05590001b0c23d9c 100644 --- a/lustre/obdclass/llog_obd.c +++ b/lustre/obdclass/llog_obd.c @@ -124,6 +124,9 @@ int llog_cleanup(struct llog_ctxt *ctxt) idx = ctxt->loc_idx; /*try to free the ctxt */ rc = __llog_ctxt_put(ctxt); + if (rc) + CERROR("Error %d while cleaning up ctxt %p\n", + rc, ctxt); l_wait_event(obd->obd_llog_waitq, llog_ctxt_null(obd, idx), &lwi); diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index 2d3283a31df0ff5494cf239518d15adfe166f8e4..1cea0a0e1d57b3fa5aae346a6795f44bc8ba5668 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -492,7 +492,6 @@ int class_cleanup(struct 
obd_device *obd, struct lustre_cfg *lcfg) if (err) CERROR("Precleanup %s returned %d\n", obd->obd_name, err); - class_decref(obd); obd->obd_set_up = 0; @@ -1109,15 +1108,15 @@ int class_config_parse_llog(struct llog_ctxt *ctxt, char *name, /* continue processing from where we last stopped to end-of-log */ if (cfg) - cd.first_idx = cfg->cfg_last_idx; - cd.last_idx = 0; + cd.lpcd_first_idx = cfg->cfg_last_idx; + cd.lpcd_last_idx = 0; rc = llog_process(llh, class_config_llog_handler, cfg, &cd); CDEBUG(D_CONFIG, "Processed log %s gen %d-%d (rc=%d)\n", name, - cd.first_idx + 1, cd.last_idx, rc); + cd.lpcd_first_idx + 1, cd.lpcd_last_idx, rc); if (cfg) - cfg->cfg_last_idx = cd.last_idx; + cfg->cfg_last_idx = cd.lpcd_last_idx; parse_out: rc2 = llog_close(llh); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 681e3d1f02e45bbd589b83ef7448fef30e8ce0e8..3dea57cc87c6fc2455b6b6b38385f2bda7ad7112 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -70,7 +70,6 @@ #include <lustre_fsfilt.h> #include <lprocfs_status.h> #include <lustre_log.h> -#include <lustre_commit_confd.h> #include <libcfs/list.h> #include <lustre_disk.h> #include <lustre_quota.h> @@ -1956,70 +1955,74 @@ static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt, int rc; ENTRY; - OBD_ALLOC(filter->fo_lcm, sizeof(struct llog_commit_master)); + filter->fo_lcm = llog_recov_thread_init(obd->obd_name); if (!filter->fo_lcm) RETURN(-ENOMEM); - rc = llog_init_commit_master((struct llog_commit_master *) - filter->fo_lcm); - if (rc) - GOTO(cleanup, rc); - filter_mds_ost_repl_logops = llog_client_ops; filter_mds_ost_repl_logops.lop_cancel = llog_obd_repl_cancel; - filter_mds_ost_repl_logops.lop_connect = llog_repl_connect; + filter_mds_ost_repl_logops.lop_connect = llog_obd_repl_connect; filter_mds_ost_repl_logops.lop_sync = llog_obd_repl_sync; rc = llog_setup(obd, LLOG_MDS_OST_REPL_CTXT, tgt, 0, NULL, &filter_mds_ost_repl_logops); if (rc) - GOTO(cleanup, rc); + 
GOTO(cleanup_lcm, rc); /* FIXME - assign unlink_cb for filter's recovery */ ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT); ctxt->llog_proc_cb = filter_recov_log_mds_ost_cb; - ctxt->loc_lcm = obd->u.filter.fo_lcm; - rc = llog_start_commit_thread(ctxt->loc_lcm); + ctxt->loc_lcm = filter->fo_lcm; llog_ctxt_put(ctxt); - if (rc) - GOTO(cleanup, rc); rc = llog_setup(obd, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL, &filter_size_orig_logops); - -cleanup: - if (rc) { - llog_cleanup_commit_master(filter->fo_lcm, 0); - OBD_FREE(filter->fo_lcm, sizeof(struct llog_commit_master)); - filter->fo_lcm = NULL; - } + if (rc) + GOTO(cleanup_ctxt, rc); RETURN(rc); +cleanup_ctxt: + ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT); + if (ctxt) + llog_cleanup(ctxt); +cleanup_lcm: + llog_recov_thread_fini(filter->fo_lcm, 1); + filter->fo_lcm = NULL; + return rc; } static int filter_llog_finish(struct obd_device *obd, int count) { + struct filter_obd *filter = &obd->u.filter; struct llog_ctxt *ctxt; int rc = 0, rc2 = 0; ENTRY; - if (obd->u.filter.fo_lcm) { - llog_cleanup_commit_master((struct llog_commit_master *) - obd->u.filter.fo_lcm, 1); - OBD_FREE(obd->u.filter.fo_lcm, - sizeof(struct llog_commit_master)); - obd->u.filter.fo_lcm = NULL; - } - ctxt = llog_get_context(obd, LLOG_MDS_OST_REPL_CTXT); - if (ctxt) + if (ctxt) { + /* + * Balance class_import_get() called in llog_receptor_accept(). + * This is safe to do here, as llog is already synchronized and + * its import may go. 
+ */ + mutex_down(&ctxt->loc_sem); + if (ctxt->loc_imp) { + class_import_put(ctxt->loc_imp); + ctxt->loc_imp = NULL; + } + mutex_up(&ctxt->loc_sem); rc = llog_cleanup(ctxt); - + } ctxt = llog_get_context(obd, LLOG_SIZE_ORIG_CTXT); if (ctxt) rc2 = llog_cleanup(ctxt); if (!rc) rc = rc2; + if (filter->fo_lcm) { + llog_recov_thread_fini(filter->fo_lcm, obd->obd_force); + filter->fo_lcm = NULL; + } + RETURN(rc); } diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c index aaa429739a639cc74c830753c8126b5975e64d6b..017702756ea312520f7f19416900f17da8c38cb0 100644 --- a/lustre/obdfilter/filter_log.c +++ b/lustre/obdfilter/filter_log.c @@ -50,9 +50,8 @@ #include <libcfs/list.h> #include <obd_class.h> +#include <lustre_log.h> #include <lustre_fsfilt.h> -#include <lustre_commit_confd.h> - #include "filter_internal.h" int filter_log_sz_change(struct llog_handle *cathandle, diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 799e12454bb975a78ba99bafa1f149b50b750ca8..f07a0c131310fef1f88d4aab978aadbd07814589 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -52,9 +52,9 @@ #include <lustre_debug.h> #include <linux/init.h> #include <lprocfs_status.h> -#include <lustre_commit_confd.h> #include <libcfs/list.h> #include <lustre_quota.h> +#include <lustre_log.h> #include "ost_internal.h" static int oss_num_threads; diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 0fbaa9c5b6baa0401461fb9562fa701f59d69264..62444d189c9a39a30ccd2bde0451fcaa60a5f8eb 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -672,16 +672,36 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set, atomic_inc(&req->rq_import->imp_inflight); } -/* lock so many callers can add things, the context that owns the set - * is supposed to notice these and move them into the set proper. 
*/ -void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set, - struct ptlrpc_request *req) +/** + * Lock so many callers can add things, the context that owns the set + * is supposed to notice these and move them into the set proper. + */ +int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc, + struct ptlrpc_request *req) { + struct ptlrpc_request_set *set = pc->pc_set; + + /* + * Let caller know that we stopped and will not handle this request. + * It needs to take care of the request itself. + */ + if (test_bit(LIOD_STOP, &pc->pc_flags)) + return -EALREADY; + spin_lock(&set->set_new_req_lock); - /* The set takes over the caller's request reference */ + /* + * The set takes over the caller's request reference. + */ list_add_tail(&req->rq_set_chain, &set->set_new_requests); req->rq_set = set; spin_unlock(&set->set_new_req_lock); + + /* + * Let the thread know that we added something so it should wake up + * and process it. + */ + cfs_waitq_signal(&set->set_waitq); + return 0; } /* @@ -2132,6 +2152,29 @@ void ptlrpc_abort_inflight(struct obd_import *imp) EXIT; } +void ptlrpc_abort_set(struct ptlrpc_request_set *set) +{ + struct list_head *tmp, *n; + + LASSERT(set != NULL); + + list_for_each_safe(tmp, n, &set->set_requests) { + struct ptlrpc_request *req = + list_entry(tmp, struct ptlrpc_request, rq_set_chain); + + spin_lock (&req->rq_lock); + if (req->rq_phase != RQ_PHASE_RPC) { + spin_unlock (&req->rq_lock); + continue; + } + + req->rq_err = 1; + req->rq_status = -EINTR; + ptlrpc_wake_client_req(req); + spin_unlock (&req->rq_lock); + } +} + static __u64 ptlrpc_last_xid = 0; spinlock_t ptlrpc_last_xid_lock; diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h index 5d2e294576dd8ba4f3fa28f98aa0aad64e55f274..439a84a63b25eef0b5b25961f54fa6ef60da7881 100644 --- a/lustre/ptlrpc/ptlrpc_internal.h +++ b/lustre/ptlrpc/ptlrpc_internal.h @@ -94,6 +94,10 @@ int ping_evictor_wake(struct obd_export *exp); #define ping_evictor_wake(exp) 1 #endif +/* 
recov_thread.c */ +int llog_recov_init(void); +void llog_recov_fini(void); + static inline int ll_rpc_recoverable_error(int rc) { return (rc == -ENOTCONN || rc == -ENODEV); diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index be33fef5cf918ff329b9a99b5a3307b18c608da2..be37df96439ad564be7bafb65abc2d1ecc281ee9 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -97,10 +97,17 @@ __init int ptlrpc_init(void) if (ptlrpc_cbdata_slab == NULL) GOTO(cleanup, rc); + cleanup_phase = 5; + rc = llog_recov_init(); + if (rc) + GOTO(cleanup, rc); + RETURN(0); cleanup: switch(cleanup_phase) { + case 5: + cfs_mem_cache_destroy(ptlrpc_cbdata_slab); case 4: ldlm_exit(); case 3: @@ -118,6 +125,7 @@ cleanup: #ifdef __KERNEL__ static void __exit ptlrpc_exit(void) { + llog_recov_fini(); ldlm_exit(); ptlrpc_stop_pinger(); ptlrpc_exit_portals(); diff --git a/lustre/ptlrpc/ptlrpcd.c b/lustre/ptlrpc/ptlrpcd.c index ac49fac80e1a19734b29f0a26a153be7e5df9d60..94f4925cad71ffb79d84acbb8ff73f1226056d95 100644 --- a/lustre/ptlrpc/ptlrpcd.c +++ b/lustre/ptlrpc/ptlrpcd.c @@ -54,21 +54,6 @@ #include <obd_support.h> /* for OBD_FAIL_CHECK */ #include <lprocfs_status.h> -#define LIOD_STOP 0 -struct ptlrpcd_ctl { - unsigned long pc_flags; - spinlock_t pc_lock; - struct completion pc_starting; - struct completion pc_finishing; - struct ptlrpc_request_set *pc_set; - char pc_name[16]; -#ifndef __KERNEL__ - int pc_recurred; - void *pc_wait_callback; - void *pc_idle_callback; -#endif -}; - static struct ptlrpcd_ctl ptlrpcd_pc; static struct ptlrpcd_ctl ptlrpcd_recovery_pc; @@ -84,19 +69,40 @@ void ptlrpcd_wake(struct ptlrpc_request *req) cfs_waitq_signal(&rq_set->set_waitq); } -/* requests that are added to the ptlrpcd queue are sent via - * ptlrpcd_check->ptlrpc_check_set() */ +/* + * Requests that are added to the ptlrpcd queue are sent via + * ptlrpcd_check->ptlrpc_check_set(). 
+ */ void ptlrpcd_add_req(struct ptlrpc_request *req) { struct ptlrpcd_ctl *pc; + int rc; if (req->rq_send_state == LUSTRE_IMP_FULL) pc = &ptlrpcd_pc; else pc = &ptlrpcd_recovery_pc; - ptlrpc_set_add_new_req(pc->pc_set, req); - cfs_waitq_signal(&pc->pc_set->set_waitq); + rc = ptlrpc_set_add_new_req(pc, req); + if (rc) { + int (*interpreter)(struct ptlrpc_request *, + void *, int); + + interpreter = req->rq_interpret_reply; + + /* + * Thread is probably in stop now so we need to + * kill this rpc as it was not added. Let's call + * interpret for it to let it know we're killing it + * so that higher levels might free associated + * resources. + */ + req->rq_status = -EBADR; + interpreter(req, &req->rq_async_args, + req->rq_status); + req->rq_set = NULL; + ptlrpc_req_finished(req); + } } static int ptlrpcd_check(struct ptlrpcd_ctl *pc) @@ -114,15 +120,20 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) req = list_entry(pos, struct ptlrpc_request, rq_set_chain); list_del_init(&req->rq_set_chain); ptlrpc_set_add_req(pc->pc_set, req); - rc = 1; /* need to calculate its timeout */ + /* + * Need to calculate its timeout. + */ + rc = 1; } spin_unlock(&pc->pc_set->set_new_req_lock); if (pc->pc_set->set_remaining) { rc = rc | ptlrpc_check_set(pc->pc_set); - /* XXX our set never completes, so we prune the completed - * reqs after each iteration. boy could this be smarter. */ + /* + * XXX: our set never completes, so we prune the completed + * reqs after each iteration. boy could this be smarter. + */ list_for_each_safe(pos, tmp, &pc->pc_set->set_requests) { req = list_entry(pos, struct ptlrpc_request, rq_set_chain); @@ -136,7 +147,9 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) } if (rc == 0) { - /* If new requests have been added, make sure to wake up */ + /* + * If new requests have been added, make sure to wake up. 
+ */ spin_lock(&pc->pc_set->set_new_req_lock); rc = !list_empty(&pc->pc_set->set_new_requests); spin_unlock(&pc->pc_set->set_new_req_lock); @@ -146,9 +159,11 @@ static int ptlrpcd_check(struct ptlrpcd_ctl *pc) } #ifdef __KERNEL__ -/* ptlrpc's code paths like to execute in process context, so we have this - * thread which spins on a set which contains the io rpcs. llite specifies - * ptlrpcd's set when it pushes pages down into the oscs */ +/* + * ptlrpc's code paths like to execute in process context, so we have this + * thread which spins on a set which contains the io rpcs. llite specifies + * ptlrpcd's set when it pushes pages down into the oscs. + */ static int ptlrpcd(void *arg) { struct ptlrpcd_ctl *pc = arg; @@ -157,16 +172,17 @@ static int ptlrpcd(void *arg) if ((rc = cfs_daemonize_ctxt(pc->pc_name))) { complete(&pc->pc_starting); - return rc; + goto out; } complete(&pc->pc_starting); - /* this mainloop strongly resembles ptlrpc_set_wait except - * that our set never completes. ptlrpcd_check calls ptlrpc_check_set - * when there are requests in the set. new requests come in - * on the set's new_req_list and ptlrpcd_check moves them into - * the set. */ + /* + * This mainloop strongly resembles ptlrpc_set_wait() except that our + * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when + * there are requests in the set. New requests come in on the set's + * new_req_list and ptlrpcd_check() moves them into the set. + */ while (1) { struct l_wait_info lwi; cfs_duration_t timeout; @@ -176,13 +192,26 @@ static int ptlrpcd(void *arg) l_wait_event(pc->pc_set->set_waitq, ptlrpcd_check(pc), &lwi); + /* + * Abort inflight rpcs for forced stop case. + */ + if (test_bit(LIOD_STOP_FORCE, &pc->pc_flags)) + ptlrpc_abort_set(pc->pc_set); + if (test_bit(LIOD_STOP, &pc->pc_flags)) break; } - /* wait for inflight requests to drain */ + + /* + * Wait for inflight requests to drain. 
+ */ if (!list_empty(&pc->pc_set->set_requests)) ptlrpc_set_wait(pc->pc_set); + complete(&pc->pc_finishing); +out: + clear_bit(LIOD_START, &pc->pc_flags); + clear_bit(LIOD_STOP, &pc->pc_flags); return 0; } @@ -193,14 +222,18 @@ int ptlrpcd_check_async_rpcs(void *arg) struct ptlrpcd_ctl *pc = arg; int rc = 0; - /* single threaded!! */ + /* + * Single threaded!! + */ pc->pc_recurred++; if (pc->pc_recurred == 1) { rc = ptlrpcd_check(pc); if (!rc) ptlrpc_expired_set(pc->pc_set); - /*XXX send replay requests */ + /* + * XXX: send replay requests. + */ if (pc == &ptlrpcd_recovery_pc) rc = ptlrpcd_check(pc); } @@ -219,29 +252,37 @@ int ptlrpcd_idle(void *arg) #endif -static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc) +int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc) { - int rc; - + int rc = 0; ENTRY; - memset(pc, 0, sizeof(*pc)); + + /* + * Do not allow start second thread for one pc. + */ + if (test_bit(LIOD_START, &pc->pc_flags)) { + CERROR("Starting second thread (%s) for same pc %p\n", + name, pc); + RETURN(-EALREADY); + } + + set_bit(LIOD_START, &pc->pc_flags); init_completion(&pc->pc_starting); init_completion(&pc->pc_finishing); - pc->pc_flags = 0; spin_lock_init(&pc->pc_lock); snprintf (pc->pc_name, sizeof (pc->pc_name), name); pc->pc_set = ptlrpc_prep_set(); if (pc->pc_set == NULL) - RETURN(-ENOMEM); + GOTO(out, rc = -ENOMEM); #ifdef __KERNEL__ rc = cfs_kernel_thread(ptlrpcd, pc, 0); if (rc < 0) { ptlrpc_set_destroy(pc->pc_set); - RETURN(rc); + GOTO(out, rc); } - + rc = 0; wait_for_completion(&pc->pc_starting); #else pc->pc_wait_callback = @@ -250,14 +291,23 @@ static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc) pc->pc_idle_callback = liblustre_register_idle_callback("ptlrpcd_check_idle_rpcs", &ptlrpcd_idle, pc); - (void)rc; #endif - RETURN(0); +out: + if (rc) + clear_bit(LIOD_START, &pc->pc_flags); + RETURN(rc); } -static void ptlrpcd_stop(struct ptlrpcd_ctl *pc) +void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force) { + if 
(!test_bit(LIOD_START, &pc->pc_flags)) { + CERROR("Thread for pc %p was not started\n", pc); + return; + } + set_bit(LIOD_STOP, &pc->pc_flags); + if (force) + set_bit(LIOD_STOP_FORCE, &pc->pc_flags); cfs_waitq_signal(&pc->pc_set->set_waitq); #ifdef __KERNEL__ wait_for_completion(&pc->pc_finishing); @@ -285,7 +335,7 @@ int ptlrpcd_addref(void) rc = ptlrpcd_start("ptlrpcd-recov", &ptlrpcd_recovery_pc); if (rc) { - ptlrpcd_stop(&ptlrpcd_pc); + ptlrpcd_stop(&ptlrpcd_pc, 0); --ptlrpcd_users; GOTO(out, rc); } @@ -298,8 +348,8 @@ void ptlrpcd_decref(void) { mutex_down(&ptlrpcd_sem); if (--ptlrpcd_users == 0) { - ptlrpcd_stop(&ptlrpcd_pc); - ptlrpcd_stop(&ptlrpcd_recovery_pc); + ptlrpcd_stop(&ptlrpcd_pc, 0); + ptlrpcd_stop(&ptlrpcd_recovery_pc, 0); } mutex_up(&ptlrpcd_sem); } diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c index e63e7b7604214bd0e76086cdc0f861ca53308663..0ba82d9661c96ba0f81efa470f2edebff5a6c7b5 100644 --- a/lustre/ptlrpc/recov_thread.c +++ b/lustre/ptlrpc/recov_thread.c @@ -40,7 +40,9 @@ * - we do not share logs among different OST<->MDS connections, so that * if an OST or MDS fails it need only look at log(s) relevant to itself * - * Author: Andreas Dilger <adilger@clusterfs.com> + * Author: Andreas Dilger <adilger@clusterfs.com> + * Yury Umanets <yury.umanets@sun.com> + * Alexey Lyashkov <alexey.lyashkov@sun.com> */ #define DEBUG_SUBSYSTEM S_LOG @@ -56,9 +58,7 @@ # include <liblustre.h> #endif -#include <libcfs/kp30.h> #include <obd_class.h> -#include <lustre_commit_confd.h> #include <obd_support.h> #include <obd_class.h> #include <lustre_net.h> @@ -67,623 +67,576 @@ #include <lustre_log.h> #include "ptlrpc_internal.h" -#ifdef __KERNEL__ +static atomic_t llcd_count = ATOMIC_INIT(0); +static cfs_mem_cache_t *llcd_cache = NULL; -/* Allocate new commit structs in case we do not have enough. - * Make the llcd size small enough that it fits into a single page when we - * are sending/receiving it. 
*/ -static int llcd_alloc(struct llog_commit_master *lcm) +#ifdef __KERNEL__ +enum { + LLOG_LCM_FL_START = 1 << 0, + LLOG_LCM_FL_EXIT = 1 << 1 +}; + +/** + * Allocate new llcd from cache, init it and return to caller. + * Bumps number of objects allocated. + */ +static struct llog_canceld_ctxt *llcd_alloc(void) { struct llog_canceld_ctxt *llcd; int llcd_size; - /* payload of lustre_msg V2 is bigger */ - llcd_size = 4096 - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL); - OBD_ALLOC(llcd, - llcd_size + offsetof(struct llog_canceld_ctxt, llcd_cookies)); - if (llcd == NULL) - return -ENOMEM; + /* + * Payload of lustre_msg V2 is bigger. + */ + llcd_size = CFS_PAGE_SIZE - + lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL); + llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies); + OBD_SLAB_ALLOC(llcd, llcd_cache, CFS_ALLOC_STD, llcd_size); + if (!llcd) + return NULL; llcd->llcd_size = llcd_size; - llcd->llcd_lcm = lcm; - - spin_lock(&lcm->lcm_llcd_lock); - list_add(&llcd->llcd_list, &lcm->lcm_llcd_free); - atomic_inc(&lcm->lcm_llcd_numfree); - spin_unlock(&lcm->lcm_llcd_lock); - - return 0; -} - -/* Get a free cookie struct from the list */ -static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm) -{ - struct llog_canceld_ctxt *llcd; - -repeat: - spin_lock(&lcm->lcm_llcd_lock); - if (list_empty(&lcm->lcm_llcd_free)) { - spin_unlock(&lcm->lcm_llcd_lock); - if (llcd_alloc(lcm) < 0) { - CERROR("unable to allocate log commit data!\n"); - return NULL; - } - /* check new llcd wasn't grabbed while lock dropped, b=7407 */ - goto repeat; - } - - llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list); - list_del(&llcd->llcd_list); - atomic_dec(&lcm->lcm_llcd_numfree); - spin_unlock(&lcm->lcm_llcd_lock); - llcd->llcd_cookiebytes = 0; - + atomic_inc(&llcd_count); return llcd; } -static void llcd_put(struct llog_canceld_ctxt *llcd) +/** + * Returns passed llcd to cache. 
+ */ +static void llcd_free(struct llog_canceld_ctxt *llcd) { - struct llog_commit_master *lcm = llcd->llcd_lcm; - - llog_ctxt_put(llcd->llcd_ctxt); - if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) { - int llcd_size = llcd->llcd_size + - offsetof(struct llog_canceld_ctxt, llcd_cookies); - OBD_FREE(llcd, llcd_size); - } else { - spin_lock(&lcm->lcm_llcd_lock); - list_add(&llcd->llcd_list, &lcm->lcm_llcd_free); - atomic_inc(&lcm->lcm_llcd_numfree); - spin_unlock(&lcm->lcm_llcd_lock); - } + LASSERT(atomic_read(&llcd_count) > 0); + OBD_SLAB_FREE(llcd, llcd_cache, llcd->llcd_size); + atomic_dec(&llcd_count); } -/* Send some cookies to the appropriate target */ -static void llcd_send(struct llog_canceld_ctxt *llcd) +/** + * Copy passed @cookies to @llcd. + */ +static void llcd_copy(struct llog_canceld_ctxt *llcd, + struct llog_cookie *cookies) { - if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) { - spin_lock(&llcd->llcd_lcm->lcm_llcd_lock); - list_add_tail(&llcd->llcd_list, - &llcd->llcd_lcm->lcm_llcd_pending); - spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock); - } - cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1); + memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, + cookies, sizeof(*cookies)); + llcd->llcd_cookiebytes += sizeof(*cookies); } /** - * Grab llcd and assign it to passed @ctxt. Also set up backward link - * and get ref on @ctxt. + * Checks if passed cookie fits into llcd free space buffer. Returns + * 1 if yes and 0 otherwise. 
*/ -static struct llog_canceld_ctxt *ctxt_llcd_grab(struct llog_ctxt *ctxt) +static int llcd_fit(struct llog_canceld_ctxt *llcd, + struct llog_cookie *cookies) { - struct llog_canceld_ctxt *llcd; - - LASSERT_SEM_LOCKED(&ctxt->loc_sem); - llcd = llcd_grab(ctxt->loc_lcm); - if (llcd == NULL) - return NULL; - - llcd->llcd_ctxt = llog_ctxt_get(ctxt); - ctxt->loc_llcd = llcd; + /* + * llcd_alloc() stores the size of the whole allocation in + * ->llcd_size, header included, so only what follows the header + * (llcd_cookies) may be filled with cookies. + */ + int room = llcd->llcd_size - + (int)offsetof(struct llog_canceld_ctxt, llcd_cookies) - + llcd->llcd_cookiebytes; + + return room >= (int)sizeof(*cookies); +} - CDEBUG(D_RPCTRACE,"grab llcd %p:%p\n", ctxt->loc_llcd, ctxt); - return llcd; +static void llcd_print(struct llog_canceld_ctxt *llcd, + const char *func, int line) +{ + CDEBUG(D_RPCTRACE, "Llcd (%p) at %s:%d:\n", llcd, func, line); + CDEBUG(D_RPCTRACE, " size: %d\n", llcd->llcd_size); + CDEBUG(D_RPCTRACE, " ctxt: %p\n", llcd->llcd_ctxt); + CDEBUG(D_RPCTRACE, " lcm : %p\n", llcd->llcd_lcm); + CDEBUG(D_RPCTRACE, " cookiebytes : %d\n", llcd->llcd_cookiebytes); } /** - * Put llcd in passed @ctxt. Set ->loc_llcd to NULL. + * Llcd completion function. Called upon llcd send finish regardless of + * sending result. Error is passed in @rc. Note, that this will be called + * in cleanup time when all inflight rpcs aborted. */ -static void ctxt_llcd_put(struct llog_ctxt *ctxt) +static int +llcd_interpret(struct ptlrpc_request *req, void *noused, int rc) { - mutex_down(&ctxt->loc_sem); - if (ctxt->loc_llcd != NULL) { - CDEBUG(D_RPCTRACE,"put llcd %p:%p\n", ctxt->loc_llcd, ctxt); - llcd_put(ctxt->loc_llcd); - ctxt->loc_llcd = NULL; - } - if (ctxt->loc_imp) { - class_import_put(ctxt->loc_imp); - ctxt->loc_imp = NULL; - } - mutex_up(&ctxt->loc_sem); + struct llog_canceld_ctxt *llcd = req->rq_async_args.pointer_arg[0]; + CDEBUG(D_RPCTRACE, "Sent llcd %p (%d)\n", llcd, rc); + llcd_free(llcd); + return 0; } - -/* deleted objects have a commit callback that cancels the MDS - * log record for the deletion. The commit callback calls this - * function + +/** + * Send @llcd to remote node. Free llcd upon completion or error.
Sending + * is performed in async style so this function will return asap without + * blocking. */ -int llog_obd_repl_cancel(struct llog_ctxt *ctxt, - struct lov_stripe_md *lsm, int count, - struct llog_cookie *cookies, int flags) +static int llcd_send(struct llog_canceld_ctxt *llcd) { - struct llog_canceld_ctxt *llcd; - int rc = 0; + int size[2] = { sizeof(struct ptlrpc_body), + llcd->llcd_cookiebytes }; + char *bufs[2] = { NULL, (char *)llcd->llcd_cookies }; + struct obd_import *import = NULL; + struct llog_commit_master *lcm; + struct ptlrpc_request *request; + struct llog_ctxt *ctxt; + int rc; ENTRY; - LASSERT(ctxt); + ctxt = llcd->llcd_ctxt; + if (!ctxt) { + CERROR("Invalid llcd with NULL ctxt found (%p)\n", + llcd); + llcd_print(llcd, __FUNCTION__, __LINE__); + LBUG(); + } + LASSERT_SEM_LOCKED(&ctxt->loc_sem); - mutex_down(&ctxt->loc_sem); - llcd = ctxt->loc_llcd; + if (llcd->llcd_cookiebytes == 0) + GOTO(exit, rc = 0); - if (ctxt->loc_imp == NULL) { - CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt); - GOTO(out, rc = 0); + lcm = llcd->llcd_lcm; + + /* + * Check if we're in exit stage. Do not send llcd in + * this case. 
+ */ + if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) + GOTO(exit, rc = -ENODEV); + + CDEBUG(D_RPCTRACE, "Sending llcd %p\n", llcd); + + import = llcd->llcd_ctxt->loc_imp; + if (!import || (import == LP_POISON) || + (import->imp_client == LP_POISON)) { + CERROR("Invalid import %p for llcd %p\n", + import, llcd); + GOTO(exit, rc = -ENODEV); } - if (count > 0 && cookies != NULL) { - if (llcd == NULL) { - llcd = ctxt_llcd_grab(ctxt); - if (llcd == NULL) { - CERROR("couldn't get an llcd - dropped "LPX64 - ":%x+%u\n", - cookies->lgc_lgl.lgl_oid, - cookies->lgc_lgl.lgl_ogen, - cookies->lgc_index); - GOTO(out, rc = -ENOMEM); - } - } - - memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, - cookies, sizeof(*cookies)); - llcd->llcd_cookiebytes += sizeof(*cookies); - } else { - if (llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW)) - GOTO(out, rc); + OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10); + + /* + * No need to get import here as it is already done in + * llog_receptor_accept(). 
+ */ + request = ptlrpc_prep_req(import, LUSTRE_LOG_VERSION, + OBD_LOG_CANCEL, 2, size,bufs); + if (request == NULL) { + CERROR("Can't allocate request for sending llcd %p\n", + llcd); + GOTO(exit, rc = -ENOMEM); } - if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) || - (flags & OBD_LLOG_FL_SENDNOW)) { - CDEBUG(D_RPCTRACE, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt); - ctxt->loc_llcd = NULL; - llcd_send(llcd); + /* bug 5515 */ + request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL; + request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL; + ptlrpc_req_set_repsize(request, 1, NULL); + ptlrpc_at_set_req_timeout(request); + request->rq_interpret_reply = llcd_interpret; + request->rq_async_args.pointer_arg[0] = llcd; + rc = ptlrpc_set_add_new_req(&lcm->lcm_pc, request); + if (rc) { + ptlrpc_req_finished(request); + GOTO(exit, rc); } -out: - mutex_up(&ctxt->loc_sem); + RETURN(0); +exit: + CDEBUG(D_RPCTRACE, "Refused llcd %p\n", llcd); + llcd_free(llcd); return rc; } -EXPORT_SYMBOL(llog_obd_repl_cancel); -int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp) +/** + * Attach @llcd to @ctxt. Establish llcd vs. ctxt reserve connection + * so hat they can refer each other. + */ +static int +llcd_attach(struct llog_ctxt *ctxt, struct llog_canceld_ctxt *llcd) { - int rc = 0; - ENTRY; - - if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) { - CDEBUG(D_RPCTRACE,"reverse import disconnect\n"); - /* - * We put llcd because it is not going to sending list and - * thus, its refc will not be handled. We will handle it here. - */ - ctxt_llcd_put(ctxt); - } else { - /* - * Sending cancel. This means that ctxt->loc_llcd wil be - * put on sending list in llog_obd_repl_cancel() and in - * this case recovery thread will take care of it refc. 
- */ - rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW); - } - - RETURN(rc); -} -EXPORT_SYMBOL(llog_obd_repl_sync); + struct llog_commit_master *lcm; -static void llog_lcm_dec(struct llog_commit_master *lcm) -{ - atomic_dec(&lcm->lcm_thread_total); - cfs_waitq_signal(&lcm->lcm_waitq); + LASSERT(ctxt != NULL && llcd != NULL); + LASSERT_SEM_LOCKED(&ctxt->loc_sem); + LASSERT(ctxt->loc_llcd == NULL); + lcm = ctxt->loc_lcm; + atomic_inc(&lcm->lcm_count); + CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p (%d)\n", + llcd, ctxt, atomic_read(&lcm->lcm_count)); + llcd->llcd_ctxt = llog_ctxt_get(ctxt); + llcd->llcd_lcm = ctxt->loc_lcm; + ctxt->loc_llcd = llcd; + return 0; } -static int log_commit_thread(void *arg) +/** + * Opposite to llcd_attach(). Detaches llcd from its @ctxt. This makes + * sure that this llcd will not be found another time we try to cancel. + */ +static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt) { - struct llog_commit_daemon *lcd = arg; - struct llog_commit_master *lcm = lcd->lcd_lcm; - struct llog_canceld_ctxt *llcd, *n; - struct obd_import *import = NULL; - ENTRY; - - THREAD_NAME(cfs_curproc_comm(), CFS_CURPROC_COMM_MAX - 1, - "ll_log_comt_%02d", lcd->lcd_index); - - ptlrpc_daemonize(cfs_curproc_comm()); /* thread never needs to do IO */ - CDEBUG(D_HA, "%s started\n", cfs_curproc_comm()); - - do { - struct ptlrpc_request *request; - struct list_head *sending_list; - int rc = 0; - - if (import) - class_import_put(import); - import = NULL; - - /* If we do not have enough pages available, allocate some */ - while (atomic_read(&lcm->lcm_llcd_numfree) < - lcm->lcm_llcd_minfree) { - if (llcd_alloc(lcm) < 0) - break; - } - - spin_lock(&lcm->lcm_thread_lock); - atomic_inc(&lcm->lcm_thread_numidle); - list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_idle); - spin_unlock(&lcm->lcm_thread_lock); - - wait_event_interruptible(lcm->lcm_waitq, - !list_empty(&lcm->lcm_llcd_pending) || - lcm->lcm_flags & LLOG_LCM_FL_EXIT); - - /* If we are the 
last available thread, start a new one in case - * we get blocked on an RPC (nobody else will start a new one)*/ - spin_lock(&lcm->lcm_thread_lock); - atomic_dec(&lcm->lcm_thread_numidle); - list_move(&lcd->lcd_lcm_list, &lcm->lcm_thread_busy); - spin_unlock(&lcm->lcm_thread_lock); - - sending_list = &lcm->lcm_llcd_pending; - resend: - if (import) - class_import_put(import); - import = NULL; - if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) { - lcm->lcm_llcd_maxfree = 0; - lcm->lcm_llcd_minfree = 0; - lcm->lcm_thread_max = 0; - - if (list_empty(&lcm->lcm_llcd_pending) || - lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) - break; - } + struct llog_commit_master *lcm; + struct llog_canceld_ctxt *llcd; - if (atomic_read(&lcm->lcm_thread_numidle) <= 1 && - atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) { - rc = llog_start_commit_thread(lcm); - if (rc < 0) - CERROR("error starting thread: rc %d\n", rc); - } + LASSERT(ctxt != NULL); + LASSERT_SEM_LOCKED(&ctxt->loc_sem); - /* Move all of the pending cancels from the same OST off of - * the list, so we don't get multiple threads blocked and/or - * doing upcalls on the same OST in case of failure. 
*/ - spin_lock(&lcm->lcm_llcd_lock); - if (!list_empty(sending_list)) { - list_move_tail(sending_list->next, - &lcd->lcd_llcd_list); - llcd = list_entry(lcd->lcd_llcd_list.next, - typeof(*llcd), llcd_list); - LASSERT(llcd->llcd_lcm == lcm); - import = llcd->llcd_ctxt->loc_imp; - if (import) - class_import_get(import); - } - list_for_each_entry_safe(llcd, n, sending_list, llcd_list) { - LASSERT(llcd->llcd_lcm == lcm); - if (import == llcd->llcd_ctxt->loc_imp) - list_move_tail(&llcd->llcd_list, - &lcd->lcd_llcd_list); - } - if (sending_list != &lcm->lcm_llcd_resend) { - list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend, - llcd_list) { - LASSERT(llcd->llcd_lcm == lcm); - if (import == llcd->llcd_ctxt->loc_imp) - list_move_tail(&llcd->llcd_list, - &lcd->lcd_llcd_list); - } - } - spin_unlock(&lcm->lcm_llcd_lock); - - /* We are the only one manipulating our local list - no lock */ - list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){ - int size[2] = { sizeof(struct ptlrpc_body), - llcd->llcd_cookiebytes }; - char *bufs[2] = { NULL, (char *)llcd->llcd_cookies }; - - list_del(&llcd->llcd_list); - if (llcd->llcd_cookiebytes == 0) { - CDEBUG(D_RPCTRACE, "put empty llcd %p:%p\n", - llcd, llcd->llcd_ctxt); - llcd_put(llcd); - continue; - } - - mutex_down(&llcd->llcd_ctxt->loc_sem); - if (llcd->llcd_ctxt->loc_imp == NULL) { - mutex_up(&llcd->llcd_ctxt->loc_sem); - CWARN("import will be destroyed, put " - "llcd %p:%p\n", llcd, llcd->llcd_ctxt); - llcd_put(llcd); - continue; - } - mutex_up(&llcd->llcd_ctxt->loc_sem); - - if (!import || (import == LP_POISON) || - (import->imp_client == LP_POISON)) { - CERROR("No import %p (llcd=%p, ctxt=%p)\n", - import, llcd, llcd->llcd_ctxt); - llcd_put(llcd); - continue; - } - - OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10); - - request = ptlrpc_prep_req(import, LUSTRE_LOG_VERSION, - OBD_LOG_CANCEL, 2, size,bufs); - if (request == NULL) { - rc = -ENOMEM; - CERROR("error preparing commit: rc %d\n", rc); - - 
spin_lock(&lcm->lcm_llcd_lock); - list_splice_init(&lcd->lcd_llcd_list, - &lcm->lcm_llcd_resend); - spin_unlock(&lcm->lcm_llcd_lock); - break; - } - - /* bug 5515 */ - request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL; - request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL; - ptlrpc_at_set_req_timeout(request); - - ptlrpc_req_set_repsize(request, 1, NULL); - mutex_down(&llcd->llcd_ctxt->loc_sem); - if (llcd->llcd_ctxt->loc_imp == NULL) { - mutex_up(&llcd->llcd_ctxt->loc_sem); - CWARN("import will be destroyed, put " - "llcd %p:%p\n", llcd, llcd->llcd_ctxt); - llcd_put(llcd); - ptlrpc_req_finished(request); - continue; - } - mutex_up(&llcd->llcd_ctxt->loc_sem); - rc = ptlrpc_queue_wait(request); - ptlrpc_req_finished(request); - - /* If the RPC failed, we put this and the remaining - * messages onto the resend list for another time. */ - if (rc == 0) { - llcd_put(llcd); - continue; - } - - CERROR("commit %p:%p drop %d cookies: rc %d\n", - llcd, llcd->llcd_ctxt, - (int)(llcd->llcd_cookiebytes / - sizeof(*llcd->llcd_cookies)), rc); - llcd_put(llcd); - } + llcd = ctxt->loc_llcd; + if (!llcd) + return NULL; - if (rc == 0) { - sending_list = &lcm->lcm_llcd_resend; - if (!list_empty(sending_list)) - goto resend; - } - } while(1); + lcm = ctxt->loc_lcm; + if (atomic_read(&lcm->lcm_count) == 0) { + CERROR("Invalid detach occured %p:%p\n", ctxt, llcd); + llcd_print(llcd, __FUNCTION__, __LINE__); + LBUG(); + } + atomic_dec(&lcm->lcm_count); + ctxt->loc_llcd = NULL; + + CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p (%d)\n", + llcd, ctxt, atomic_read(&lcm->lcm_count)); - if (import) - class_import_put(import); + llog_ctxt_put(ctxt); + return llcd; +} - /* If we are force exiting, just drop all of the cookies. 
*/ - if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) { - spin_lock(&lcm->lcm_llcd_lock); - list_splice_init(&lcm->lcm_llcd_pending, &lcd->lcd_llcd_list); - list_splice_init(&lcm->lcm_llcd_resend, &lcd->lcd_llcd_list); - list_splice_init(&lcm->lcm_llcd_free, &lcd->lcd_llcd_list); - spin_unlock(&lcm->lcm_llcd_lock); +/** + * Allocate a new llcd and attach it to @ctxt so that it may be used for + * gathering cookies and sending. Returns NULL if the allocation fails. + * llcd_attach() asserts that @ctxt has no llcd attached yet, so this is + * to be called only when ->loc_llcd is NULL. + */ +static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt) +{ + struct llog_canceld_ctxt *llcd; - list_for_each_entry_safe(llcd, n, &lcd->lcd_llcd_list,llcd_list) - llcd_put(llcd); + llcd = llcd_alloc(); + if (!llcd) { + CERROR("Couldn't alloc an llcd for ctxt %p\n", ctxt); + return NULL; } + llcd_attach(ctxt, llcd); + return llcd; +} +/** + * Detach llcd from its @ctxt and free it, if there is one. Wakes up + * waiters on ->lcm_waitq once the lcm has no attached llcds left. + */ +static void llcd_put(struct llog_ctxt *ctxt) +{ + struct llog_commit_master *lcm; + struct llog_canceld_ctxt *llcd; - CDEBUG(D_HA, "%s exiting\n", cfs_curproc_comm()); - - spin_lock(&lcm->lcm_thread_lock); - list_del(&lcd->lcd_lcm_list); - spin_unlock(&lcm->lcm_thread_lock); - OBD_FREE_PTR(lcd); - llog_lcm_dec(lcm); + lcm = ctxt->loc_lcm; + llcd = llcd_detach(ctxt); + if (llcd) + llcd_free(llcd); - RETURN(0); + if (atomic_read(&lcm->lcm_count) == 0) + cfs_waitq_signal(&lcm->lcm_waitq); } -int llog_start_commit_thread(struct llog_commit_master *lcm) +/** + * Detach llcd from its @ctxt so that nobody can find it and try to + * re-use it. Send llcd to remote node.
+ */ +static int llcd_push(struct llog_ctxt *ctxt) { - struct llog_commit_daemon *lcd; - int rc, index; - ENTRY; + struct llog_canceld_ctxt *llcd; + int rc; - if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max) - RETURN(0); - - /* Check whether it will be cleanup llog commit thread first, - * If not, increate the lcm_thread_total count to prevent the - * lcm being freed when the log_commit_thread is started */ - spin_lock(&lcm->lcm_thread_lock); - if (!lcm->lcm_flags & LLOG_LCM_FL_EXIT) { - atomic_inc(&lcm->lcm_thread_total); - index = atomic_read(&lcm->lcm_thread_total); - spin_unlock(&lcm->lcm_thread_lock); - } else { - spin_unlock(&lcm->lcm_thread_lock); - RETURN(0); + /* + * Make sure that this llcd will not be sent again as we detach + * it from ctxt. + */ + llcd = llcd_detach(ctxt); + if (!llcd) { + /* + * Nothing was attached. llcd is NULL here, so it must not be + * handed to llcd_print(), which dereferences it. + */ + CERROR("Invalid detached llcd found %p\n", llcd); + LBUG(); } + + /* + * llcd_send() frees llcd itself whenever it does not queue the + * request, so below llcd is only printed, never dereferenced. + */ + rc = llcd_send(llcd); + if (rc) + CERROR("Couldn't send llcd %p (%d)\n", llcd, rc); + return rc; +} - OBD_ALLOC_PTR(lcd); - if (lcd == NULL) - GOTO(cleanup, rc = -ENOMEM); - - CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list); - CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list); - lcd->lcd_index = index; - lcd->lcd_lcm = lcm; +/** + * Start recovery thread which actually deals with llcd sending. This + * is all ptlrpc standard thread based so there is not much of work + * to do.
+ */ +int llog_recov_thread_start(struct llog_commit_master *lcm) +{ + int rc; + ENTRY; - rc = cfs_kernel_thread(log_commit_thread, lcd, CLONE_VM | CLONE_FILES); -cleanup: - if (rc < 0) { - CERROR("error starting thread #%d: %d\n", lcd->lcd_index, rc); - llog_lcm_dec(lcm); - if (lcd) - OBD_FREE_PTR(lcd); + rc = ptlrpcd_start(lcm->lcm_name, &lcm->lcm_pc); + if (rc) { + CERROR("Error %d while starting recovery thread %s\n", + rc, lcm->lcm_name); RETURN(rc); } - RETURN(0); -} -EXPORT_SYMBOL(llog_start_commit_thread); - -static struct llog_process_args { - struct semaphore llpa_sem; - struct llog_ctxt *llpa_ctxt; - void *llpa_cb; - void *llpa_arg; -} llpa; - -int llog_init_commit_master(struct llog_commit_master *lcm) -{ - CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy); - CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle); - spin_lock_init(&lcm->lcm_thread_lock); - atomic_set(&lcm->lcm_thread_numidle, 0); - cfs_waitq_init(&lcm->lcm_waitq); - CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_pending); - CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_resend); - CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_free); - spin_lock_init(&lcm->lcm_llcd_lock); - atomic_set(&lcm->lcm_llcd_numfree, 0); - lcm->lcm_llcd_minfree = 0; - lcm->lcm_thread_max = 5; - /* FIXME initialize semaphore for llog_process_args */ - sema_init(&llpa.llpa_sem, 1); - return 0; + lcm->lcm_set = lcm->lcm_pc.pc_set; + RETURN(rc); } -EXPORT_SYMBOL(llog_init_commit_master); +EXPORT_SYMBOL(llog_recov_thread_start); -int llog_cleanup_commit_master(struct llog_commit_master *lcm, - int force) +/** + * Stop recovery thread. Complement to llog_recov_thread_start(). 
+ */ +void llog_recov_thread_stop(struct llog_commit_master *lcm, int force) { - spin_lock(&lcm->lcm_thread_lock); - lcm->lcm_flags |= LLOG_LCM_FL_EXIT; - if (force) - lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE; - - spin_unlock(&lcm->lcm_thread_lock); - - cfs_waitq_signal(&lcm->lcm_waitq); + struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); + ENTRY; - wait_event_interruptible(lcm->lcm_waitq, - atomic_read(&lcm->lcm_thread_total) == 0); - return 0; + /** + * Let all know that we're stopping. This will also make + * llcd_send() refuse any new llcds. + */ + set_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags); + + /** + * Stop processing thread. No new rpcs will be accepted for + * for processing now. + */ + ptlrpcd_stop(&lcm->lcm_pc, force); + + /* + * Wait for llcd number == 0. Note, this is infinite wait. + * All other parts should make sure that no lost llcd is left. + */ + l_wait_event(lcm->lcm_waitq, + atomic_read(&lcm->lcm_count) == 0, &lwi); + EXIT; } -EXPORT_SYMBOL(llog_cleanup_commit_master); +EXPORT_SYMBOL(llog_recov_thread_stop); -static int log_process_thread(void *args) +/** + * Initialize commit master structure and start recovery thread on it. + */ +struct llog_commit_master *llog_recov_thread_init(char *name) { - struct llog_process_args *data = args; - struct llog_ctxt *ctxt = data->llpa_ctxt; - void *cb = data->llpa_cb; - struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg); - struct llog_handle *llh = NULL; + struct llog_commit_master *lcm; int rc; ENTRY; - mutex_up(&data->llpa_sem); - ptlrpc_daemonize("llog_process"); /* thread does IO to log files */ + OBD_ALLOC_PTR(lcm); + if (!lcm) + RETURN(NULL); - rc = llog_create(ctxt, &llh, &logid, NULL); + /* + * Try to create threads with unique names. 
+ */ + snprintf(lcm->lcm_name, sizeof(lcm->lcm_name), + "ll_log_commit_%s", name); + + cfs_waitq_init(&lcm->lcm_waitq); + atomic_set(&lcm->lcm_count, 0); + rc = llog_recov_thread_start(lcm); if (rc) { - CERROR("llog_create failed %d\n", rc); + CERROR("Can't start commit thread, rc %d\n", rc); GOTO(out, rc); } - rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL); - if (rc) { - CERROR("llog_init_handle failed %d\n", rc); - GOTO(release_llh, rc); - } - - if (cb) { - rc = llog_cat_process(llh, (llog_cb_t)cb, NULL); - if (rc != LLOG_PROC_BREAK) - CERROR("llog_cat_process failed %d\n", rc); - } else { - CWARN("no callback function for recovery\n"); - } - - CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n", - ctxt->loc_llcd, ctxt); - llog_sync(ctxt, NULL); - -release_llh: - rc = llog_cat_put(llh); - if (rc) - CERROR("llog_cat_put failed %d\n", rc); + RETURN(lcm); out: - llog_ctxt_put(ctxt); - RETURN(rc); + OBD_FREE_PTR(lcm); + return NULL; } +EXPORT_SYMBOL(llog_recov_thread_init); -static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg) +/** + * Finalize commit master and its recovery thread.
+ */ +void llog_recov_thread_fini(struct llog_commit_master *lcm, int force) +{ + ENTRY; + llog_recov_thread_stop(lcm, force); + OBD_FREE_PTR(lcm); + EXIT; +} +EXPORT_SYMBOL(llog_recov_thread_fini); + +static int llog_recov_thread_replay(struct llog_ctxt *ctxt, + void *cb, void *arg) { struct obd_device *obd = ctxt->loc_obd; + struct llog_process_cat_args *lpca; int rc; ENTRY; if (obd->obd_stopping) RETURN(-ENODEV); - mutex_down(&llpa.llpa_sem); - llpa.llpa_cb = handle; - llpa.llpa_arg = arg; - llpa.llpa_ctxt = llog_get_context(ctxt->loc_obd, ctxt->loc_idx); - if (!llpa.llpa_ctxt) { - mutex_up(&llpa.llpa_sem); + /* + * This will be balanced in llog_cat_process_thread() + */ + OBD_ALLOC_PTR(lpca); + if (!lpca) + RETURN(-ENOMEM); + + lpca->lpca_cb = cb; + lpca->lpca_arg = arg; + + /* + * This will be balanced in llog_cat_process_thread() + */ + lpca->lpca_ctxt = llog_ctxt_get(ctxt); + if (!lpca->lpca_ctxt) { + OBD_FREE_PTR(lpca); RETURN(-ENODEV); } - rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES); + rc = cfs_kernel_thread(llog_cat_process_thread, lpca, + CLONE_VM | CLONE_FILES); if (rc < 0) { + CERROR("Error starting llog_cat_process_thread(): %d\n", rc); + OBD_FREE_PTR(lpca); llog_ctxt_put(ctxt); - CERROR("error starting log_process_thread: %d\n", rc); } else { - CDEBUG(D_HA, "log_process_thread: %d\n", rc); + CDEBUG(D_HA, "Started llog_cat_process_thread(): %d\n", rc); rc = 0; } RETURN(rc); } -int llog_repl_connect(struct llog_ctxt *ctxt, int count, - struct llog_logid *logid, struct llog_gen *gen, - struct obd_uuid *uuid) +int llog_obd_repl_connect(struct llog_ctxt *ctxt, int count, + struct llog_logid *logid, struct llog_gen *gen, + struct obd_uuid *uuid) { - struct llog_canceld_ctxt *llcd; int rc; ENTRY; - /* send back llcd before recovery from llog */ - if (ctxt->loc_llcd != NULL) { - CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt); - llog_sync(ctxt, NULL); - } + /* + * Send back cached llcd from llog before recovery if we have 
any. + * This is void is nothing cached is found there. + */ + llog_sync(ctxt, NULL); + /* + * Start recovery in separate thread. + */ mutex_down(&ctxt->loc_sem); ctxt->loc_gen = *gen; - llcd = ctxt_llcd_grab(ctxt); - if (llcd == NULL) { - CERROR("couldn't get an llcd\n"); - mutex_up(&ctxt->loc_sem); - RETURN(-ENOMEM); - } + rc = llog_recov_thread_replay(ctxt, ctxt->llog_proc_cb, logid); mutex_up(&ctxt->loc_sem); - rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid); - if (rc != 0) { - ctxt_llcd_put(ctxt); - CERROR("error recovery process: %d\n", rc); + RETURN(rc); +} +EXPORT_SYMBOL(llog_obd_repl_connect); + +/** + * Deleted objects have a commit callback that cancels the MDS + * log record for the deletion. The commit callback calls this + * function. + */ +int llog_obd_repl_cancel(struct llog_ctxt *ctxt, + struct lov_stripe_md *lsm, int count, + struct llog_cookie *cookies, int flags) +{ + struct llog_commit_master *lcm; + struct llog_canceld_ctxt *llcd; + int rc = 0; + ENTRY; + + LASSERT(ctxt != NULL); + + mutex_down(&ctxt->loc_sem); + lcm = ctxt->loc_lcm; + + /* + * Let's check if we have all structures alive. We also check for + * possible shutdown. Do nothing if we're stopping. + */ + if (ctxt->loc_imp == NULL) { + CDEBUG(D_RPCTRACE, "No import for ctxt %p\n", ctxt); + GOTO(out, rc = -ENODEV); + } + + if (ctxt->loc_obd->obd_stopping) { + CDEBUG(D_RPCTRACE, "Obd is stopping for ctxt %p\n", ctxt); + GOTO(out, rc = -ENODEV); } + if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) { + CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n", + ctxt); + GOTO(out, rc = -ENODEV); + } + + llcd = ctxt->loc_llcd; + + if (count > 0 && cookies != NULL) { + /* + * Get new llcd from ctxt if required. + */ + if (!llcd) { + llcd = llcd_get(ctxt); + if (!llcd) + GOTO(out, rc = -ENOMEM); + /* + * Allocation is successful, let's check for stop + * flag again to fall back as soon as possible. 
+ */ + if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) + GOTO(out, rc = -ENODEV); + } + + /* + * Llcd does not have enough room for @cookies. Let's push + * it out and allocate new one. + */ + if (!llcd_fit(llcd, cookies)) { + rc = llcd_push(ctxt); + if (rc) + GOTO(out, rc); + llcd = llcd_get(ctxt); + if (!llcd) + GOTO(out, rc = -ENOMEM); + /* + * Allocation is successful, let's check for stop + * flag again to fall back as soon as possible. + */ + if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) + GOTO(out, rc = -ENODEV); + } + + /* + * Copy cookies to @llcd, no matter old or new allocated one. + */ + llcd_copy(llcd, cookies); + } + + /* + * Let's check if we need to send copied @cookies asap. If yes - do it. + */ + if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) { + rc = llcd_push(ctxt); + if (rc) + GOTO(out, rc); + } + EXIT; +out: + if (rc) + llcd_put(ctxt); + mutex_up(&ctxt->loc_sem); + return rc; +} +EXPORT_SYMBOL(llog_obd_repl_cancel); + +int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp) +{ + int rc = 0; + ENTRY; + + mutex_down(&ctxt->loc_sem); + if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) { + CDEBUG(D_RPCTRACE, "Reverse import disconnect\n"); + /* + * Check for llcd which might be left attached to @ctxt. + * Let's kill it. + */ + llcd_put(ctxt); + mutex_up(&ctxt->loc_sem); + } else { + mutex_up(&ctxt->loc_sem); + rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW); + } RETURN(rc); } -EXPORT_SYMBOL(llog_repl_connect); +EXPORT_SYMBOL(llog_obd_repl_sync); #else /* !__KERNEL__ */ @@ -694,3 +647,43 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, return 0; } #endif + +/** + * Module init time fucntion. Initializes slab for llcd objects. 
+ */ +int llog_recov_init(void) +{ + int llcd_size; + /* + * Slab object size: a page less what lustre_msg_size() reports for + * a one-buffer V2 message, plus the llcd header that precedes + * llcd_cookies. Must stay in step with the size llcd_alloc() + * requests from this cache. + */ + llcd_size = CFS_PAGE_SIZE - + lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL); + llcd_size += offsetof(struct llog_canceld_ctxt, llcd_cookies); + llcd_cache = cfs_mem_cache_create("llcd_cache", llcd_size, 0, 0); + if (!llcd_cache) { + CERROR("Error allocating llcd cache\n"); + return -ENOMEM; + } + return 0; +} + +/** + * Module fini time function. Releases slab for llcd objects. + */ +void llog_recov_fini(void) +{ + /* + * Kill llcd cache when thread is stopped and we're sure no + * llcd is left in use. + */ + if (llcd_cache) { + /* + * In 2.6.22 cfs_mem_cache_destroy() will not return error + * for busy resources. Let's check it another way. + */ + LASSERTF(atomic_read(&llcd_count) == 0, + "Can't destroy llcd cache! Number of " + "busy llcds: %d\n", atomic_read(&llcd_count)); + cfs_mem_cache_destroy(llcd_cache); + llcd_cache = NULL; + } +}