From c8684b9b51b84389f06bebb3947e691ba16c5a6a Mon Sep 17 00:00:00 2001 From: shadow <shadow> Date: Fri, 10 Aug 2007 11:58:38 +0000 Subject: [PATCH] lvbo_init failed for resource with missing objects. b=9977 i=green i=wangdi --- lustre/ChangeLog | 8 ++ lustre/include/lustre/lustre_idl.h | 10 +++ lustre/liblustre/rw.c | 9 +- lustre/llite/file.c | 2 +- lustre/llite/rw.c | 5 +- lustre/lov/lov_merge.c | 8 +- lustre/lov/lov_obd.c | 12 ++- lustre/obdfilter/filter.c | 140 +++++++++++++++++++++++------ lustre/obdfilter/filter_internal.h | 1 + lustre/obdfilter/filter_io.c | 30 ++++++- lustre/obdfilter/filter_lvb.c | 2 + lustre/osc/osc_request.c | 11 +++ lustre/tests/sanityN.sh | 24 ++--- 13 files changed, 204 insertions(+), 58 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index c70fe4f962..e54c538a6a 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -157,6 +157,14 @@ Bugzilla : 12955 Description: jbd statistics Details : Port older jbd statistics patch for sles10 +Severity : normal +Bugzilla : 9977 +Frequency : rare +Description: lvbo_init failed for resource with missing objects. +Details : Fix returning error if we do stat for a file with missing/corrupted + objects and i_size set to the sum of the sizes of all available objects. + If we truncate/write to a missing object, it is recreated. + -------------------------------------------------------------------------------- 2007-07-30 Cluster File Systems, Inc. <info@clusterfs.com> diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 0dbe24313c..e425c826ac 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -599,6 +599,16 @@ extern void lustre_swab_ost_last_id(obd_id *id); /* lock value block communicated between the filter and llite */ +/* OST_LVB_ERR_INIT is needed because the return code in rc is + * negative, i.e. because ((MASK + rc) & MASK) != MASK. 
*/ +#define OST_LVB_ERR_INIT 0xffbadbad80000000ULL +#define OST_LVB_ERR_MASK 0xffbadbad00000000ULL +#define OST_LVB_IS_ERR(blocks) \ + ((blocks & OST_LVB_ERR_MASK) == OST_LVB_ERR_MASK) +#define OST_LVB_SET_ERR(blocks, rc) \ + do { blocks = OST_LVB_ERR_INIT + rc; } while (0) +#define OST_LVB_GET_ERR(blocks) (int)(blocks - OST_LVB_ERR_INIT) + struct ost_lvb { __u64 lvb_size; __u64 lvb_mtime; diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c index 80457fcb39..0811eae2a5 100644 --- a/lustre/liblustre/rw.c +++ b/lustre/liblustre/rw.c @@ -259,15 +259,18 @@ int llu_glimpse_size(struct inode *inode) } inode_init_lvb(inode, &lvb); - obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0); + rc = obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0); st->st_size = lvb.lvb_size; st->st_blocks = lvb.lvb_blocks; + /* handle st_blocks overflow gracefully */ + if (st->st_blocks < lvb.lvb_blocks) + st->st_blocks = ~0UL; st->st_mtime = lvb.lvb_mtime; st->st_atime = lvb.lvb_atime; st->st_ctime = lvb.lvb_ctime; - CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: %llu\n", - (long long)st->st_size, (long long)st->st_blocks); + CDEBUG(D_DLMTRACE, "glimpse: size: %llu, blocks: "LPU64"\n", + (long long)st->st_size, st->st_blocks); RETURN(rc); } diff --git a/lustre/llite/file.c b/lustre/llite/file.c index b3cf253451..31785179ae 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1046,7 +1046,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags) ll_inode_size_lock(inode, 1); inode_init_lvb(inode, &lvb); - obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0); + rc = obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0); inode->i_size = lvb.lvb_size; inode->i_blocks = lvb.lvb_blocks; LTIME_S(inode->i_mtime) = lvb.lvb_mtime; diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 8b80424e18..f4f1cd644d 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -139,8 +139,9 @@ void ll_truncate(struct inode *inode) * race condition. 
*/ lov_stripe_lock(lli->lli_smd); inode_init_lvb(inode, &lvb); - obd_merge_lvb(ll_i2obdexp(inode), lli->lli_smd, &lvb, 0); - if (lvb.lvb_size == inode->i_size) { + rc = obd_merge_lvb(ll_i2obdexp(inode), lli->lli_smd, &lvb, 0); + oa.o_blocks = lvb.lvb_blocks; + if (lvb.lvb_size == inode->i_size && rc == 0) { CDEBUG(D_VFSTRACE, "skipping punch for obj "LPX64", %Lu=%#Lx\n", lli->lli_smd->lsm_object_id,inode->i_size,inode->i_size); lov_stripe_unlock(lli->lli_smd); diff --git a/lustre/lov/lov_merge.c b/lustre/lov/lov_merge.c index 984a8ee256..45544c94f9 100644 --- a/lustre/lov/lov_merge.c +++ b/lustre/lov/lov_merge.c @@ -57,6 +57,7 @@ int lov_merge_lvb(struct obd_export *exp, struct lov_stripe_md *lsm, __u64 current_atime = lvb->lvb_atime; __u64 current_ctime = lvb->lvb_ctime; int i; + int rc = 0; LASSERT_SPIN_LOCKED(&lsm->lsm_lock); #ifdef __KERNEL__ @@ -67,6 +68,11 @@ int lov_merge_lvb(struct obd_export *exp, struct lov_stripe_md *lsm, obd_size lov_size, tmpsize; loi = lsm->lsm_oinfo[i]; + if (OST_LVB_IS_ERR(loi->loi_lvb.lvb_blocks)) { + rc = OST_LVB_GET_ERR(loi->loi_lvb.lvb_blocks); + continue; + } + tmpsize = loi->loi_kms; if (kms_only == 0 && loi->loi_lvb.lvb_size > tmpsize) tmpsize = loi->loi_lvb.lvb_size; @@ -94,7 +100,7 @@ int lov_merge_lvb(struct obd_export *exp, struct lov_stripe_md *lsm, lvb->lvb_mtime = current_mtime; lvb->lvb_atime = current_atime; lvb->lvb_ctime = current_ctime; - RETURN(0); + RETURN(rc); } /* Must be called under the lov_stripe_lock() */ diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 0167df50e9..31569ca216 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1965,6 +1965,7 @@ static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm, struct list_head *pos; struct lov_obd *lov = &exp->exp_obd->u.lov; struct lustre_handle *lov_lockhp; + ldlm_mode_t this_mode; int err = 0, rc = 0; ENTRY; @@ -1983,8 +1984,17 @@ static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm, req = 
list_entry(pos, struct lov_request, rq_link); lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; + /* If this lock was used for a write or truncate, the object + * will have been recreated by the OST, cancel the lock + * (setting LCK_GROUP incidentally causes immediate cancel). */ + if (OST_LVB_IS_ERR(lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks) && + (mode == LCK_PW || mode == LCK_CW)) + this_mode = LCK_GROUP; + else + this_mode = mode; + rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp, - req->rq_oi.oi_md, mode, lov_lockhp); + req->rq_oi.oi_md, this_mode, lov_lockhp); rc = lov_update_common_set(set, req, rc); if (rc) { CERROR("error: cancel objid "LPX64" subobj " diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index a85fbfbe94..f6251c30b5 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -2860,23 +2860,21 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, for (i = 0; i < *num && err == 0; i++) { int cleanup_phase = 0; - if (filter->fo_destroy_in_progress) { - CWARN("%s: precreate aborted by destroy\n", - obd->obd_name); - rc = -EAGAIN; - break; - } - if (recreate_obj) { __u64 last_id; next_id = oa->o_id; last_id = filter_last_id(filter, group); if (next_id > last_id) { - CERROR("Error: Trying to recreate obj greater" + CERROR("%s: trying to recreate obj greater" "than last id "LPD64" > "LPD64"\n", - next_id, last_id); + obd->obd_name, next_id, last_id); GOTO(cleanup, rc = -EINVAL); } + } else if (filter->fo_destroy_in_progress) { + CWARN("%s: precreate aborted by destroy\n", + obd->obd_name); + rc = -EAGAIN; + break; } else next_id = filter_last_id(filter, group) + 1; @@ -2970,20 +2968,72 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, RETURN(rc); } +int filter_recreate(struct obd_device *obd, struct obdo *oa) +{ + struct ldlm_res_id res_id = { .name = { oa->o_id } }; + struct ldlm_valblock_ops *ns_lvbo; + struct ldlm_resource *res; + obd_valid old_valid = 
oa->o_valid; + obd_flag old_flags = oa->o_flags; + int diff = 1, rc; + ENTRY; + + if (oa->o_id > filter_last_id(&obd->u.filter, oa->o_gr)) { + CERROR("recreate objid "LPU64" > last id "LPU64"\n", + oa->o_id, filter_last_id(&obd->u.filter, oa->o_gr)); + RETURN(-EINVAL); + } + + if ((oa->o_valid & OBD_MD_FLFLAGS) == 0) { + oa->o_valid |= OBD_MD_FLFLAGS; + oa->o_flags = OBD_FL_RECREATE_OBJS; + } else { + oa->o_flags |= OBD_FL_RECREATE_OBJS; + } + + down(&obd->u.filter.fo_create_lock); + rc = filter_precreate(obd, oa, oa->o_gr, &diff); + up(&obd->u.filter.fo_create_lock); + + res = ldlm_resource_get(obd->obd_namespace, NULL, + res_id, LDLM_EXTENT, 0); + if (res != NULL) { + /* Update lvb->lvb_blocks for the recreated object */ + ns_lvbo = res->lr_namespace->ns_lvbo; + if (ns_lvbo && ns_lvbo->lvbo_update) { + rc = ns_lvbo->lvbo_update(res, NULL, 0, 1); + if (rc) + RETURN(rc); + } + ldlm_resource_putref(res); + } + + if (rc == 0) + CWARN("%s: recreated missing object "LPU64"/"LPU64"\n", + obd->obd_name, oa->o_id, oa->o_gr); + + oa->o_valid = old_valid; + oa->o_flags = old_flags; + RETURN(rc); +} + static int filter_create(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md **ea, struct obd_trans_info *oti) { struct obd_device *obd = NULL; struct lvfs_run_ctxt saved; struct lov_stripe_md *lsm = NULL; + struct ldlm_res_id res_id = { .name = { oa->o_id } }; + ldlm_policy_data_t policy = { .l_extent = { 0, OBD_OBJECT_EOF } }; + struct lustre_handle lockh; + int flags = 0; int rc = 0; ENTRY; if (!(oa->o_valid & OBD_MD_FLGROUP)) oa->o_gr = 0; - CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n", - oa->o_gr, oa->o_id); + CDEBUG(D_INFO, "object "LPU64"/"LPU64"\n", oa->o_id, oa->o_gr); if (ea != NULL) { lsm = *ea; if (lsm == NULL) { @@ -2998,19 +3048,16 @@ static int filter_create(struct obd_export *exp, struct obdo *oa, if ((oa->o_valid & OBD_MD_FLFLAGS) && (oa->o_flags & OBD_FL_RECREATE_OBJS)) { - if (oa->o_id > filter_last_id(&obd->u.filter, 
oa->o_gr)) { - CERROR("recreate objid "LPU64" > last id "LPU64"\n", - oa->o_id, filter_last_id(&obd->u.filter, - oa->o_gr)); - rc = -EINVAL; - } else { - struct filter_obd *filter = &obd->u.filter; - int diff = 1; - - down(&filter->fo_create_lock); - rc = filter_precreate(obd, oa, oa->o_gr, &diff); - up(&filter->fo_create_lock); - } + /* Cancel all conflicting extent locks on recreating object, + * thus object's metadata will be updated on the clients */ + rc = ldlm_cli_enqueue_local(obd->obd_namespace, res_id, + LDLM_EXTENT, &policy, LCK_PW, + &flags, ldlm_blocking_ast, + ldlm_completion_ast, + ldlm_glimpse_ast, NULL, 0, + NULL, &lockh); + rc = filter_recreate(obd, oa); + ldlm_lock_decref(&lockh, LCK_PW); } else { rc = filter_handle_precreate(exp, oa, oa->o_gr, oti); } @@ -3190,7 +3237,10 @@ static int filter_truncate(struct obd_export *exp, struct obd_info *oinfo, struct obd_trans_info *oti, struct ptlrpc_request_set *rqset) { - int rc; + struct obdo *oa = oinfo->oi_oa; + struct dentry *dentry; + struct lvfs_run_ctxt saved; + int rc = 0; ENTRY; if (oinfo->oi_policy.l_extent.end != OBD_OBJECT_EOF) { @@ -3200,11 +3250,45 @@ static int filter_truncate(struct obd_export *exp, struct obd_info *oinfo, } CDEBUG(D_INODE, "calling truncate for object "LPU64", valid = "LPX64 - ", o_size = "LPD64"\n", oinfo->oi_oa->o_id, - oinfo->oi_oa->o_valid, oinfo->oi_policy.l_extent.start); + ", o_size = "LPD64"\n", oa->o_id, + oa->o_valid, oinfo->oi_policy.l_extent.start); + + oa->o_size = oinfo->oi_policy.l_extent.start; + + if (!(oa->o_valid & OBD_MD_FLGROUP)) + oa->o_gr = 0; + + push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + lock_kernel(); + + dentry = filter_fid2dentry(exp->exp_obd, NULL, oa->o_gr, oa->o_id); + if (IS_ERR(dentry)) + GOTO(out_unlock, rc = PTR_ERR(dentry)); + + if (dentry->d_inode == NULL) { + if (oinfo->oi_policy.l_extent.start == 0 && + filter_recreate(exp->exp_obd, oa) == 0) { + f_dput(dentry); + dentry = filter_fid2dentry(exp->exp_obd, NULL, + 
oa->o_gr, oa->o_id); + } + if (IS_ERR(dentry) || dentry->d_inode == NULL) { + CERROR("%s: punch missing obj "LPU64"/"LPU64": rc %d\n", + exp->exp_obd->obd_name, oa->o_id, oa->o_gr, rc); + if (IS_ERR(dentry)) + GOTO(out_unlock, rc = -ENOENT); + GOTO(out_dput, rc = -ENOENT); + } + } - oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start; rc = filter_setattr(exp, oinfo, oti); + +out_dput: + f_dput(dentry); +out_unlock: + unlock_kernel(); + pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + RETURN(rc); } diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 9fb48ad4fa..65e54431eb 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -116,6 +116,7 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, struct obdo *oa, struct obd_trans_info *oti); int filter_setattr(struct obd_export *exp, struct obd_info *oinfo, struct obd_trans_info *oti); +int filter_recreate(struct obd_device *obd, struct obdo *oa); struct dentry *filter_create_object(struct obd_device *obd, struct obdo *oa); diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index 8e55e5f412..72e3344868 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -526,9 +526,33 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, cleanup_phase = 2; if (dentry->d_inode == NULL) { - CERROR("%s: trying to BRW to non-existent file "LPU64"\n", - exp->exp_obd->obd_name, obj->ioo_id); - GOTO(cleanup, rc = -ENOENT); + struct obdo *noa = oa; + + if (oa == NULL) { + OBDO_ALLOC(noa); + if (noa == NULL) + GOTO(recreate_out, rc = -ENOENT); + noa->o_id = obj->ioo_id; + noa->o_valid = OBD_MD_FLID; + } + + if (filter_recreate(exp->exp_obd, noa) == 0) { + f_dput(dentry); + dentry = filter_fid2dentry(exp->exp_obd, NULL, + obj->ioo_gr, obj->ioo_id); + } + if (oa == NULL) + OBDO_FREE(noa); + recreate_out: + if (IS_ERR(dentry) || dentry->d_inode == NULL) { + 
CERROR("%s: BRW to missing obj "LPU64"/"LPU64":rc %d\n", + exp->exp_obd->obd_name, + obj->ioo_id, obj->ioo_gr, + IS_ERR(dentry) ? (int)PTR_ERR(dentry) : -ENOENT); + if (IS_ERR(dentry)) + cleanup_phase = 1; + GOTO(cleanup, rc = -ENOENT); + } } fso.fso_dentry = dentry; diff --git a/lustre/obdfilter/filter_lvb.c b/lustre/obdfilter/filter_lvb.c index 6c71a9bbf0..115a012448 100644 --- a/lustre/obdfilter/filter_lvb.c +++ b/lustre/obdfilter/filter_lvb.c @@ -94,6 +94,8 @@ static int filter_lvbo_init(struct ldlm_resource *res) out_dentry: f_dput(dentry); + if (rc) + OST_LVB_SET_ERR(lvb->lvb_blocks, rc); /* Don't free lvb data on lookup error */ return rc; } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 9caa9df9d0..cdea3b4ff6 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -3214,6 +3214,17 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, err = lquota_poll_check(quota_interface, exp, (struct if_quotacheck *)karg); GOTO(out, err); + case OBD_IOC_DESTROY: { + struct obdo *oa; + + if (!capable (CAP_SYS_ADMIN)) + GOTO (out, err = -EPERM); + oa = &data->ioc_obdo1; + oa->o_valid |= OBD_MD_FLGROUP; + + err = osc_destroy(exp, oa, NULL, NULL, NULL); + GOTO(out, err); + } default: CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", cmd, cfs_curproc_comm()); diff --git a/lustre/tests/sanityN.sh b/lustre/tests/sanityN.sh index e268ce21d4..ad6f1729b4 100644 --- a/lustre/tests/sanityN.sh +++ b/lustre/tests/sanityN.sh @@ -3,8 +3,8 @@ set -e ONLY=${ONLY:-"$*"} -# bug number for skipped test: 3192 9977 -ALWAYS_EXCEPT=${ALWAYS_EXCEPT:-"14b 28"} +# bug number for skipped test: 3192 +ALWAYS_EXCEPT=${ALWAYS_EXCEPT:-"14b"} # UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT! 
[ "$SLOW" = "no" ] && EXCEPT="$EXCEPT 16" @@ -730,28 +730,14 @@ test_27() { run_test 27 "align non-overlapping extent locks from request ===" test_28() { # bug 9977 - ECHO_UUID="ECHO_osc1_UUID" - tOST=`$LCTL dl | | awk '/-osc-|OSC.*MNT/ { print $4 }' | head -1` + ostID=`$LCTL dl | awk '/-osc-|OSC.*MNT/ { ost++; if (ost == 2) { print $1 } }'` lfs setstripe $DIR1/$tfile 1048576 0 2 - tOBJID=`lfs getstripe $DIR1/$tfile |grep "^[[:space:]]\+1" |awk '{print $2}'` + tOBJID=`lfs getstripe $DIR1/$tfile | awk '/^[[:space:]]+1/ {print $2}'` dd if=/dev/zero of=$DIR1/$tfile bs=1024k count=2 - $LCTL <<-EOF - newdev - attach echo_client ECHO_osc1 $ECHO_UUID - setup $tOST - EOF - - tECHOID=`$LCTL dl | grep $ECHO_UUID | awk '{print $1}'` - $LCTL --device $tECHOID destroy "${tOBJID}:0" + $LCTL --device $ostID destroy "${tOBJID}" - $LCTL <<-EOF - cfg_device ECHO_osc1 - cleanup - detach - EOF - # reading of 1st stripe should pass dd if=$DIR2/$tfile of=/dev/null bs=1024k count=1 || error # reading of 2nd stripe should fail (this stripe was destroyed) -- GitLab