diff --git a/lustre/lov/lov_cl_internal.h b/lustre/lov/lov_cl_internal.h index aa9583fe2585ead138d32928be2b1d77e3ae32a3..90f53fbee19229acffef0850fdbb242b23429e74 100644 --- a/lustre/lov/lov_cl_internal.h +++ b/lustre/lov/lov_cl_internal.h @@ -210,6 +210,21 @@ struct lov_object { struct lov_layout_raid0 { unsigned lo_nr; struct lov_stripe_md *lo_lsm; + /** + * Array of sub-objects. Allocated when top-object is + * created (lov_init_raid0()). + * + * Top-object is a strict master of its sub-objects: + * it is created before them, and outlives its + * children (this later is necessary so that basic + * functions like cl_object_top() always + * work). Top-object keeps a reference on every + * sub-object. + * + * When top-object is destroyed (lov_delete_raid0()) + * it releases its reference to a sub-object and waits + * until the latter is finally destroyed. + */ struct lovsub_object **lo_sub; /** * When this is true, lov_object::lo_attr contains @@ -394,6 +409,7 @@ struct lov_thread_info { struct cl_2queue lti_cl2q; union lov_layout_state lti_state; struct cl_lock_closure lti_closure; + cfs_waitlink_t lti_waiter; }; /** diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c index a38d22b27efdc899a98aa91a365595b9ec87a9f3..d5781b4cea3fa4295c03f8fd3f22395d5cb84c3d 100644 --- a/lustre/lov/lov_object.c +++ b/lustre/lov/lov_object.c @@ -235,6 +235,41 @@ static void lov_delete_empty(const struct lu_env *env, struct lov_object *lov, LASSERT(lov->lo_type == LLT_EMPTY); } +static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov, + struct lovsub_object *los, int idx) +{ + struct cl_object *sub; + struct lov_layout_raid0 *r0; + struct lu_site *site; + cfs_waitlink_t *waiter; + + r0 = &lov->u.raid0; + sub = lovsub2cl(los); + LASSERT(r0->lo_sub[idx] == los); + + cl_object_kill(env, sub); + /* release a reference to the sub-object and ... */ + lu_object_ref_del(&sub->co_lu, "lov-parent", lov); + cl_object_put(env, sub); + + /* ... wait until it is actually destroyed---sub-object clears its + * ->lo_sub[] slot in lovsub_object_fini() */ + if (r0->lo_sub[idx] == los) { + waiter = &lov_env_info(env)->lti_waiter; + site = sub->co_lu.lo_dev->ld_site; + cfs_waitlink_init(waiter); + cfs_waitq_add(&site->ls_marche_funebre, waiter); + set_current_state(CFS_TASK_UNINT); + + while (r0->lo_sub[idx] == los) + /* this wait-queue is signaled at the end of + * lu_object_free(). */ + cfs_waitq_wait(waiter, CFS_TASK_UNINT); + cfs_waitq_del(&site->ls_marche_funebre, waiter); + } + LASSERT(r0->lo_sub[idx] == NULL); +} + static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, union lov_layout_state *state) { @@ -242,17 +277,16 @@ static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, int i; ENTRY; - if (r0->lo_sub != NULL && - lu_object_is_dying(lov->lo_cl.co_lu.lo_header)) { + if (r0->lo_sub != NULL) { for (i = 0; i < r0->lo_nr; ++i) { - struct lovsub_object *sub = r0->lo_sub[i]; + struct lovsub_object *los = r0->lo_sub[i]; - if (sub != NULL) + if (los != NULL) /* * If top-level object is to be evicted from * the cache, so are its sub-objects. */ - cl_object_kill(env, lovsub2cl(sub)); + lov_subobject_kill(env, lov, los, i); } } EXIT; @@ -271,19 +305,6 @@ static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov, ENTRY; if (r0->lo_sub != NULL) { - int i; - - for (i = 0; i < r0->lo_nr; ++i) { - struct cl_object *sub; - - if (r0->lo_sub[i] == NULL) - continue; - sub = lovsub2cl(r0->lo_sub[i]); - cl_object_header(sub)->coh_parent = NULL; - lu_object_ref_del(&sub->co_lu, "lov-parent", lov); - cl_object_put(env, sub); - r0->lo_sub[i] = NULL; - } OBD_FREE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]); r0->lo_sub = NULL; } diff --git a/lustre/lov/lovsub_object.c b/lustre/lov/lovsub_object.c index e49d43d5c72af6a3929a7b42cab44b529362cc85..a2c4f0611af1d504e0139ce35ffdfc1aa23c1720 100644 --- a/lustre/lov/lovsub_object.c +++ b/lustre/lov/lovsub_object.c @@ -74,8 +74,13 @@ int lovsub_object_init(const struct lu_env *env, struct lu_object *obj, static void lovsub_object_free(const struct lu_env *env, struct lu_object *obj) { struct lovsub_object *los = lu2lovsub(obj); + struct lov_object *lov = los->lso_super; + + LASSERT(lov->lo_type == LLT_RAID0); + LASSERT(lov->u.raid0.lo_sub[los->lso_index] == los); ENTRY; + lov->u.raid0.lo_sub[los->lso_index] = NULL; lu_object_fini(obj); lu_object_header_fini(&los->lso_header.coh_lu); OBD_SLAB_FREE_PTR(los, lovsub_object_kmem);