diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 8f7debb6563b4d90d4597a9f0f6a5fa40a31c182..a74808ae2e001977a31285909a17fcc5a8d469b3 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -1627,6 +1627,13 @@ Description: Introduce lu_kmem_descr. Details : lu_kmem_descr and its companion interface allow to create and destroy a number of kmem caches at once. +Severity : normal +Bugzilla : 16450 +Description: Fix lu_object finalization race. +Details : Fix a race between lu_object_find() finding an object and its + concurrent finalization. This race is (most likely) not possible + on the server, but might happen on the client. + -------------------------------------------------------------------------------- 2007-08-10 Cluster File Systems, Inc. <info@clusterfs.com> diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index 64384b79a6eb8405be3d0ee64c92a11966e26725..e848facd8d501aa30db7eb3a5eb8e0070dabc6ad 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -631,15 +631,25 @@ struct lu_site { /* * Client Seq Manager + /** + * Wait-queue signaled when an object in this site is ultimately + * destroyed (lu_object_free()). It is used by lu_object_find() to + * wait before re-trying when object in the process of destruction is + * found in the hash table. + * + * If having a single wait-queue turns out to be a problem, a + * wait-queue per hash-table bucket can be easily implemented. + * + * \see htable_lookup(). */ - struct lu_client_seq *ls_client_seq; + cfs_waitq_t ls_marche_funebre; - /* statistical counters. Protected by nothing, races are accepted. */ + /** statistical counters. Protected by nothing, races are accepted. */ struct { __u32 s_created; __u32 s_cache_hit; __u32 s_cache_miss; - /* + /** * Number of hash-table entry checks made. * * ->s_cache_check / (->s_cache_miss + ->s_cache_hit) diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c index 1a3699370c81d64252e6c8ec17f848ad53aa5ad7..8ecd85c84988964d5a7090246dfe3ddee6c3bb19 100644 --- a/lustre/obdclass/lu_object.c +++ b/lustre/obdclass/lu_object.c @@ -421,15 +421,29 @@ EXPORT_SYMBOL(lu_object_invariant); static struct lu_object *htable_lookup(struct lu_site *s, const struct hlist_head *bucket, - const struct lu_fid *f) + const struct lu_fid *f, + cfs_waitlink_t *waiter) { struct lu_object_header *h; struct hlist_node *scan; hlist_for_each_entry(h, scan, bucket, loh_hash) { s->ls_stats.s_cache_check ++; - if (likely(lu_fid_eq(&h->loh_fid, f) && - !lu_object_is_dying(h))) { + if (likely(lu_fid_eq(&h->loh_fid, f))) { + if (unlikely(lu_object_is_dying(h))) { + /* + * Lookup found an object being destroyed; + * this object cannot be returned (to assure + * that references to dying objects are + * eventually drained), and moreover, lookup + * has to wait until object is freed. + */ + cfs_waitlink_init(waiter); + cfs_waitq_add(&s->ls_marche_funebre, waiter); + set_current_state(CFS_TASK_UNINT); + s->ls_stats.s_cache_death_race ++; + return ERR_PTR(-EAGAIN); + } /* bump reference count... */ if (atomic_add_return(1, &h->loh_ref) == 1) ++ s->ls_busy; @@ -454,14 +468,29 @@ static __u32 fid_hash(const struct lu_fid *f, int bits) return hash_long(fid_flatten(f), bits); } -/* - * Search cache for an object with the fid @f. If such object is found, return - * it. Otherwise, create new object, insert it into cache and return it. In - * any case, additional reference is acquired on the returned object. +/** + * Search cache for an object with the fid \a f. If such object is found, + * return it. Otherwise, create new object, insert it into cache and return + * it. In any case, additional reference is acquired on the returned object. */ struct lu_object *lu_object_find(const struct lu_env *env, - struct lu_site *s, const struct lu_fid *f) + struct lu_device *dev, const struct lu_fid *f, + const struct lu_object_conf *conf) { + return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf); +} +EXPORT_SYMBOL(lu_object_find); + +/** + * Core logic of lu_object_find*() functions. + */ +static struct lu_object *lu_object_find_try(const struct lu_env *env, + struct lu_device *dev, + const struct lu_fid *f, + const struct lu_object_conf *conf, + cfs_waitlink_t *waiter) +{ + struct lu_site *s; struct lu_object *o; struct lu_object *shadow; struct hlist_head *bucket; @@ -478,12 +507,16 @@ struct lu_object *lu_object_find(const struct lu_env *env, * object just allocated. * - unlock index; * - return object. + * + * If dying object is found during index search, add @waiter to the + * site wait-queue and return ERR_PTR(-EAGAIN). */ + s = dev->ld_site; bucket = s->ls_hash + fid_hash(f, s->ls_hash_bits); read_lock(&s->ls_guard); - o = htable_lookup(s, bucket, f); + o = htable_lookup(s, bucket, f, waiter); read_unlock(&s->ls_guard); if (o != NULL) @@ -493,14 +526,14 @@ struct lu_object *lu_object_find(const struct lu_env *env, * Allocate new object. This may result in rather complicated * operations, including fld queries, inode loading, etc. */ - o = lu_object_alloc(env, s, f); + o = lu_object_alloc(env, dev, f, conf); if (unlikely(IS_ERR(o))) return o; LASSERT(lu_fid_eq(lu_object_fid(o), f)); write_lock(&s->ls_guard); - shadow = htable_lookup(s, bucket, f); + shadow = htable_lookup(s, bucket, f, waiter); if (likely(shadow == NULL)) { hlist_add_head(&o->lo_header->loh_hash, bucket); list_add_tail(&o->lo_header->loh_lru, &s->ls_lru); @@ -515,9 +548,58 @@ struct lu_object *lu_object_find(const struct lu_env *env, lu_object_free(env, o); return shadow; } -EXPORT_SYMBOL(lu_object_find); -/* +/** + * Much like lu_object_find(), but top level device of object is specifically + * \a dev rather than top level device of the site. This interface allows + * objects of different "stacking" to be created within the same site. + */ +struct lu_object *lu_object_find_at(const struct lu_env *env, + struct lu_device *dev, + const struct lu_fid *f, + const struct lu_object_conf *conf) +{ + struct lu_object *obj; + cfs_waitlink_t wait; + + while (1) { + obj = lu_object_find_try(env, dev, f, conf, &wait); + if (obj == ERR_PTR(-EAGAIN)) { + /* + * lu_object_find_try() already added waiter into the + * wait queue. + */ + cfs_waitq_wait(&wait, CFS_TASK_UNINT); + cfs_waitq_del(&dev->ld_site->ls_marche_funebre, &wait); + } else + break; + } + return obj; +} +EXPORT_SYMBOL(lu_object_find_at); + +/** + * Find object with given fid, and return its slice belonging to given device. + */ +struct lu_object *lu_object_find_slice(const struct lu_env *env, + struct lu_device *dev, + const struct lu_fid *f, + const struct lu_object_conf *conf) +{ + struct lu_object *top; + struct lu_object *obj; + + top = lu_object_find(env, dev, f, conf); + if (!IS_ERR(top)) { + obj = lu_object_locate(top->lo_header, dev->ld_type); + if (obj == NULL) + lu_object_put(env, top); + } else + obj = top; + return obj; +} +EXPORT_SYMBOL(lu_object_find_slice); + /** * Global list of all device types. */