diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index bc2cf94682a504979f4418b7019f675ac71abc1f..2e18afe77bdef135e57d4664d9b5603453cf9c58 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -1047,7 +1047,7 @@ struct llog_cookie { struct llog_logid lgc_lgl; __u32 lgc_subsys; __u32 lgc_index; - __u32 lgc_padding; + __u32 lgc_flags; } __attribute__((packed)); /* llog protocol */ diff --git a/lustre/include/linux/lustre_log.h b/lustre/include/linux/lustre_log.h index ce42b652e62744370bf85974baaed908595e9b42..aebde2cf73c3a17ad855a4fc98bbab6f77ce3d08 100644 --- a/lustre/include/linux/lustre_log.h +++ b/lustre/include/linux/lustre_log.h @@ -99,7 +99,8 @@ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res, struct llog_logid *logid); int llog_cat_put(struct llog_handle *cathandle); int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec, - struct llog_cookie *reccookie, void *buf); + struct llog_cookie *reccookie, void *buf, + struct rw_semaphore **lock, int *lock_count); int llog_cat_cancel_records(struct llog_handle *cathandle, int count, struct llog_cookie *cookies); int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data); @@ -108,7 +109,8 @@ int llog_cat_set_first_idx(struct llog_handle *cathandle, int index); /* llog_obd.c */ int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, - void *buf, struct llog_cookie *reccookie, int, void *data); + void *buf, struct llog_cookie *reccookie, int, void *data, + struct rw_semaphore **lock, int *lock_count); int llog_catalog_cancel(struct llog_ctxt *ctxt, int count, struct llog_cookie *, int flags, void *data); int llog_catalog_setup(struct llog_ctxt **res, char *name, struct obd_export *exp, @@ -126,10 +128,6 @@ extern struct llog_operations llog_lvfs_ops; int llog_obd_origin_setup(struct obd_device *, struct obd_llogs *, int, struct obd_device *, int, struct llog_logid *); -int llog_obd_origin_cleanup(struct llog_ctxt *ctxt); -int llog_obd_origin_add(struct llog_ctxt *ctxt, - struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, - struct llog_cookie *logcookies, int numcookies); int obd_llog_cat_initialize(struct obd_device *, struct obd_llogs *, int, char *); @@ -178,7 +176,8 @@ struct llog_operations { int (*lop_read_header)(struct llog_handle *handle); int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, void *buf, struct llog_cookie *logcookies, - int numcookies, void *data); + int numcookies, void *data, struct rw_semaphore **lock, + int *lock_count); int (*lop_cancel)(struct llog_ctxt *ctxt, int count, struct llog_cookie *cookies, int flags, void *data); int (*lop_write_rec)(struct llog_handle *loghandle, @@ -219,6 +218,11 @@ struct llog_ctxt { struct obd_llogs *loc_llogs; }; +struct llog_create_locks { + int lcl_count; + struct rw_semaphore *lcl_locks[0]; +}; + static inline void llog_gen_init(struct llog_ctxt *ctxt) { struct obd_device *obd = ctxt->loc_exp->exp_obd; @@ -244,6 +248,36 @@ static inline int llog_gen_lt(struct llog_gen a, struct llog_gen b) #define LLOG_PROC_BREAK 0x0001 #define LLOG_DEL_RECORD 0x0002 +#define EQ_LOGID(a, b) (a.lgl_oid == b.lgl_oid && \ + a.lgl_ogr == b.lgl_ogr && \ + a.lgl_ogen == b.lgl_ogen) + +/* Flags are in the bottom 16 bit */ +#define LLOG_COOKIE_FLAG_MASK 0x0000ffff +#define LLOG_COOKIE_REPLAY_NEW 0x00000001 +#define LLOG_COOKIE_REPLAY 0x00000002 + +static inline int llog_cookie_get_flags(struct llog_cookie *logcookie) +{ + return(logcookie->lgc_flags & LLOG_COOKIE_FLAG_MASK); +} + +static inline void llog_cookie_add_flags(struct llog_cookie *logcookie, int flags) +{ + logcookie->lgc_flags |= LLOG_COOKIE_FLAG_MASK & flags; +} + +static inline void llog_cookie_set_flags(struct llog_cookie *logcookie, int flags) +{ + logcookie->lgc_flags &= ~LLOG_COOKIE_FLAG_MASK; + llog_cookie_add_flags(logcookie, flags); +} + +static inline void llog_cookie_clear_flags(struct llog_cookie *logcookie, int flags) +{ + logcookie->lgc_flags &= ~(LLOG_COOKIE_FLAG_MASK & flags); +} + static inline int llog_ctxt2ops(struct llog_ctxt *ctxt, struct llog_operations **lop) { @@ -331,7 +365,8 @@ static inline int llog_read_header(struct llog_handle *handle) static inline int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, void *buf, struct llog_cookie *logcookies, - int numcookies, void *data) + int numcookies, void *data, + struct rw_semaphore **lock, int *lock_count) { struct llog_operations *lop; int rc; @@ -343,7 +378,8 @@ static inline int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, if (lop->lop_add == NULL) RETURN(-EOPNOTSUPP); - rc = lop->lop_add(ctxt, rec, buf, logcookies, numcookies, data); + rc = lop->lop_add(ctxt, rec, buf, logcookies, numcookies, data, + lock, lock_count); RETURN(rc); } @@ -460,4 +496,22 @@ static inline int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp) RETURN(rc); } +static inline void llog_create_lock_free(struct llog_create_locks *lcl) +{ + int size, offset = offsetof(struct llog_create_locks, lcl_locks); + int i; + ENTRY; + + for (i = 0; i < lcl->lcl_count; i ++) { + if (lcl->lcl_locks[i] != NULL) { +#ifdef __KERNEL__ + LASSERT(down_write_trylock(lcl->lcl_locks[i]) == 0); +#endif + up_write(lcl->lcl_locks[i]); + } + } + size = offset + sizeof(struct rw_semaphore *) * lcl->lcl_count; + OBD_FREE(lcl, size); +} + #endif diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index 81a652e8fe1103ccc724ec93ac2e5c15f074c746..f94bd90f980f85899a23b246b53b3493354eb9a7 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -274,6 +274,8 @@ struct ptlrpc_reply_state { int rs_nlocks; struct lustre_handle rs_locks[RS_MAX_LOCKS]; ldlm_mode_t rs_modes[RS_MAX_LOCKS]; + struct llog_create_locks *rs_llog_locks; + /* last member: variable sized reply message */ struct lustre_msg rs_msg; }; @@ -639,6 +641,8 @@ __u64 ptlrpc_next_xid(void); /* ptlrpc/service.c */ void ptlrpc_save_lock (struct ptlrpc_request *req, struct lustre_handle *lock, int mode); +void ptlrpc_save_llog_lock (struct ptlrpc_request *req, + struct llog_create_locks *lcl); void ptlrpc_commit_replies (struct obd_device *obd); void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs); struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 7e8cc0d04f64e5429526ba053cfa3a7920af4e63..bebd70ecbd4bbb1d054c94efabbf91d762af4ff2 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -74,7 +74,7 @@ int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode, inode->i_ino, rc); } if (rc == 0) { - rc = ll_objects_destroy(req, file->f_dentry->d_inode); + rc = ll_objects_destroy(req, file->f_dentry->d_inode, 1); if (rc) CERROR("inode %lu ll_objects destroy: rc = %d\n", inode->i_ino, rc); diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 03a413554e0e2510e5052935890c3bd598400136..da6bb98df9a88ed7531669d37c35f0ef76da5ef5 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -130,7 +130,8 @@ extern struct file_operations ll_dir_operations; extern struct inode_operations ll_dir_inode_operations; /* llite/namei.c */ -int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir); +int ll_objects_destroy(struct ptlrpc_request *request, + struct inode *dir, int offset); struct inode *ll_iget(struct super_block *sb, ino_t hash, struct lustre_md *lic); struct dentry *ll_find_alias(struct inode *, struct dentry *); diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 61cf15b6ca697db84739a2ee7577e3c4ac80943b..6761a831962ab50fc9f33fb2c39cd297b560fa9f 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -683,7 +683,8 @@ static int ll_rmdir_raw(struct nameidata *nd) RETURN(rc); } -int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir) +int ll_objects_destroy(struct ptlrpc_request *request, + struct inode *dir, int offset) { struct mds_body *body; struct lov_mds_md *eadata; @@ -732,14 +733,22 @@ int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir) oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP; if (body->valid & OBD_MD_FLCOOKIE) { + int length = sizeof(struct llog_cookie) * + lsm->lsm_stripe_count; oa->o_valid |= OBD_MD_FLCOOKIE; oti.oti_logcookies = - lustre_msg_buf(request->rq_repmsg, 2, - sizeof(struct llog_cookie) * - lsm->lsm_stripe_count); + lustre_msg_buf(request->rq_repmsg, 2, length); if (oti.oti_logcookies == NULL) { oa->o_valid &= ~OBD_MD_FLCOOKIE; body->valid &= ~OBD_MD_FLCOOKIE; + } else { + /* copy llog cookies to request to replay unlink + * so that the same llog file and records as those created + * during fail can be re-created while doing replay + */ + if (offset >= 0) + memcpy(lustre_msg_buf(request->rq_reqmsg, offset, 0), + oti.oti_logcookies, length); } } @@ -771,7 +780,7 @@ static int ll_unlink_raw(struct nameidata *nd) if (rc) GOTO(out, rc); - rc = ll_objects_destroy(request, dir); + rc = ll_objects_destroy(request, dir, 2); out: ptlrpc_req_finished(request); RETURN(rc); @@ -798,7 +807,7 @@ static int ll_rename_raw(struct nameidata *oldnd, struct nameidata *newnd) err = md_rename(sbi->ll_mdc_exp, &op_data, oldname, oldlen, newname, newlen, &request); if (!err) { - err = ll_objects_destroy(request, src); + err = ll_objects_destroy(request, src, 3); } ptlrpc_req_finished(request); diff --git a/lustre/lov/lov_log.c b/lustre/lov/lov_log.c index 3320032f0952d7b57cb21b69dac206344d08e88a..c6174d7bda7f9e5d3a347631cdd50923e21e7c2b 100644 --- a/lustre/lov/lov_log.c +++ b/lustre/lov/lov_log.c @@ -58,7 +58,8 @@ * Unset cookies should be all-zero (which will never occur naturally). */ static int lov_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, void *buf, struct llog_cookie *logcookies, - int numcookies, void *data) + int numcookies, void *data, + struct rw_semaphore **lock, int *lock_count) { struct obd_device *obd = ctxt->loc_obd; struct lov_obd *lov = &obd->u.lov; @@ -86,7 +87,8 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, lur->lur_ogen = loi->loi_gr; LASSERT(lsm->lsm_object_gr == loi->loi_gr); rc += llog_add(cctxt, &lur->lur_hdr, NULL, logcookies + rc, - numcookies - rc, NULL); + numcookies - rc, NULL, + lock != NULL ? lock + rc : NULL, lock_count); } OBD_FREE(lur, sizeof(*lur)); diff --git a/lustre/lvfs/llog_cat.c b/lustre/lvfs/llog_cat.c index 307713a502599be21aaea51167fc78182e91df05..3837b4da5d7f18d3c24540efbdf11ca4163f1a6d 100644 --- a/lustre/lvfs/llog_cat.c +++ b/lustre/lvfs/llog_cat.c @@ -46,7 +46,8 @@ * * Assumes caller has already pushed us into the kernel context and is locking. */ -static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) +static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle, + struct llog_cookie *logcookie) { struct llog_handle *loghandle; struct llog_log_hdr *llh; @@ -76,8 +77,12 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) llh->llh_tail.lrt_index = cpu_to_le32(index); } - rc = llog_open(cathandle->lgh_ctxt, &loghandle, NULL, NULL, - OBD_LLOG_FL_CREATE); + if (logcookie && llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW) + rc = llog_open(cathandle->lgh_ctxt, &loghandle, + &logcookie->lgc_lgl, NULL, OBD_LLOG_FL_CREATE); + else + rc = llog_open(cathandle->lgh_ctxt, &loghandle, NULL, NULL, + OBD_LLOG_FL_CREATE); if (rc) { CERROR("cannot create new log, error = %d\n", rc); RETURN(ERR_PTR(rc)); @@ -112,8 +117,14 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle) list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head); out_destroy: - if (rc < 0) + if (rc < 0) llog_destroy(loghandle); + else if (logcookie) { + if (llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW) + LASSERT(EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl)); + else + llog_cookie_set_flags(logcookie, LLOG_COOKIE_REPLAY_NEW); + } RETURN(loghandle); } @@ -201,7 +212,9 @@ EXPORT_SYMBOL(llog_cat_put); * NOTE: loghandle is write-locked upon successful return */ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, - int create) + int create, + struct llog_cookie *logcookie, + struct rw_semaphore **lock) { struct llog_handle *loghandle = NULL; ENTRY; @@ -210,12 +223,20 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, loghandle = cathandle->u.chd.chd_current_log; if (loghandle) { struct llog_log_hdr *llh = loghandle->lgh_hdr; - if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) { + if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1 && + (!logcookie || + !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) || + EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl))) { down_write(&loghandle->lgh_lock); up_read(&cathandle->lgh_lock); RETURN(loghandle); } } + + LASSERT(!logcookie || + !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) || + llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW); + if (!create) { if (loghandle) down_write(&loghandle->lgh_lock); @@ -231,7 +252,10 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, loghandle = cathandle->u.chd.chd_current_log; if (loghandle) { struct llog_log_hdr *llh = loghandle->lgh_hdr; - if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) { + if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1 && + (!logcookie || + !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) || + EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl))) { down_write(&loghandle->lgh_lock); up_write(&cathandle->lgh_lock); RETURN(loghandle); @@ -239,9 +263,13 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, } CDEBUG(D_INODE, "creating new log\n"); - loghandle = llog_cat_new_log(cathandle); - if (!IS_ERR(loghandle)) + loghandle = llog_cat_new_log(cathandle, logcookie); + if (!IS_ERR(loghandle)) { down_write(&loghandle->lgh_lock); + if (lock != NULL) + *lock = &loghandle->lgh_lock; + } + up_write(&cathandle->lgh_lock); RETURN(loghandle); } @@ -252,19 +280,25 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, * Assumes caller has already pushed us into the kernel context. */ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec, - struct llog_cookie *reccookie, void *buf) + struct llog_cookie *reccookie, void *buf, + struct rw_semaphore **lock, int *lock_count) { struct llog_handle *loghandle; int rc; ENTRY; LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE); - loghandle = llog_cat_current_log(cathandle, 1); + loghandle = llog_cat_current_log(cathandle, 1, reccookie, lock); if (IS_ERR(loghandle)) RETURN(PTR_ERR(loghandle)); /* loghandle is already locked by llog_cat_current_log() for us */ rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1); - up_write(&loghandle->lgh_lock); + if (!lock || *lock == NULL) { + up_write(&loghandle->lgh_lock); + } else { + LASSERT(lock_count != NULL); + *lock_count += 1; + } RETURN(rc); } @@ -480,7 +514,8 @@ EXPORT_SYMBOL(llog_cat_set_first_idx); int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, void *buf, struct llog_cookie *logcookies, - int numcookies, void *data) + int numcookies, void *data, + struct rw_semaphore **lock, int *lock_count) { struct llog_handle *cathandle; int rc; @@ -489,7 +524,7 @@ int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, cathandle = ctxt->loc_handle; LASSERT(cathandle != NULL); - rc = llog_cat_add_rec(cathandle, rec, logcookies, buf); + rc = llog_cat_add_rec(cathandle, rec, logcookies, buf, lock, lock_count); if (rc != 1) CERROR("write one catalog record failed: %d\n", rc); RETURN(rc); @@ -577,42 +612,17 @@ EXPORT_SYMBOL(llog_catalog_setup); int llog_catalog_cleanup(struct llog_ctxt *ctxt) { - struct llog_handle *cathandle, *n, *loghandle; - struct llog_log_hdr *llh; - int rc, index; + struct llog_handle *cathandle; ENTRY; if (!ctxt) return 0; cathandle = ctxt->loc_handle; - if (cathandle) { - list_for_each_entry_safe(loghandle, n, - &cathandle->u.chd.chd_head, - u.phd.phd_entry) { - llh = loghandle->lgh_hdr; - if ((le32_to_cpu(llh->llh_flags) & - LLOG_F_ZAP_WHEN_EMPTY) && - (le32_to_cpu(llh->llh_count) == 1)) { - rc = llog_destroy(loghandle); - if (rc) - CERROR("failure destroying log during " - "cleanup: %d\n", rc); - LASSERT(rc == 0); - index = loghandle->u.phd.phd_cookie.lgc_index; - llog_free_handle(loghandle); - - LASSERT(index); - llog_cat_set_first_idx(cathandle, index); - rc = llog_cancel_rec(cathandle, index); - if (rc == 0) - CDEBUG(D_HA, "cancel plain log at index" - " %u of catalog "LPX64"\n", - index,cathandle->lgh_id.lgl_oid); - } - } + if (cathandle) llog_cat_put(ctxt->loc_handle); - } + +// OBD_FREE(ctxt, sizeof(*ctxt)); return 0; } EXPORT_SYMBOL(llog_catalog_cleanup); diff --git a/lustre/lvfs/llog_lvfs.c b/lustre/lvfs/llog_lvfs.c index 6a4c74f07106f9544953b564c5635c85c85408e4..5f57b64ce2e6381ec4fcb9963c479eddbfd2440c 100644 --- a/lustre/lvfs/llog_lvfs.c +++ b/lustre/lvfs/llog_lvfs.c @@ -277,8 +277,15 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, RETURN(rc); if (rc == 0 && reccookie) { - reccookie->lgc_lgl = loghandle->lgh_id; - reccookie->lgc_index = index; + if (llog_cookie_get_flags(reccookie) & LLOG_COOKIE_REPLAY) { + LASSERT(EQ_LOGID(reccookie->lgc_lgl, loghandle->lgh_id)); + LASSERT(reccookie->lgc_index == index); + } else { + reccookie->lgc_lgl = loghandle->lgh_id; + reccookie->lgc_index = index; + llog_cookie_add_flags(reccookie, LLOG_COOKIE_REPLAY); + } + if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC) reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT; else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC) @@ -491,128 +498,228 @@ static struct file *llog_filp_open(char *name, int flags, int mode) static struct file * llog_object_create_alone(struct llog_ctxt *ctxt, struct llog_logid *lgh_id) { - unsigned int tmpname = ll_insecure_random_int(); - char fidname[LL_FID_NAMELEN]; struct file *filp; - struct dentry *new_child, *parent; - void *handle; - int rc = 0, err, namelen; + int rc = 0; ENTRY; - sprintf(fidname, "OBJECTS/%u", tmpname); - filp = filp_open(fidname, O_CREAT | O_EXCL, 0644); - if (IS_ERR(filp)) { - rc = PTR_ERR(filp); - if (rc == -EEXIST) { - CERROR("impossible object name collision %u\n", - tmpname); - LBUG(); + LASSERT(lgh_id != NULL); + if (lgh_id->lgl_oid) { + struct dentry *dchild; + char fidname[LL_FID_NAMELEN]; + int fidlen = 0; + + down(&ctxt->loc_objects_dir->d_inode->i_sem); + fidlen = ll_fid2str(fidname, lgh_id->lgl_oid, lgh_id->lgl_ogen); + dchild = lookup_one_len(fidname, ctxt->loc_objects_dir, fidlen); + if (IS_ERR(dchild)) { + up(&ctxt->loc_objects_dir->d_inode->i_sem); + RETURN((struct file *)dchild); } - CERROR("error creating tmp object %u: rc %d\n", tmpname, rc); + if (dchild->d_inode == NULL) { + struct dentry_params dp; + struct inode *inode; + + dchild->d_fsdata = (void *) &dp; + dp.p_ptr = NULL; + dp.p_inum = lgh_id->lgl_oid; + rc = ll_vfs_create(ctxt->loc_objects_dir->d_inode, + dchild, S_IFREG, NULL); + if (dchild->d_fsdata == (void *)(unsigned long)lgh_id->lgl_oid) + dchild->d_fsdata = NULL; + if (rc) { + CDEBUG(D_INODE, "err during create: %d\n", rc); + dput(dchild); + up(&ctxt->loc_objects_dir->d_inode->i_sem); + RETURN(ERR_PTR(rc)); + } + inode = dchild->d_inode; + LASSERT(inode->i_ino == lgh_id->lgl_oid); + inode->i_generation = lgh_id->lgl_ogen; + CDEBUG(D_HA, "recreated ino %lu with gen %u\n", + inode->i_ino, inode->i_generation); + mark_inode_dirty(inode); + } + + mntget(ctxt->loc_lvfs_ctxt->pwdmnt); + filp = dentry_open(dchild, ctxt->loc_lvfs_ctxt->pwdmnt, + O_RDWR | O_LARGEFILE); + if (IS_ERR(filp)) { + dput(dchild); + up(&ctxt->loc_objects_dir->d_inode->i_sem); + RETURN(filp); + } + if (!S_ISREG(filp->f_dentry->d_inode->i_mode)) { + CERROR("%s is not a regular file!: mode = %o\n", fidname, + filp->f_dentry->d_inode->i_mode); + filp_close(filp, 0); + up(&ctxt->loc_objects_dir->d_inode->i_sem); + RETURN(ERR_PTR(-ENOENT)); + } + + up(&ctxt->loc_objects_dir->d_inode->i_sem); RETURN(filp); - } - namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino, - filp->f_dentry->d_inode->i_generation); - parent = filp->f_dentry->d_parent; - down(&parent->d_inode->i_sem); - new_child = lookup_one_len(fidname, parent, namelen); - if (IS_ERR(new_child)) { - CERROR("getting neg dentry for obj rename: %d\n", rc); - GOTO(out_close, rc = PTR_ERR(new_child)); - } - if (new_child->d_inode != NULL) { - CERROR("impossible non-negative obj dentry %lu:%u!\n", - filp->f_dentry->d_inode->i_ino, - filp->f_dentry->d_inode->i_generation); - LBUG(); - } + } else { + unsigned int tmpname = ll_insecure_random_int(); + char fidname[LL_FID_NAMELEN]; + struct dentry *new_child, *parent; + void *handle; + int err, namelen; - handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL); - if (IS_ERR(handle)) - GOTO(out_dput, rc = PTR_ERR(handle)); + sprintf(fidname, "OBJECTS/%u", tmpname); + filp = filp_open(fidname, O_CREAT | O_EXCL, 0644); + if (IS_ERR(filp)) { + rc = PTR_ERR(filp); + if (rc == -EEXIST) { + CERROR("impossible object name collision %u\n", + tmpname); + LBUG(); + } + CERROR("error creating tmp object %u: rc %d\n", tmpname, rc); + RETURN(filp); + } - lock_kernel(); - rc = vfs_rename(parent->d_inode, filp->f_dentry, - parent->d_inode, new_child); - unlock_kernel(); - if (rc) - CERROR("error renaming new object %lu:%u: rc %d\n", - filp->f_dentry->d_inode->i_ino, - filp->f_dentry->d_inode->i_generation, rc); + namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino, + filp->f_dentry->d_inode->i_generation); + parent = filp->f_dentry->d_parent; + down(&parent->d_inode->i_sem); + new_child = lookup_one_len(fidname, parent, namelen); + if (IS_ERR(new_child)) { + CERROR("getting neg dentry for obj rename: %d\n", rc); + GOTO(out_close, rc = PTR_ERR(new_child)); + } + if (new_child->d_inode != NULL) { + CERROR("impossible non-negative obj dentry %lu:%u!\n", + filp->f_dentry->d_inode->i_ino, + filp->f_dentry->d_inode->i_generation); + LBUG(); + } - err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0); - if (!rc) - rc = err; + handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL); + if (IS_ERR(handle)) + GOTO(out_dput, rc = PTR_ERR(handle)); -out_dput: - dput(new_child); -out_close: - up(&parent->d_inode->i_sem); - if (rc) { - filp_close(filp, 0); - filp = (struct file *)rc; - } else { - /* FIXME: is this group 1 is correct? */ - lgh_id->lgl_ogr = 1; - lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino; - lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation; + lock_kernel(); + rc = vfs_rename(parent->d_inode, filp->f_dentry, + parent->d_inode, new_child); + unlock_kernel(); + if (rc) + CERROR("error renaming new object %lu:%u: rc %d\n", + filp->f_dentry->d_inode->i_ino, + filp->f_dentry->d_inode->i_generation, rc); + + err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0); + if (!rc) + rc = err; + + out_dput: + dput(new_child); + out_close: + up(&parent->d_inode->i_sem); + if (rc) { + filp_close(filp, 0); + filp = ERR_PTR(rc); + } else { + /* FIXME: is this group 1 is correct? */ + lgh_id->lgl_ogr = 1; + lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino; + lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation; + } + RETURN(filp); } - - RETURN(filp); } /* creates object for generic case (obd exists) */ static struct file * llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id) { - int rc = 0; struct file *filp; struct dentry *dchild; struct obd_device *obd; struct obdo *oa = NULL; - int open_flags = O_RDWR | O_CREAT | O_LARGEFILE; + int open_flags = O_RDWR | O_LARGEFILE; + int rc = 0; ENTRY; - + obd = ctxt->loc_exp->exp_obd; + LASSERT(obd != NULL); + + if (lgh_id->lgl_oid) { + dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid, + lgh_id->lgl_ogen, lgh_id->lgl_ogr); + if (IS_ERR(dchild) == -ENOENT) { + OBD_ALLOC(oa, sizeof(*oa)); + if (!oa) + RETURN(ERR_PTR(-ENOMEM)); + + oa->o_id = lgh_id->lgl_oid; + oa->o_generation = lgh_id->lgl_ogen; + oa->o_gr = lgh_id->lgl_ogr; + oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP; + rc = obd_create(ctxt->loc_exp, oa, NULL, NULL); + if (rc) { + CDEBUG(D_INODE, "err during create: %d\n", rc); + GOTO(out_free_oa, rc); + } + CDEBUG(D_HA, "re-create log object "LPX64":0x%x:"LPX64"\n", + lgh_id->lgl_oid, lgh_id->lgl_ogen, lgh_id->lgl_ogr); + + dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid, + lgh_id->lgl_ogen, lgh_id->lgl_ogr); + } else if (IS_ERR(dchild)) { + CERROR("error looking up logfile "LPX64":0x%x: rc %d\n", + lgh_id->lgl_oid, lgh_id->lgl_ogen, rc); + RETURN((struct file *)dchild); + } - /* this is important to work here over obd_create() as it manages - groups and we need it. Yet another reason is that mds_obd_create() - is fully the same as old version of this function and this helps - us to avoid code duplicating and layering violating. */ - OBD_ALLOC(oa, sizeof(*oa)); - if (!oa) - RETURN(ERR_PTR(-ENOMEM)); - - oa->o_gr = FILTER_GROUP_LLOG; - oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP; - rc = obd_create(ctxt->loc_exp, oa, NULL, NULL); - if (rc) + filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags); + if (IS_ERR(filp)) { + l_dput(dchild); + rc = PTR_ERR(filp); + CERROR("error opening logfile "LPX64"0x%x: rc %d\n", + lgh_id->lgl_oid, lgh_id->lgl_ogen, rc); + } GOTO(out_free_oa, rc); - - dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id, - oa->o_generation, oa->o_gr); - if (IS_ERR(dchild)) - GOTO(out_free_oa, rc = PTR_ERR(dchild)); + } else { + /* this is important to work here over obd_create() as it manages + groups and we need it. Yet another reason is that mds_obd_create() + is fully the same as old version of this function and this helps + us to avoid code duplicating and layering violating. */ + OBD_ALLOC(oa, sizeof(*oa)); + if (!oa) + RETURN(ERR_PTR(-ENOMEM)); - filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, - open_flags); - if (IS_ERR(filp)) { - l_dput(dchild); - GOTO(out_free_oa, rc = PTR_ERR(filp)); + oa->o_gr = FILTER_GROUP_LLOG; + oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP; + rc = obd_create(ctxt->loc_exp, oa, NULL, NULL); + if (rc) + GOTO(out_free_oa, rc); + + dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id, + oa->o_generation, oa->o_gr); + if (IS_ERR(dchild)) + GOTO(out_free_oa, rc = PTR_ERR(dchild)); + + filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, + open_flags); + if (IS_ERR(filp)) { + l_dput(dchild); + GOTO(out_free_oa, rc = PTR_ERR(filp)); + } + + /* group 1 is not longer valid, we use the group which is set + by obd_create()->mds_obd_create(). */ + lgh_id->lgl_ogr = oa->o_gr; + lgh_id->lgl_oid = oa->o_id; + lgh_id->lgl_ogen = oa->o_generation; } - /* group 1 is not longer valid, we use the group which is set - by obd_create()->mds_obd_create(). */ - lgh_id->lgl_ogr = oa->o_gr; - lgh_id->lgl_oid = oa->o_id; - lgh_id->lgl_ogen = oa->o_generation; - OBD_FREE(oa, sizeof(*oa)); - RETURN(filp); - out_free_oa: - OBD_FREE(oa, sizeof(*oa)); - RETURN(ERR_PTR(rc)); + if (rc) + filp = ERR_PTR(rc); + if (oa) + OBD_FREE(oa, sizeof(*oa)); + RETURN(filp); } static struct file * @@ -688,24 +795,15 @@ static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res, push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL); if (logid != NULL) { - char logname[LL_FID_NAMELEN + 10] = "OBJECTS/"; - char fidname[LL_FID_NAMELEN]; - ll_fid2str(fidname, logid->lgl_oid, logid->lgl_ogen); - strcat(logname, fidname); - - handle->lgh_file = filp_open(logname, O_RDWR | O_LARGEFILE, 0644); + handle->lgh_file = llog_object_create(ctxt, logid); if (IS_ERR(handle->lgh_file)) { - CERROR("cannot open %s file, error = %ld\n", - logname, PTR_ERR(handle->lgh_file)); + CERROR("cannot create/open llog object "LPX64":%x " + "error = %ld", logid->lgl_oid, logid->lgl_ogen, + PTR_ERR(handle->lgh_file)); GOTO(cleanup, rc = PTR_ERR(handle->lgh_file)); } - if (!S_ISREG(handle->lgh_file->f_dentry->d_inode->i_mode)) { - CERROR("%s is not a regular file!: mode = %o\n", logname, - handle->lgh_file->f_dentry->d_inode->i_mode); - GOTO(cleanup, rc = -ENOENT); - } - LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_objects_dir); handle->lgh_id = *logid; + } else if (name) { handle->lgh_file = llog_filp_open(name, open_flags, 0644); if (IS_ERR(handle->lgh_file)) { @@ -721,6 +819,7 @@ static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res, rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry); if (rc) GOTO(cleanup, rc); + } else { handle->lgh_file = llog_object_create(ctxt, &handle->lgh_id); if (IS_ERR(handle->lgh_file)) { diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 07d46ba7f60cae9b74ee09da134b89a3017332b4..ba4e3b96c570861afd66d0c0e234dc6204cf237a 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -162,11 +162,13 @@ int mdc_unlink(struct obd_export *exp, struct mdc_op_data *data, { struct obd_device *obddev = class_exp2obd(exp); struct ptlrpc_request *req = *request; - int rc, size[2] = {sizeof(struct mds_rec_unlink), data->namelen + 1}; + int rc, size[3] = {sizeof(struct mds_rec_unlink), + data->namelen + 1, + obddev->u.cli.cl_max_mds_cookiesize}; ENTRY; LASSERT(req == NULL); - req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 2, size, + req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 3, size, NULL); if (req == NULL) RETURN(-ENOMEM); @@ -217,11 +219,11 @@ int mdc_rename(struct obd_export *exp, struct mdc_op_data *data, { struct obd_device *obd = exp->exp_obd; struct ptlrpc_request *req; - int rc, size[3] = {sizeof(struct mds_rec_rename), oldlen + 1, - newlen + 1}; + int rc, size[4] = {sizeof(struct mds_rec_rename), oldlen + 1, + newlen + 1, obd->u.cli.cl_max_mds_cookiesize}; ENTRY; - req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 3, size, + req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 4, size, NULL); if (req == NULL) RETURN(-ENOMEM); @@ -230,7 +232,8 @@ int mdc_rename(struct obd_export *exp, struct mdc_op_data *data, size[0] = sizeof(struct mds_body); size[1] = obd->u.cli.cl_max_mds_easize; - req->rq_replen = lustre_msg_size(2, size); + size[2] = obd->u.cli.cl_max_mds_cookiesize; + req->rq_replen = lustre_msg_size(3, size); rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL); *request = req; diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index bf958f14755e75856d7a56b50aaa4ac9bf30cc48..a33ea6cfbb96853da4006cd9d8f6f17727a4c27c 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -486,7 +486,8 @@ int mdc_close(struct obd_export *exp, struct obdo *oa, struct obd_client_handle *och, struct ptlrpc_request **request) { struct obd_device *obd = class_exp2obd(exp); - int reqsize = sizeof(struct mds_body); + int reqsize[2] = {sizeof(struct mds_body), + obd->u.cli.cl_max_mds_cookiesize}; int rc, repsize[3] = {sizeof(struct mds_body), obd->u.cli.cl_max_mds_easize, obd->u.cli.cl_max_mds_cookiesize}; @@ -495,7 +496,7 @@ int mdc_close(struct obd_export *exp, struct obdo *oa, struct l_wait_info lwi; ENTRY; - req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &reqsize, + req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 2, reqsize, NULL); if (req == NULL) GOTO(out, rc = -ENOMEM); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index caa0aaddd5dffa497ad038c816349a56c317c4bb..8fc08d444ad5ccf678501d6c743e81bae6de55df 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -1779,9 +1779,9 @@ int mds_handle(struct ptlrpc_request *req) OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0); - if (opc == REINT_UNLINK) + if (opc == REINT_UNLINK || opc == REINT_RENAME) bufcount = 3; - else if (opc == REINT_OPEN || opc == REINT_RENAME) + else if (opc == REINT_OPEN) bufcount = 2; else bufcount = 1; @@ -2427,7 +2427,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns, LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc)); - rc = lustre_pack_reply(req, 3, repsize, NULL); + rc = lustre_pack_reply(req, it->opc == IT_UNLINK ? 4 : 3, repsize, NULL); if (rc) RETURN(req->rq_status = rc); diff --git a/lustre/mds/mds_fs.c b/lustre/mds/mds_fs.c index 94be916803db7c8bc7adaa7ba647c88442f5796c..e26d84c9754d94af601af0c5453082667c668af9 100644 --- a/lustre/mds/mds_fs.c +++ b/lustre/mds/mds_fs.c @@ -608,13 +608,52 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa, struct inode *parent_inode = mds->mds_objects_dir->d_inode; unsigned int tmpname = ll_insecure_random_int(); struct file *filp; - struct dentry *new_child; + struct dentry *dchild; struct lvfs_run_ctxt saved; char fidname[LL_FID_NAMELEN]; void *handle; int rc = 0, err, namelen; ENTRY; + if (oa->o_id) { + push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + + namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation); + + down(&parent_inode->i_sem); + dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen); + if (IS_ERR(dchild)) { + up(&parent_inode->i_sem); + GOTO(out_pop, rc = PTR_ERR(dchild)); + } + if (dchild->d_inode == NULL) { + struct dentry_params dp; + struct inode *inode; + + dchild->d_fsdata = (void *) &dp; + dp.p_ptr = NULL; + dp.p_inum = oa->o_id; + rc = ll_vfs_create(parent_inode, dchild, S_IFREG, NULL); + if (dchild->d_fsdata == (void *)(unsigned long)oa->o_id) + dchild->d_fsdata = NULL; + if (rc) { + CDEBUG(D_INODE, "err during create: %d\n", rc); + dput(dchild); + up(&parent_inode->i_sem); + GOTO(out_pop, rc); + } + inode = dchild->d_inode; + LASSERT(inode->i_ino == oa->o_id); + inode->i_generation = oa->o_generation; + CDEBUG(D_HA, "recreated ino %lu with gen %u\n", + inode->i_ino, inode->i_generation); + mark_inode_dirty(inode); + } else { + CWARN("it should be here!\n"); + } + GOTO(out_pop, rc); + } + push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); sprintf(fidname, "OBJECTS/%u", tmpname); @@ -637,13 +676,13 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa, namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation); down(&parent_inode->i_sem); - new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen); + dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen); - if (IS_ERR(new_child)) { + if (IS_ERR(dchild)) { CERROR("getting neg dentry for obj rename: %d\n", rc); - GOTO(out_close, rc = PTR_ERR(new_child)); + GOTO(out_close, rc = PTR_ERR(dchild)); } - if (new_child->d_inode != NULL) { + if (dchild->d_inode != NULL) { CERROR("impossible non-negative obj dentry " LPU64":%u!\n", oa->o_id, oa->o_generation); LBUG(); @@ -656,7 +695,7 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa, lock_kernel(); rc = vfs_rename(mds->mds_objects_dir->d_inode, filp->f_dentry, - mds->mds_objects_dir->d_inode, new_child); + mds->mds_objects_dir->d_inode, dchild); unlock_kernel(); if (rc) CERROR("error renaming new object "LPU64":%u: rc %d\n", @@ -671,7 +710,7 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa, else if (!rc) rc = err; out_dput: - dput(new_child); + dput(dchild); out_close: up(&parent_inode->i_sem); err = filp_close(filp, 0); diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index 8fed0cb6c3c3abde09c4f06c2dac89a988122dee..f1b1517ca77eee2d40cf72c5469e6b19f9af2151 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -68,7 +68,8 @@ int mds_cleanup_orphans(struct obd_device *obd); /* mds/mds_log.c */ int mds_log_op_unlink(struct obd_device *obd, struct inode *inode, struct lov_mds_md *lmm, int lmm_size, - struct llog_cookie *logcookies, int cookies_size); + struct llog_cookie *logcookies, int cookies_size, + struct llog_create_locks **res); int mds_llog_init(struct obd_device *obd, struct obd_llogs *, struct obd_device *tgt, int count, struct llog_catid *logid); int mds_llog_finish(struct obd_device *obd, struct obd_llogs *, int count); diff --git a/lustre/mds/mds_log.c b/lustre/mds/mds_log.c index fc1da22f065ece22c587db2b174dc24f6298d547..6c34f12ece0e518675f093669caea09d940548e1 100644 --- a/lustre/mds/mds_log.c +++ b/lustre/mds/mds_log.c @@ -39,7 +39,8 @@ static int mds_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, void *buf, struct llog_cookie *logcookies, - int numcookies, void *data) + int numcookies, void *data, + struct rw_semaphore **lock, int *lock_count) { struct obd_device *obd = ctxt->loc_obd; struct obd_device *lov_obd = obd->u.mds.mds_osc_obd; @@ -48,7 +49,8 @@ static int mds_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, ENTRY; lctxt = llog_get_context(&lov_obd->obd_llogs, ctxt->loc_idx); - rc = llog_add(lctxt, rec, buf, logcookies, numcookies, data); + rc = llog_add(lctxt, rec, buf, logcookies, numcookies, data, + lock, lock_count); RETURN(rc); } @@ -84,12 +86,15 @@ static int mds_llog_repl_cancel(struct llog_ctxt *ctxt, int count, int mds_log_op_unlink(struct obd_device *obd, struct inode *inode, struct lov_mds_md *lmm, int lmm_size, - struct llog_cookie *logcookies, int cookies_size) + struct llog_cookie *logcookies, int cookies_size, + struct llog_create_locks **res) { struct mds_obd *mds = &obd->u.mds; struct lov_stripe_md *lsm = NULL; struct llog_ctxt *ctxt; - int rc; + struct llog_create_locks *lcl; + int rc, size, offset = offsetof(struct llog_create_locks, lcl_locks); + int lock_count = 0; ENTRY; if (IS_ERR(mds->mds_osc_obd)) @@ -100,12 +105,29 @@ int mds_log_op_unlink(struct obd_device *obd, struct inode *inode, if (rc < 0) RETURN(rc); + if (res != NULL) { + size = offset + + sizeof(struct rw_semaphore *) * lsm->lsm_stripe_count; + OBD_ALLOC(lcl, size); + if (lcl == NULL) + RETURN(-ENOMEM); + + lcl->lcl_count = lsm->lsm_stripe_count; + *res = lcl; + } + ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT); rc = llog_add(ctxt, NULL, lsm, logcookies, - cookies_size / sizeof(struct llog_cookie), NULL); + cookies_size / sizeof(struct llog_cookie), NULL, + res ? &lcl->lcl_locks[0] : NULL, &lock_count); obd_free_memmd(mds->mds_osc_exp, &lsm); + if (res && (rc <= 0 || lock_count == 0)) { + OBD_FREE(lcl, size); + *res = NULL; + } + RETURN(rc); } diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index b6551223e889c95022f50e7f61bf8925ffb39832..212990aafc7864e7ec5456260ec480eb87c7db33 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -1178,6 +1178,7 @@ int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd, struct mds_body *request_body = NULL, *reply_body = NULL; struct dentry_params dp; struct iattr iattr = { 0 }; + struct llog_create_locks *lcl = NULL; ENTRY; if (req && req->rq_reqmsg != NULL) @@ -1239,15 +1240,6 @@ int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd, GOTO(cleanup, rc); } - if (req != NULL && req->rq_repmsg != NULL && - (reply_body->valid & OBD_MD_FLEASIZE) && - mds_log_op_unlink(obd, pending_child->d_inode, lmm, - req->rq_repmsg->buflens[1], - lustre_msg_buf(req->rq_repmsg, 2, 0), - req->rq_repmsg->buflens[2]) > 0) { - reply_body->valid |= OBD_MD_FLCOOKIE; - } - pending_child->d_fsdata = (void *) &dp; dp.p_inum = 0; dp.p_ptr = req; @@ -1258,6 +1250,15 @@ int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd, if (rc) CERROR("error unlinking orphan %s: rc %d\n",fidname,rc); + if (req != NULL && req->rq_repmsg != NULL && + (reply_body->valid & OBD_MD_FLEASIZE) && + mds_log_op_unlink(obd, pending_child->d_inode, + lmm, req->rq_repmsg->buflens[1], + lustre_msg_buf(req->rq_repmsg, 2, 0), + req->rq_repmsg->buflens[2], &lcl) > 0) { + reply_body->valid |= OBD_MD_FLCOOKIE; + } + goto out; /* Don't bother updating attrs on unlinked inode */ } @@ -1337,6 +1338,8 @@ out: switch (cleanup_phase) { case 2: + if (lcl != NULL) + ptlrpc_save_llog_lock(req, lcl); dput(pending_child); case 1: up(&pending_dir->i_sem); @@ -1365,6 +1368,13 @@ int mds_close(struct ptlrpc_request *req) MDS_CHECK_RESENT(req, mds_reconstruct_generic(req)); } + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { + DEBUG_REQ(D_HA, req, "close replay\n"); + memcpy(lustre_msg_buf(req->rq_repmsg, 2, 0), + lustre_msg_buf(req->rq_reqmsg, 1, 0), + req->rq_repmsg->buflens[2]); + } + body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body); if (body == NULL) { CERROR("Can't unpack body\n"); diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index f424b76bb7ad709bb34d92ccbdf46a9ca5b590fe..43d240ad7c2468c310eb69ffe3585c7add86898b 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -281,6 +281,7 @@ void mds_steal_ack_locks(struct ptlrpc_request *req) struct list_head *tmp; struct ptlrpc_reply_state *oldrep; struct ptlrpc_service *svc; + struct llog_create_locks *lcl; unsigned long flags; char str[PTL_NALFMT_SIZE]; int i; @@ -315,6 +316,11 @@ void mds_steal_ack_locks(struct ptlrpc_request *req) oldrep->rs_modes[i]); oldrep->rs_nlocks = 0; + lcl = oldrep->rs_llog_locks; + oldrep->rs_llog_locks = NULL; + if (lcl != NULL) + ptlrpc_save_llog_lock(req, lcl); + DEBUG_REQ(D_HA, req, "stole locks for"); ptlrpc_schedule_difficult_reply (oldrep); @@ -1618,6 +1624,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, struct lustre_handle child_reuse_lockh = {0}; #endif struct lustre_handle * slave_lockh = NULL; + struct llog_create_locks *lcl = NULL; char fidname[LL_FID_NAMELEN]; void *handle = NULL; int rc = 0, log_unlink = 0, cleanup_phase = 0; @@ -1632,6 +1639,13 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, MDS_CHECK_RESENT(req, mds_reconstruct_generic(req)); + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { + DEBUG_REQ(D_HA, req, "unlink replay\n"); + memcpy(lustre_msg_buf(req->rq_repmsg, offset + 2, 0), + lustre_msg_buf(req->rq_reqmsg, 2, 0), + req->rq_repmsg->buflens[offset + 2]); + } + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) GOTO(cleanup, rc = -ENOENT); @@ -1791,7 +1805,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, lustre_msg_buf(req->rq_repmsg, offset + 1, 0), req->rq_repmsg->buflens[offset + 1], lustre_msg_buf(req->rq_repmsg, offset + 2, 0), - req->rq_repmsg->buflens[offset + 2]) > 0) + req->rq_repmsg->buflens[offset + 2], &lcl) > 0) body->valid |= OBD_MD_FLCOOKIE; break; } @@ -1849,6 +1863,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, if (!rc) (void)obd_set_info(mds->mds_osc_exp, strlen("unlinked"), "unlinked", 0, NULL); + if (lcl != NULL) + ptlrpc_save_llog_lock(req, lcl); case 3: /* child ino-reuse lock */ #if 0 if (rc && body != NULL) { @@ -2707,7 +2723,8 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, struct mds_obd *mds = mds_req2mds(req); struct lustre_handle dlm_handles[7] = {{0},{0},{0},{0},{0},{0},{0}}; struct mds_body *body = NULL; - int rc = 0, lock_count = 3; + struct llog_create_locks *lcl = NULL; + int rc = 0, log_unlink = 0, lock_count = 3; int cleanup_phase = 0; void *handle = NULL; ENTRY; @@ -2720,6 +2737,13 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, MDS_CHECK_RESENT(req, mds_reconstruct_generic(req)); + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { + DEBUG_REQ(D_HA, req, "rename replay\n"); + memcpy(lustre_msg_buf(req->rq_repmsg, 2, 0), + lustre_msg_buf(req->rq_reqmsg, 3, 0), + req->rq_repmsg->buflens[2]); + } + if (rec->ur_namelen == 1) { rc = mds_reint_rename_create_name(rec, offset, req); RETURN(rc); @@ -2774,7 +2798,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME | OBD_MD_FLMTIME); } else { - /* XXX need log unlink? */ + log_unlink = 1; } } @@ -2803,6 +2827,16 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new); unlock_kernel(); + if (!rc && log_unlink) { + if (mds_log_op_unlink(obd, de_tgtdir->d_inode, + lustre_msg_buf(req->rq_repmsg, 1, 0), + req->rq_repmsg->buflens[1], + lustre_msg_buf(req->rq_repmsg, 2, 0), + req->rq_repmsg->buflens[2], &lcl) > 0) { + body->valid |= OBD_MD_FLCOOKIE; + } + } + GOTO(cleanup, rc); cleanup: rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL, @@ -2815,6 +2849,9 @@ cleanup: if (dlm_handles[6].cookie != 0) ldlm_lock_decref(&(dlm_handles[6]), LCK_PW); #endif + if (lcl != NULL) + ptlrpc_save_llog_lock(req, lcl); + if (rc) { if (lock_count == 4) ldlm_lock_decref(&(dlm_handles[3]), LCK_EX); diff --git a/lustre/mds/mds_unlink_open.c b/lustre/mds/mds_unlink_open.c index c11328cbcaa2a66b80748c19800aed105ea5bbd8..e64bc5ec2689784fa8bd4a920a6343986fc15db4 100644 --- a/lustre/mds/mds_unlink_open.c +++ b/lustre/mds/mds_unlink_open.c @@ -226,7 +226,7 @@ static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild, if (logcookies == NULL) rc = -ENOMEM; else if (mds_log_op_unlink(obd, inode, lmm,lmm_size,logcookies, - mds->mds_max_cookiesize) > 0) + mds->mds_max_cookiesize, NULL) > 0) log_unlink = 1; } err = fsfilt_commit(obd, mds->mds_sb, pending_dir, handle, 0); diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c index b63d55c1f4c05acdd58ef50db4673ba28ddb25dd..6e3d512de11a7cc9780be9a258662d364e6d06db 100644 --- a/lustre/obdclass/llog_obd.c +++ b/lustre/obdclass/llog_obd.c @@ -100,15 +100,14 @@ static int cat_cancel_cb(struct llog_handle *cathandle, { struct llog_logid_rec *lir = (struct llog_logid_rec *)rec; struct llog_handle *loghandle; - struct llog_log_hdr *llh; - int rc, index; + int rc; ENTRY; if (rec->lrh_type != LLOG_LOGID_MAGIC) { CERROR("invalid record in catalog\n"); RETURN(-EINVAL); } - CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n", + CDEBUG(D_HA, "processing log "LPX64":%x at index %u of catalog "LPX64"\n", lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen, rec->lrh_index, cathandle->lgh_id.lgl_oid); @@ -119,26 +118,8 @@ static int cat_cancel_cb(struct llog_handle *cathandle, RETURN(rc); } - llh = loghandle->lgh_hdr; - if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) && - (llh->llh_count == 1)) { - rc = llog_destroy(loghandle); - if (rc) - CERROR("failure destroying log in postsetup: %d\n", rc); - LASSERT(rc == 0); - - index = loghandle->u.phd.phd_cookie.lgc_index; - llog_free_handle(loghandle); - - LASSERT(index); - llog_cat_set_first_idx(cathandle, index); - rc = llog_cancel_rec(cathandle, index); - if (rc == 0) - CWARN("cancel log "LPX64":%x at index %u of catalog " - LPX64"\n", lir->lid_id.lgl_oid, - lir->lid_id.lgl_ogen, rec->lrh_index, - cathandle->lgh_id.lgl_oid); - } + if (cathandle->lgh_last_idx == loghandle->u.phd.phd_cookie.lgc_index) + cathandle->u.chd.chd_current_log = loghandle; RETURN(rc); } @@ -193,67 +174,6 @@ int llog_obd_origin_setup(struct obd_device *obd, struct obd_llogs *llogs, } EXPORT_SYMBOL(llog_obd_origin_setup); -int llog_obd_origin_cleanup(struct llog_ctxt *ctxt) -{ - struct llog_handle *cathandle, *n, *loghandle; - struct llog_log_hdr *llh; - int rc, index; - ENTRY; - - if (!ctxt) - return 0; - - cathandle = ctxt->loc_handle; - if (cathandle) { - list_for_each_entry_safe(loghandle, n, - &cathandle->u.chd.chd_head, - u.phd.phd_entry) { - llh = loghandle->lgh_hdr; - if ((llh->llh_flags & - LLOG_F_ZAP_WHEN_EMPTY) && - (llh->llh_count == 1)) { - rc = llog_destroy(loghandle); - if (rc) - CERROR("failure destroying log during " - "cleanup: %d\n", rc); - LASSERT(rc == 0); - - index = loghandle->u.phd.phd_cookie.lgc_index; - llog_free_handle(loghandle); - - LASSERT(index); - llog_cat_set_first_idx(cathandle, index); - rc = llog_cancel_rec(cathandle, index); - if (rc == 0) - CDEBUG(D_HA, "cancel plain log at index" - " %u of catalog "LPX64"\n", - index,cathandle->lgh_id.lgl_oid); - } - } - llog_cat_put(ctxt->loc_handle); - } - return 0; -} -EXPORT_SYMBOL(llog_obd_origin_cleanup); - -/* add for obdfilter/sz and mds/unlink */ -int llog_obd_origin_add(struct llog_ctxt *ctxt, - struct llog_rec_hdr *rec, struct lov_stripe_md *lsm, - struct llog_cookie *logcookies, int numcookies) -{ - struct llog_handle *cathandle; - int rc; - ENTRY; - - cathandle = ctxt->loc_handle; - LASSERT(cathandle != NULL); - rc = llog_cat_add_rec(cathandle, rec, logcookies, NULL); - if (rc != 1) - CERROR("write one catalog record failed: %d\n", rc); - RETURN(rc); -} -EXPORT_SYMBOL(llog_obd_origin_add); - int obd_llog_cat_initialize(struct obd_device *obd, struct obd_llogs *llogs, int count, char *name) { diff --git a/lustre/obdclass/llog_test.c b/lustre/obdclass/llog_test.c index 994208bbccb264535647c0cd1e1f8447bf2dab47..ff6e5bafe3ce6e9b9d075e5d8d6d856a5ca71f59 100644 --- a/lustre/obdclass/llog_test.c +++ b/lustre/obdclass/llog_test.c @@ -254,7 +254,7 @@ static int llog_test_4(struct obd_device *obd) cat_logid = cath->lgh_id; CWARN("4b: write 1 record into the catalog\n"); - rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, &cookie, NULL); + rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, &cookie, NULL, NULL, NULL); if (rc != 1) { CERROR("4b: write 1 catalog record failed at: %d\n", rc); GOTO(out, rc); @@ -279,7 +279,7 @@ static int llog_test_4(struct obd_device *obd) CWARN("4d: write 40,000 more log records\n"); for (i = 0; i < 40000; i++) { - rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, NULL, NULL); + rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, NULL, NULL, NULL, NULL); if (rc) { CERROR("4d: write 40000 records failed at #%d: %d\n", i + 1, rc); @@ -297,7 +297,7 @@ static int llog_test_4(struct obd_device *obd) for (i = 0; i < 5; i++) { rec.lrh_len = buflen; rec.lrh_type = OBD_CFG_REC; - rc = llog_cat_add_rec(cath, &rec, NULL, buf); + rc = llog_cat_add_rec(cath, &rec, NULL, buf, NULL, NULL); if (rc) { CERROR("4e: write 5 records failed at #%d: %d\n", i + 1, rc); @@ -403,7 +403,7 @@ static int llog_test_5(struct obd_device *obd) } CWARN("5d: add 1 record to the log with many canceled empty pages\n"); - rc = llog_cat_add_rec(llh, &lmr.lmr_hdr, NULL, NULL); + rc = llog_cat_add_rec(llh, &lmr.lmr_hdr, NULL, NULL, NULL, NULL); if (rc) { CERROR("5d: add record to the log with many canceled empty\ pages failed\n"); diff --git a/lustre/obdfilter/filter_log.c b/lustre/obdfilter/filter_log.c index 825748726f2e7842ee3d0e011692bc45c9754f7d..8e2e1884a87c6c8a31d351b01bf758b157c75536 100644 --- a/lustre/obdfilter/filter_log.c +++ b/lustre/obdfilter/filter_log.c @@ -80,7 +80,8 @@ int filter_log_sz_change(struct llog_handle *cathandle, lsc->lsc_fid = *mds_fid; lsc->lsc_io_epoch = io_epoch; - rc = llog_cat_add_rec(cathandle, &lsc->lsc_hdr, logcookie, NULL); + rc = llog_cat_add_rec(cathandle, &lsc->lsc_hdr, logcookie, + NULL, NULL, NULL); OBD_FREE(lsc, sizeof(*lsc)); if (rc > 0) { diff --git a/lustre/ptlrpc/llog_net.c b/lustre/ptlrpc/llog_net.c index 714703fa3c9bc9e035271aad9dd726c107122606..9671976926cb348f1fa4663cb7ddd9381e2d5677 100644 --- a/lustre/ptlrpc/llog_net.c +++ b/lustre/ptlrpc/llog_net.c @@ -71,7 +71,7 @@ int llog_origin_connect(struct llog_ctxt *ctxt, int count, lgr->lgr_hdr.lrh_len = lgr->lgr_tail.lrt_len = sizeof(*lgr); lgr->lgr_hdr.lrh_type = LLOG_GEN_REC; lgr->lgr_gen = ctxt->loc_gen; - rc = llog_add(ctxt, &lgr->lgr_hdr, NULL, NULL, 1, NULL); + rc = llog_add(ctxt, &lgr->lgr_hdr, NULL, NULL, 1, NULL, NULL, NULL); OBD_FREE(lgr, sizeof(*lgr)); if (rc != 1) RETURN(rc); diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index d00dde8ca315270058cd7a3686cc6989e22c2fd2..9e3a0c78c902c016bebbdd7bd00844fccc6dbc61 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -134,6 +134,7 @@ EXPORT_SYMBOL(ptlrpc_disconnect_import); /* service.c */ EXPORT_SYMBOL(ptlrpc_save_lock); +EXPORT_SYMBOL(ptlrpc_save_llog_lock); EXPORT_SYMBOL(ptlrpc_schedule_difficult_reply); EXPORT_SYMBOL(ptlrpc_commit_replies); EXPORT_SYMBOL(ptlrpc_init_svc); diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 36bed9b74303d25170009f7c4dd7116dacba7085..692c929e3b4e1801cc7016b9263babde330b0691 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -28,6 +28,7 @@ #include <linux/obd_support.h> #include <linux/obd_class.h> #include <linux/lustre_net.h> +#include <linux/lustre_log.h> #include <portals/types.h> #include "ptlrpc_internal.h" @@ -116,6 +117,17 @@ ptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd) OBD_FREE (rqbd, sizeof (*rqbd)); } +void +ptlrpc_save_llog_lock (struct ptlrpc_request *req, + struct llog_create_locks *lcl) +{ + struct ptlrpc_reply_state *rs = req->rq_reply_state; + LASSERT (rs != NULL); + LASSERT (rs->rs_llog_locks == NULL); + + rs->rs_llog_locks = lcl; +} + void ptlrpc_save_lock (struct ptlrpc_request *req, struct lustre_handle *lock, int mode) @@ -171,7 +183,9 @@ ptlrpc_commit_replies (struct obd_device *obd) list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) { struct ptlrpc_reply_state *rs = list_entry (tmp, struct ptlrpc_reply_state, rs_obd_list); + struct llog_create_locks *lcl = rs->rs_llog_locks; + rs->rs_llog_locks = NULL; LASSERT (rs->rs_difficult); if (rs->rs_transno <= obd->obd_last_committed) { @@ -181,6 +195,9 @@ ptlrpc_commit_replies (struct obd_device *obd) list_del_init (&rs->rs_obd_list); ptlrpc_schedule_difficult_reply (rs); spin_unlock (&svc->srv_lock); + + if (lcl != NULL) + llog_create_lock_free(lcl); } } @@ -510,6 +527,7 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc) unsigned long flags; struct obd_export *exp; struct obd_device *obd; + struct llog_create_locks *lcl; int nlocks; int been_handled; char str[PTL_NALFMT_SIZE]; @@ -553,6 +571,9 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc) nlocks = rs->rs_nlocks; /* atomic "steal", but */ rs->rs_nlocks = 0; /* locks still on rs_locks! */ + lcl = rs->rs_llog_locks; + rs->rs_llog_locks = NULL; + if (nlocks == 0 && !been_handled) { /* If we see this, we should already have seen the warning * in mds_steal_ack_locks() */ @@ -565,7 +586,7 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc) } if ((!been_handled && rs->rs_on_net) || - nlocks > 0) { + nlocks > 0 || lcl != NULL) { spin_unlock_irqrestore(&svc->srv_lock, flags); if (!been_handled && rs->rs_on_net) { @@ -578,6 +599,9 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc) ldlm_lock_decref(&rs->rs_locks[nlocks], rs->rs_modes[nlocks]); + if (lcl != NULL) + llog_create_lock_free(lcl); + spin_lock_irqsave(&svc->srv_lock, flags); } diff --git a/lustre/smfs/cache_space.c b/lustre/smfs/cache_space.c index a5bd94bded1addbc6348ce1552d0f69152150752..45bcf40b0d628b0f589e4c1b5fb6d5330687303c 100644 --- a/lustre/smfs/cache_space.c +++ b/lustre/smfs/cache_space.c @@ -324,7 +324,8 @@ insert: llr->llr_pfid.generation = parent->i_generation; llr->llr_pfid.f_type = parent->i_mode & S_IFMT; - rc = llog_add(ctxt, &llr->llr_hdr, NULL, logcookie, 1, NULL); + rc = llog_add(ctxt, &llr->llr_hdr, NULL, logcookie, 1, + NULL, NULL, NULL); if (rc != 1) { CERROR("failed at llog_add: %d\n", rc); GOTO(out, rc); diff --git a/lustre/smfs/kml.c b/lustre/smfs/kml.c index 7d79587c0a3572a88a45c07cb5fe320dee3ce7d8..310e4dad1f6b7b93ce9a6e909ff8f8298a00cd17 100644 --- a/lustre/smfs/kml.c +++ b/lustre/smfs/kml.c @@ -284,7 +284,8 @@ int smfs_process_rec(struct super_block *sb, lgr->lgr_hdr.lrh_len = lgr->lgr_tail.lrt_len = sizeof(*lgr); lgr->lgr_hdr.lrh_type = LLOG_GEN_REC; lgr->lgr_gen = ctxt->loc_gen; - rc = llog_add(ctxt, &lgr->lgr_hdr, NULL, NULL, 1, NULL); + rc = llog_add(ctxt, &lgr->lgr_hdr, NULL, NULL, 1, + NULL, NULL, NULL); OBD_FREE(lgr, sizeof(*lgr)); if (rc != 1) RETURN(rc); diff --git a/lustre/smfs/smfs_llog.c b/lustre/smfs/smfs_llog.c index af370b9616a4968426d9f2609c0eac52a6ac0ad2..cfecc3e1c65580e642e2cc20278f997a99e45bb5 100644 --- a/lustre/smfs/smfs_llog.c +++ b/lustre/smfs/smfs_llog.c @@ -174,7 +174,7 @@ int smfs_llog_add_rec(struct smfs_super_info * sinfo, void *data, int data_size) rec.lrh_len = size_round(data_size); rec.lrh_type = SMFS_UPDATE_REC; - rc = llog_add(sinfo->smsi_rec_log, &rec, data, NULL, 0, NULL); + rc = llog_add(sinfo->smsi_rec_log, &rec, data, NULL, 0, NULL, NULL, NULL); if (rc != 1) { CERROR("error adding kml rec: %d\n", rc); RETURN(-EINVAL); diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index 271b242293ef9b3fe123372ca9f82cff7ac6bcc4..255ca403b1c0551ec4317440caeb11e970a58dd4 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -977,6 +977,16 @@ test_48() { } run_test 48 "Don't lose transno when client is evicted (2525)" +# b=3550 - replay of unlink +test_49() { + replay_barrier mds + createmany -o $DIR/$tfile-%d 400 || return 1 + unlinkmany $DIR/$tfile-%d 0 400 || return 2 + fail mds + $CHECKSTAT -t file $DIR/$tfile-* && return 3 || true +} +run_test 49 "re-write records to llog as written during fail" + equals_msg test complete, cleaning up $CLEANUP