diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 2c44a3d6e9e0a47572264d3c72dd48e82532bf81..74b1c5267cd485bce5d5750e4af7ae5a826b678d 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -158,7 +158,9 @@ typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **, struct ldlm_valblock_ops { int (*lvbo_init)(struct ldlm_resource *res); - int (*lvbo_update)(struct ldlm_resource *res, struct lustre_msg *m, + + int (*lvbo_update)(struct ldlm_resource *res, + struct lustre_msg *m, int buf_idx, int increase); }; diff --git a/lustre/include/linux/lustre_export.h b/lustre/include/linux/lustre_export.h index 0a91241b3ae7e421503d421e28dae040126e5b99..8f9b829411927fa1f7d1e2e9da1c1c99bef5de41 100644 --- a/lustre/include/linux/lustre_export.h +++ b/lustre/include/linux/lustre_export.h @@ -31,18 +31,8 @@ struct mds_export_data { struct osc_creator { spinlock_t oscc_lock; - struct list_head oscc_list; - struct obd_device *oscc_obd; - obd_id oscc_last_id;//last available pre-created object - obd_id oscc_next_id;// what object id to give out next - obd_id oscc_gr; - int oscc_grow_count; - int oscc_max_grow_count; - int oscc_kick_barrier; - struct osc_created *oscc_osccd; - struct obdo oscc_oa; + struct obd_device *oscc_obd; int oscc_flags; - wait_queue_head_t oscc_waitq; /* creating procs wait on this */ }; struct ldlm_export_data { diff --git a/lustre/include/linux/lustre_fsfilt.h b/lustre/include/linux/lustre_fsfilt.h index 70d027b66ea1648bf21649b678a464c7d638dd06..5c9ecd07108129b893a9c4def573631e8ba31dfa 100644 --- a/lustre/include/linux/lustre_fsfilt.h +++ b/lustre/include/linux/lustre_fsfilt.h @@ -601,6 +601,7 @@ fsfilt_clear_fs_flags(struct obd_device *obd, struct inode *inode, int flags) return obd->obd_fsops->fs_clear_fs_flags(inode, flags); return 0; } + static inline int fsfilt_precreate_rec(struct obd_device *obd, struct dentry *dentry, int *num, struct obdo *oa) diff --git 
a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 6a19cf2eff33e2aa23e4d93e6554eeb6ea634632..c94afce0870cbd14fdb2eb7a169a70fa0bed9299 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -463,10 +463,7 @@ extern void lustre_swab_obd_statfs (struct obd_statfs *os); #define OBD_BRW_FROM_GRANT 0x20 /* the osc manages this under llite */ #define OBD_BRW_GRANTED 0x40 /* the ost manages this */ -#define OBD_OBJECT_EOF 0xffffffffffffffffULL - -#define OST_MIN_PRECREATE 32 -#define OST_MAX_PRECREATE 20000 +#define OBD_OBJECT_EOF 0xffffffffffffffffULL struct obd_ioobj { obd_id ioo_id; diff --git a/lustre/include/linux/lustre_smfs.h b/lustre/include/linux/lustre_smfs.h index feab2166f4e9c2328267ba24bc2bff7fa6ba1bad..ba6667f5ff430fa7d7e0aae227aaf1c9ebd95b29 100644 --- a/lustre/include/linux/lustre_smfs.h +++ b/lustre/include/linux/lustre_smfs.h @@ -162,7 +162,6 @@ struct fs_extent{ /* SMFS external flags and methods */ #define SM_ALL_PLG 0x80L -#define SM_PRECREATE 0x100L #define SM_DO_REC 0x1 #define SM_INIT_REC 0x2 diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index 60a18f9a83699d1aaf165d7894fe0aebb9e46a9f..23f46e67b4e1f929e36352b76175597aeaf78f1e 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -373,11 +373,12 @@ struct mds_obd { struct obd_export *mds_dt_exp; int mds_has_dt_desc; struct lov_desc mds_dt_desc; + + spinlock_t mds_dt_lock; obd_id *mds_dt_objids; - int mds_dt_objids_valid; - int mds_dt_nextid_set; struct file *mds_dt_objid_filp; - spinlock_t mds_dt_lock; + int mds_dt_objids_valid; + unsigned long *mds_client_bitmap; struct semaphore mds_orphan_recovery_sem; @@ -407,6 +408,7 @@ struct mds_obd { /* which secure flavor from remote to this mds is denied */ spinlock_t mds_denylist_lock; struct list_head mds_denylist; + struct semaphore mds_create_sem; }; struct echo_obd { @@ -545,13 +547,19 @@ struct niobuf_local { int rc; }; +#define OBD_MODE_ASYNC (1 
<< 0) +#define OBD_MODE_CROW (1 << 1) + /* Don't conflict with on-wire flags OBD_BRW_WRITE, etc */ #define N_LOCAL_TEMP_PAGE 0x10000000 +typedef int (*obd_obj_alloc_func_t)(obd_id *objid); + struct obd_trans_info { __u64 oti_transno; __u64 *oti_objid; - /* Only used on the server side for tracking acks. */ + + /* only used on the server side for tracking acks. */ struct oti_req_ack_lock { struct lustre_handle lock; __u32 mode; @@ -560,7 +568,8 @@ struct obd_trans_info { struct llog_cookie oti_onecookie; struct llog_cookie *oti_logcookies; int oti_numcookies; - int oti_async; + int oti_flags; + obd_obj_alloc_func_t oti_obj_alloc; }; static inline void oti_alloc_cookies(struct obd_trans_info *oti,int num_cookies) diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 2b0288279a0bdcd62d62fa36f7a0288c38abe596..6d2dae806f7ec2beca64fc516220d0d9eb8e9abf 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -300,7 +300,8 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock, LASSERT(list_empty(&lock->l_res_link)); lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, - lock->l_resource->lr_type, 1); + lock->l_resource->lr_type, + 1); if (lock->l_resource == NULL) { LBUG(); RETURN(-ENOMEM); @@ -769,7 +770,8 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, parent_res = parent_lock->l_resource; } - res = ldlm_resource_get(ns, parent_res, res_id, type, 1); + res = ldlm_resource_get(ns, parent_res, res_id, + type, 1); if (res == NULL) RETURN(NULL); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 1ed72f32f6192986450a7441b54272646904a8d1..3bc90b0ffe4a7b0c7fb179cd6dfc43e9f7096c91 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -922,8 +922,10 @@ int ldlm_namespace_foreach_res(struct ldlm_namespace *ns, } /* non-blocking function to manipulate a lock whose cb_data is being put away.*/ -void ldlm_change_cbdata(struct ldlm_namespace *ns, struct 
ldlm_res_id *res_id, - ldlm_iterator_t iter, void *data) +void ldlm_change_cbdata(struct ldlm_namespace *ns, + struct ldlm_res_id *res_id, + ldlm_iterator_t iter, + void *data) { struct ldlm_resource *res; ENTRY; diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index ac93ac6c78fa22ffac1f0fc5b88130584fbb68a8..9a88971d01b82e1d5a39932c4801b6d0e42c8c49 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -494,8 +494,8 @@ struct ldlm_resource * ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, struct ldlm_res_id name, __u32 type, int create) { - struct list_head *bucket, *tmp; struct ldlm_resource *res = NULL; + struct list_head *bucket, *tmp; ENTRY; LASSERT(ns != NULL); @@ -536,8 +536,8 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, rc = ns->ns_lvbo->lvbo_init(res); up(&res->lr_lvb_sem); if (rc) - CERROR("lvbo_init failed for resource "LPU64": rc %d\n", - name.name[0], rc); + CERROR("lvbo_init failed for resource " + LPU64": rc %d\n", name.name[0], rc); } else { out: l_unlock(&ns->ns_lock); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index e2f5453382dc7bc2fdb15c6db0bbfcbd542ea609..b522d89257716c7cf97e5661f0e158740fa8bac6 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1158,62 +1158,6 @@ out: return retval; } -static int ll_lov_recreate_obj(struct inode *inode, struct file *file, - unsigned long arg) -{ - struct ll_inode_info *lli = ll_i2info(inode); - struct obd_export *exp = ll_i2dtexp(inode); - struct ll_recreate_obj ucreatp; - struct obd_trans_info oti = { 0 }; - struct obdo *oa = NULL; - int lsm_size; - int rc = 0; - struct lov_stripe_md *lsm, *lsm2; - ENTRY; - - if (!capable (CAP_SYS_ADMIN)) - RETURN(-EPERM); - - rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg, - sizeof(struct ll_recreate_obj)); - if (rc) { - RETURN(-EFAULT); - } - oa = obdo_alloc(); - if (oa == NULL) - RETURN(-ENOMEM); - - down(&lli->lli_open_sem); - lsm = 
lli->lli_smd; - if (lsm == NULL) - GOTO(out, rc = -ENOENT); - lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) * - (lsm->lsm_stripe_count)); - - OBD_ALLOC(lsm2, lsm_size); - if (lsm2 == NULL) - GOTO(out, rc = -ENOMEM); - - oa->o_id = ucreatp.lrc_id; - oa->o_nlink = ucreatp.lrc_ost_idx; - oa->o_gr = ucreatp.lrc_group; - oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS; - oa->o_flags |= OBD_FL_RECREATE_OBJS; - obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | - OBD_MD_FLMTIME | OBD_MD_FLCTIME); - - oti.oti_objid = NULL; - memcpy(lsm2, lsm, lsm_size); - rc = obd_create(exp, oa, NULL, 0, &lsm2, &oti); - - OBD_FREE(lsm2, lsm_size); - GOTO(out, rc); -out: - up(&lli->lli_open_sem); - obdo_free(oa); - return rc; -} - static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file, int flags, struct lov_user_md *lum, int lum_size) @@ -1309,12 +1253,12 @@ static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file, } static int ll_lov_setea(struct inode *inode, struct file *file, - unsigned long arg) + unsigned long arg) { int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE; struct lov_user_md *lump; int lum_size = sizeof(struct lov_user_md) + - sizeof(struct lov_user_ost_data); + sizeof(struct lov_user_ost_data); int rc; ENTRY; @@ -1497,8 +1441,6 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd, } case LL_IOC_LOV_GETSTRIPE: RETURN(ll_lov_getstripe(inode, arg)); - case LL_IOC_RECREATE_OBJ: - RETURN(ll_lov_recreate_obj(inode, file, arg)); case EXT3_IOC_GETFLAGS: case EXT3_IOC_SETFLAGS: RETURN( ll_iocontrol(inode, file, cmd, arg) ); @@ -1607,7 +1549,8 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data) oa->o_id = lsm->lsm_object_id; oa->o_gr = lsm->lsm_object_gr; - oa->o_valid = OBD_MD_FLID; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + obdo_from_inode(oa, inode, (OBD_MD_FLTYPE | OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLGROUP)); diff --git a/lustre/llite/llite_internal.h 
b/lustre/llite/llite_internal.h index 673b763250f1c8af3c06ed6028e8c605b8db0da1..25ae7e6b341f14a888e7cf2c3f8e7d90aaf1992c 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -489,33 +489,4 @@ ll_prepare_mdc_data(struct mdc_op_data *data, struct inode *i1, data->mod_time = LTIME_S(CURRENT_TIME); } -#if 0 -/* - * this was needed for catching correct calling place of ll_intent_alloc() with - * missed ll_intent_free() causing memory leak. --umka - */ -#define ll_intent_alloc(it) \ - ({ \ - int err; \ - OBD_SLAB_ALLOC((it)->d.fs_data, ll_intent_slab, SLAB_KERNEL, \ - sizeof(struct lustre_intent_data)); \ - if (!(it)->d.fs_data) { \ - err = -ENOMEM; \ - } else { \ - err = 0; \ - } \ - (it)->it_op_release = ll_intent_release; \ - err; \ - }) - -#define ll_intent_free(it) \ - do { \ - if ((it)->d.fs_data) { \ - OBD_SLAB_FREE((it)->d.fs_data, ll_intent_slab, \ - sizeof(struct lustre_intent_data)); \ - (it)->d.fs_data = NULL; \ - } \ - } while (0) -#endif - #endif /* LLITE_INTERNAL_H */ diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 6e0a11ae38115bacf9a78cf92f6f1736cffb4936..85508c79dd4d47e67d1270b4ad82a3b2abe6050c 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -141,9 +141,8 @@ int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov, struct obd_device *obd; struct obd_statfs osfs; struct lustre_md md; - kdev_t devno; - int err; __u32 valsize; + int err; ENTRY; obd = class_name2obd(lmv); @@ -212,11 +211,18 @@ int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov, sb->s_blocksize = osfs.os_bsize; sb->s_blocksize_bits = log2(osfs.os_bsize); sb->s_maxbytes = PAGE_CACHE_MAXBYTES; - - devno = get_uuid2int((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid, - strlen((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid)); - sb->s_dev = devno; + /* in 2.6.x FS is not allowed to form s_dev */ +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) + { + kdev_t devno; + + devno = 
get_uuid2int((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid, + strlen((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid)); + + sb->s_dev = devno; + } +#endif /* after statfs, we are supposed to have connected to MDSs, * so it's ok to check remote flag returned. @@ -1170,7 +1176,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) /* from sys_utime() */ if (!(ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET))) { if (current->fsuid != inode->i_uid && - (rc=ll_permission(inode,MAY_WRITE,NULL))!=0) + (rc = ll_permission(inode, MAY_WRITE, NULL)) != 0) RETURN(rc); } else { /* from inode_change_ok() */ @@ -1228,7 +1234,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) if (!rc) rc = err; } - } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) { + } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET | ATTR_UID | ATTR_GID)) { struct obdo *oa = NULL; CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n", @@ -1241,6 +1247,17 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) oa->o_id = lsm->lsm_object_id; oa->o_gr = lsm->lsm_object_gr; oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + + if (ia_valid & ATTR_UID) { + oa->o_uid = inode->i_uid; + oa->o_valid |= OBD_MD_FLUID; + } + + if (ia_valid & ATTR_GID) { + oa->o_gid = inode->i_gid; + oa->o_valid |= OBD_MD_FLGID; + } + obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME); rc = obd_setattr(sbi->ll_dt_exp, oa, lsm, NULL); @@ -1248,6 +1265,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) if (rc) CERROR("obd_setattr fails: rc = %d\n", rc); } + RETURN(rc); } diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 88b1cc0814494dff9c6a21251642e2baa43de575..26fc979b6b24c6835e4e13122c9c75d9b37f984b 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -180,8 +180,8 @@ out_unlock: up(&lli->lli_size_sem); } /* ll_truncate */ -int ll_prepare_write(struct file *file, struct page *page, unsigned from, - unsigned to) +int ll_prepare_write(struct file 
*file, struct page *page, + unsigned from, unsigned to) { struct inode *inode = page->mapping->host; struct ll_inode_info *lli = ll_i2info(inode); @@ -209,11 +209,22 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from, oa->o_id = lsm->lsm_object_id; oa->o_gr = lsm->lsm_object_gr; oa->o_mode = inode->i_mode; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE | OBD_MD_FLGROUP; - rc = obd_brw(OBD_BRW_CHECK, ll_i2dtexp(inode), oa, lsm, - 1, &pga, NULL); + /* + * needed for quota to create OSS object on write with correct + * owner/group. + */ + oa->o_uid = inode->i_uid; + oa->o_valid |= OBD_MD_FLUID; + + oa->o_gid = inode->i_gid; + oa->o_valid |= OBD_MD_FLGID; + + rc = obd_brw(OBD_BRW_CHECK, ll_i2dtexp(inode), + oa, lsm, 1, &pga, NULL); if (rc) GOTO(out_free_oa, rc); @@ -317,9 +328,13 @@ static int ll_ap_refresh_count(void *data, int cmd) lli = ll_i2info(page->mapping->host); lsm = lli->lli_smd; - down(&lli->lli_size_sem); + /* + * this callback is called with client lock taken, thus, it should not + * sleep or deadlock is possible. 
--umka + */ +// down(&lli->lli_size_sem); kms = lov_merge_size(lsm, 1); - up(&lli->lli_size_sem); +// up(&lli->lli_size_sem); /* catch race with truncate */ if (((__u64)page->index << PAGE_SHIFT) >= kms) diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index ea1358dbc6cfa0bcba685c7c6c4c081403a9b820..ac36396cd73ed4729931e93551b19ff70419ee35 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -1201,7 +1201,7 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_obj *obj; - int rc; + int rc, mds; ENTRY; rc = lmv_check_connect(obd); @@ -1210,25 +1210,31 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data, if (data->namelen != 0) { /* usual link request */ - obj = lmv_grab_obj(obd, &data->id1); + obj = lmv_grab_obj(obd, &data->id2); if (obj) { rc = raw_name2idx(obj->hashtype, obj->objcount, data->name, data->namelen); - data->id1 = obj->objs[rc].id; + data->id2 = obj->objs[rc].id; lmv_put_obj(obj); } + + mds = id_group(&data->id2); CDEBUG(D_OTHER,"link "DLID4":%*s to "DLID4"\n", OLID4(&data->id2), data->namelen, data->name, OLID4(&data->id1)); } else { + mds = id_group(&data->id1); + /* request from MDS to acquire i_links for inode by id1 */ CDEBUG(D_OTHER, "inc i_nlinks for "DLID4"\n", OLID4(&data->id1)); } - - rc = md_link(lmv->tgts[id_group(&data->id1)].ltd_exp, - data, request); + + CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", + mds, OLID4(&data->id1)); + rc = md_link(lmv->tgts[mds].ltd_exp, data, request); + RETURN(rc); } diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 8b3c13ac814919aaa202e6fb0d836469198df902..ae74d2262fb2bae9cf4d3fa7ac19d935408cdaae 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -656,7 +656,8 @@ out: #define log2(n) ffz(~(n)) #endif -static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa, +static int lov_clear_orphans(struct obd_export *export, + struct obdo 
*src_oa, struct lov_stripe_md **ea, struct obd_trans_info *oti) { @@ -682,12 +683,14 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa, } for (i = 0; i < lov->desc.ld_tgt_count; i++) { + int err; struct lov_stripe_md obj_md; struct lov_stripe_md *obj_mdp = &obj_md; - int err; - /* if called for a specific target, we don't - care if it is not active. */ + /* + * if called for a specific target, we don't care if it is not + * active. + */ if (lov->tgts[i].active == 0 && ost_uuid == NULL) { CDEBUG(D_HA, "lov idx %d inactive\n", i); continue; @@ -696,16 +699,25 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa, if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid)) continue; + /* + * setting up objid OSS objects should be destroyed starting + * from it. + */ memcpy(tmp_oa, src_oa, sizeof(*tmp_oa)); + tmp_oa->o_valid |= OBD_MD_FLID; + tmp_oa->o_id = oti->oti_objid[i]; /* XXX: LOV STACKING: use real "obj_mdp" sub-data */ err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, NULL, 0, &obj_mdp, oti); - if (err) - /* This export will be disabled until it is recovered, - and then orphan recovery will be completed. */ + if (err) { + /* + * this export will be disabled until it is recovered, + * and then orphan recovery will be completed. 
+ */ CERROR("error in orphan recovery on OST idx %d/%d: " "rc = %d\n", i, lov->desc.ld_tgt_count, err); + } if (ost_uuid) break; @@ -714,51 +726,11 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa, RETURN(rc); } -static int lov_recreate(struct obd_export *exp, struct obdo *src_oa, - void *acl, int acl_size, - struct lov_stripe_md **ea, struct obd_trans_info *oti) -{ - struct lov_stripe_md *obj_mdp, *lsm; - struct lov_obd *lov = &exp->exp_obd->u.lov; - unsigned ost_idx; - int rc, i; - ENTRY; - - LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS && - src_oa->o_flags & OBD_FL_RECREATE_OBJS); - - OBD_ALLOC(obj_mdp, sizeof(*obj_mdp)); - if (obj_mdp == NULL) - RETURN(-ENOMEM); - - ost_idx = src_oa->o_nlink; - lsm = *ea; - if (lsm == NULL) - GOTO(out, rc = -EINVAL); - if (ost_idx >= lov->desc.ld_tgt_count) - GOTO(out, rc = -EINVAL); - - for (i = 0; i < lsm->lsm_stripe_count; i++) { - if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) { - if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id) - GOTO(out, rc = -EINVAL); - break; - } - } - if (i == lsm->lsm_stripe_count) - GOTO(out, rc = -EINVAL); - - rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, acl, acl_size, - &obj_mdp, oti); -out: - OBD_FREE(obj_mdp, sizeof(*obj_mdp)); - RETURN(rc); -} - /* the LOV expects oa->o_id to be set to the LOV object id */ -static int lov_create(struct obd_export *exp, struct obdo *src_oa, - void *acl, int acl_size, - struct lov_stripe_md **ea, struct obd_trans_info *oti) +static int +lov_create(struct obd_export *exp, struct obdo *src_oa, + void *acl, int acl_size, struct lov_stripe_md **ea, + struct obd_trans_info *oti) { struct lov_request_set *set = NULL; struct list_head *pos; @@ -780,13 +752,9 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa, if (!lov->desc.ld_active_tgt_count) RETURN(-EIO); - /* Recreate a specific object id at the given OST index */ - if ((src_oa->o_valid & OBD_MD_FLFLAGS) && - (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) { - rc = 
lov_recreate(exp, src_oa, acl, acl_size, ea, oti); - RETURN(rc); - } - + LASSERT(oti->oti_flags & OBD_MODE_CROW); + + /* main creation loop */ rc = lov_prep_create_set(exp, ea, src_oa, oti, &set); if (rc) RETURN(rc); @@ -795,9 +763,21 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa, struct lov_request *req = list_entry(pos, struct lov_request, rq_link); - /* XXX: LOV STACKING: use real "obj_mdp" sub-data */ - rc = obd_create(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa, - acl, acl_size, &req->rq_md, oti); + obd_id *objids = oti->oti_objid; + + if (oti->oti_obj_alloc) { + __u64 next_id; + + /* + * allocating new objid. Here it is delegated to caller, + * that is MDS in CROW case. + */ + next_id = oti->oti_obj_alloc(&objids[req->rq_idx]); + req->rq_oa->o_id = next_id; + } else { + /* and here is default "allocator" */ + req->rq_oa->o_id = ++objids[req->rq_idx]; + } lov_update_create_set(set, req, rc); } rc = lov_fini_create_set(set, ea); @@ -996,12 +976,6 @@ static int lov_setattr(struct obd_export *exp, struct obdo *src_oa, if (!exp || !exp->exp_obd) RETURN(-ENODEV); - /* for now, we only expect time updates here */ - LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE| - OBD_MD_FLATIME | OBD_MD_FLMTIME | - OBD_MD_FLCTIME | OBD_MD_FLFLAGS | - OBD_MD_FLSIZE | OBD_MD_FLGROUP))); - LASSERT(!(src_oa->o_valid & OBD_MD_FLGROUP) || src_oa->o_gr > 0); lov = &exp->exp_obd->u.lov; @@ -2081,21 +2055,6 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen, #define KEY_IS(str) \ (keylen == strlen(str) && memcmp(key, str, keylen) == 0) - if (KEY_IS("next_id")) { - if (vallen != lov->desc.ld_tgt_count) - RETURN(-EINVAL); - for (i = 0; i < lov->desc.ld_tgt_count; i++) { - /* initialize all OSCs, even inactive ones */ - if (obd_uuid_empty(&lov->tgts[i].uuid)) - continue; - err = obd_set_info(lov->tgts[i].ltd_exp, - keylen, key, sizeof(obd_id), - ((obd_id*)val) + i); - if (!rc) - rc = err; - } - RETURN(rc); - } if (KEY_IS("async")) 
{ struct lov_desc *desc = &lov->desc; struct lov_tgt_desc *tgts = lov->tgts; @@ -2131,10 +2090,7 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen, RETURN(rc); } - if (KEY_IS("growth_count")) { - if (vallen != sizeof(int)) - RETURN(-EINVAL); - } else if (KEY_IS("mds_conn")) { + if (KEY_IS("mds_conn")) { if (vallen != sizeof(__u32)) RETURN(-EINVAL); } else if (KEY_IS("unlinked") || KEY_IS("unrecovery")) { diff --git a/lustre/lov/lov_qos.c b/lustre/lov/lov_qos.c index b8ac8fed7ff95f83ad60fe9106424147d7caf163..64cfa27566ce1432578bb48b6295016dbc4b729c 100644 --- a/lustre/lov/lov_qos.c +++ b/lustre/lov/lov_qos.c @@ -182,6 +182,3 @@ int qos_prep_create(struct lov_obd *lov, struct lov_request_set *set, int newea) out: RETURN(rc); } - - - diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index 9df75b657e2f24e0189dbcd85d7002cfb7983845..de66527db471809cd8bbfb7e97154fa1acaf3eba 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -591,10 +591,10 @@ int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea) int lov_update_create_set(struct lov_request_set *set, struct lov_request *req, int rc) { + struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; struct obd_trans_info *oti = set->set_oti; struct lov_stripe_md *lsm = set->set_md; struct lov_oinfo *loi; - struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; ENTRY; req->rq_stripe = set->set_success; @@ -658,7 +658,7 @@ int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea, /* If the MDS file was truncated up to some size, stripe over * enough OSTs to allow the file to be created at that size. 
*/ if (src_oa->o_valid & OBD_MD_FLSIZE) { - stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1; + stripes = ((src_oa->o_size + LUSTRE_STRIPE_MAXBYTES) >> 12) - 1; do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12)); if (stripes > lov->desc.ld_active_tgt_count) diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 72ccf1cba2c480598caf2dcb5a0498363ff255da..f6ab6c1846e2f3d5c8dd57812bddd8b96efad0de 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -344,10 +344,19 @@ struct dentry *mds_id2dentry(struct obd_device *obd, struct lustre_id *id, if (inode->i_ino != id_ino(&mds->mds_rootid) && generation && inode->i_generation != generation) { /* we didn't find the right inode.. */ - CERROR("bad inode %lu, link: %lu, ct: %d, generation %u/%u\n", - inode->i_ino, (unsigned long)inode->i_nlink, - atomic_read(&inode->i_count), inode->i_generation, - generation); + if (id_group(id) != mds->mds_num) { + CERROR("bad inode %lu found, link: %lu, ct: %d, generation " + "%u != %u, mds %u != %u, request to wrong MDS?\n", + inode->i_ino, (unsigned long)inode->i_nlink, + atomic_read(&inode->i_count), inode->i_generation, + generation, mds->mds_num, (unsigned)id_group(id)); + } else { + CERROR("bad inode %lu found, link: %lu, ct: %d, generation " + "%u != %u, inode is recreated while request handled?\n", + inode->i_ino, (unsigned long)inode->i_nlink, + atomic_read(&inode->i_count), inode->i_generation, + generation); + } dput(result); RETURN(ERR_PTR(-ENOENT)); } @@ -868,16 +877,14 @@ int mds_get_md(struct obd_device *obd, struct inode *inode, RETURN(rc); } - /* Call with lock=1 if you want mds_pack_md to take the i_sem. * Call with lock=0 if the caller has already taken the i_sem. 
*/ int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset, struct mds_body *body, struct inode *inode, int lock, int mea) { struct mds_obd *mds = &obd->u.mds; + int rc, lmm_size; void *lmm; - int lmm_size; - int rc; ENTRY; lmm = lustre_msg_buf(msg, offset, 0); @@ -902,11 +909,9 @@ int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset, rc = mds_get_md(obd, inode, lmm, &lmm_size, lock, mea); if (rc > 0) { - if (S_ISDIR(inode->i_mode)) - body->valid |= OBD_MD_FLDIREA; - else - body->valid |= OBD_MD_FLEASIZE; - + body->valid |= S_ISDIR(inode->i_mode) ? + OBD_MD_FLDIREA : OBD_MD_FLEASIZE; + if (mea) body->valid |= OBD_MD_MEA; @@ -916,6 +921,7 @@ int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset, RETURN(rc); } + int mds_pack_link(struct dentry *dentry, struct ptlrpc_request *req, struct mds_body *repbody, int reply_off) { @@ -3264,6 +3270,7 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf) /* we have to know mdsnum before touching underlying fs -bzzz */ atomic_set(&mds->mds_open_count, 0); sema_init(&mds->mds_md_sem, 1); + sema_init(&mds->mds_create_sem, 1); mds->mds_md_connected = 0; mds->mds_md_name = NULL; @@ -3478,13 +3485,6 @@ int mds_postrecov_common(struct obd_device *obd) ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT); LASSERT(ctxt != NULL); - /* set nextid first, so we are sure it happens */ - rc = mds_dt_set_nextid(obd); - if (rc) { - CERROR("%s: mds_dt_set_nextid() failed\n", obd->obd_name); - GOTO(out, rc); - } - /* clean PENDING dir */ rc = mds_cleanup_orphans(obd); if (rc < 0) @@ -3493,8 +3493,8 @@ int mds_postrecov_common(struct obd_device *obd) group = FILTER_GROUP_FIRST_MDS + mds->mds_num; valsize = sizeof(group); - rc = obd_set_info(mds->mds_dt_exp, strlen("mds_conn"), "mds_conn", - valsize, &group); + rc = obd_set_info(mds->mds_dt_exp, strlen("mds_conn"), + "mds_conn", valsize, &group); if (rc) GOTO(out, rc); @@ -3507,7 +3507,7 @@ int 
mds_postrecov_common(struct obd_device *obd) } /* remove the orphaned precreated objects */ - rc = mds_dt_clearorphans(mds, NULL /* all OSTs */); + rc = mds_dt_clear_orphans(mds, NULL /* all OSTs */); if (rc) GOTO(err_llog, rc); diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index 0e6dd0f2b1cc2fac62030cb56660d7e916e89541..796b70adbedfaf71266af053b6daf7ae77744648 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -140,10 +140,8 @@ int mds_dt_set_info(struct obd_export *exp, obd_count keylen, void *key, obd_count vallen, void *val); int mds_get_lovtgts(struct obd_device *, int tgt_count, struct obd_uuid *); int mds_dt_write_objids(struct obd_device *obd); -void mds_dt_update_objids(struct obd_device *obd, obd_id *ids); int mds_dt_set_growth(struct mds_obd *mds, int count); -int mds_dt_set_nextid(struct obd_device *obd); -int mds_dt_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid); +int mds_dt_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid); int mds_post_mds_lovconf(struct obd_device *obd); int mds_notify(struct obd_device *obd, struct obd_device *watched, int active, void *data); @@ -152,10 +150,17 @@ int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode, struct lov_mds_md *lmm, int lmm_size); int mds_revalidate_lov_ea(struct obd_device *obd, struct inode *inode, struct lustre_msg *msg, int offset); +void mds_dt_update_objids(struct obd_device *obd, obd_id *ids); +void mds_dt_save_objids(struct obd_device *obd, obd_id *ids); /* mds/mds_open.c */ -int mds_destroy_objects(struct obd_device *obd, - struct inode *inode, int async); +int +mds_create_object(struct obd_device *obd, struct ptlrpc_request *req, + int offset, struct mds_update_record *rec, + struct dentry *dchild, void **handle, + obd_id *ids); +int mds_destroy_object(struct obd_device *obd, + struct inode *inode, int async); int mds_query_write_access(struct inode *inode); int mds_open(struct mds_update_record *rec, int offset, 
struct ptlrpc_request *req, struct lustre_handle *); diff --git a/lustre/mds/mds_lmv.c b/lustre/mds/mds_lmv.c index ce81bc0d59077937e4bcfb0f5e246ebb45d1b13d..a660e41e28e4b94458ba13b56224549739fcbb9b 100644 --- a/lustre/mds/mds_lmv.c +++ b/lustre/mds/mds_lmv.c @@ -781,10 +781,6 @@ static int filter_start_page_write(struct inode *inode, return 0; } -struct dentry *filter_id2dentry(struct obd_device *obd, - struct dentry *dir_dentry, - obd_gr group, obd_id id); - int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb, diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index 5d13d0f8256a71f8393845ae2d63f88037bb2063..ced694c6fcd25a3263ceaee0b6f4f6bec23802d6 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -55,6 +55,19 @@ void cpu_to_le_lov_desc (struct lov_desc *ld) ld->ld_pattern = cpu_to_le32 (ld->ld_pattern); } +void mds_dt_save_objids(struct obd_device *obd, obd_id *ids) +{ + struct mds_obd *mds = &obd->u.mds; + int i; + ENTRY; + + spin_lock(&mds->mds_dt_lock); + for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++) + ids[i] = mds->mds_dt_objids[i]; + spin_unlock(&mds->mds_dt_lock); + EXIT; +} + void mds_dt_update_objids(struct obd_device *obd, obd_id *ids) { struct mds_obd *mds = &obd->u.mds; @@ -63,8 +76,8 @@ void mds_dt_update_objids(struct obd_device *obd, obd_id *ids) spin_lock(&mds->mds_dt_lock); for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++) - if (ids[i] > (mds->mds_dt_objids)[i]) - (mds->mds_dt_objids)[i] = ids[i]; + if (ids[i] > mds->mds_dt_objids[i]) + mds->mds_dt_objids[i] = ids[i]; spin_unlock(&mds->mds_dt_lock); EXIT; } @@ -72,14 +85,15 @@ void mds_dt_update_objids(struct obd_device *obd, obd_id *ids) static int mds_dt_read_objids(struct obd_device *obd) { struct mds_obd *mds = &obd->u.mds; - obd_id *ids; + int i, rc, size; loff_t off = 0; - int i, rc, size = mds->mds_dt_desc.ld_tgt_count * sizeof(*ids); + obd_id *ids; ENTRY; if 
(mds->mds_dt_objids != NULL) RETURN(0); + size = mds->mds_dt_desc.ld_tgt_count * sizeof(*ids); OBD_ALLOC(ids, size); if (ids == NULL) RETURN(-ENOMEM); @@ -87,17 +101,19 @@ static int mds_dt_read_objids(struct obd_device *obd) if (mds->mds_dt_objid_filp->f_dentry->d_inode->i_size == 0) RETURN(0); + rc = fsfilt_read_record(obd, mds->mds_dt_objid_filp, ids, size, &off); if (rc < 0) { - CERROR("Error reading objids %d\n", rc); + CERROR("error reading objids %d\n", rc); } else { mds->mds_dt_objids_valid = 1; rc = 0; } - for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++) - CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n", - mds->mds_dt_objids[i], i); + for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++) { + CDEBUG(D_INFO, "read last object "LPU64 + " for idx %d\n", mds->mds_dt_objids[i], i); + } RETURN(rc); } @@ -105,25 +121,26 @@ static int mds_dt_read_objids(struct obd_device *obd) int mds_dt_write_objids(struct obd_device *obd) { struct mds_obd *mds = &obd->u.mds; + int i, rc, size; loff_t off = 0; - int i, rc, size = mds->mds_dt_desc.ld_tgt_count * sizeof(obd_id); ENTRY; for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++) CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n", mds->mds_dt_objids[i], i); + size = mds->mds_dt_desc.ld_tgt_count * sizeof(obd_id); rc = fsfilt_write_record(obd, mds->mds_dt_objid_filp, mds->mds_dt_objids, size, &off, 0); RETURN(rc); } -int mds_dt_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid) +int mds_dt_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid) { - int rc; + struct lov_stripe_md *empty_ea = NULL; + struct obd_trans_info oti = { 0 }; struct obdo *oa = NULL; - struct obd_trans_info oti = {0}; - struct lov_stripe_md *empty_ea = NULL; + int rc; ENTRY; LASSERT(mds->mds_dt_objids != NULL); @@ -138,32 +155,27 @@ int mds_dt_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid) RETURN(-ENOMEM); memset(oa, 0, sizeof(*oa)); + oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num; oa->o_valid = 
OBD_MD_FLFLAGS | OBD_MD_FLGROUP; oa->o_flags = OBD_FL_DELORPHAN; if (ost_uuid != NULL) { - memcpy(&oa->o_inline, ost_uuid, sizeof(*ost_uuid)); + memcpy(&oa->o_inline, ost_uuid, + sizeof(*ost_uuid)); oa->o_valid |= OBD_MD_FLINLINE; } - rc = obd_create(mds->mds_dt_exp, oa, NULL, 0, &empty_ea, &oti); - obdo_free(oa); - RETURN(rc); -} -/* update the LOV-OSC knowledge of the last used object id's */ -int mds_dt_set_nextid(struct obd_device *obd) -{ - struct mds_obd *mds = &obd->u.mds; - int rc; - ENTRY; - - LASSERT(!obd->obd_recovering); - - LASSERT(mds->mds_dt_objids != NULL); - - rc = obd_set_info(mds->mds_dt_exp, strlen("next_id"), "next_id", - mds->mds_dt_desc.ld_tgt_count, mds->mds_dt_objids); + /* + * passing current objids for letting data layer know last objids MDS + * knows about and do appropriate. --umka + */ + oti.oti_objid = mds->mds_dt_objids; + + rc = obd_create(mds->mds_dt_exp, oa, + NULL, 0, &empty_ea, &oti); + + obdo_free(oa); RETURN(rc); } @@ -226,8 +238,8 @@ static int mds_dt_update_desc(struct obd_device *obd, struct obd_export *lov) int mds_dt_connect(struct obd_device *obd, char *lov_name) { struct mds_obd *mds = &obd->u.mds; - struct lustre_handle conn = {0,}; - int rc, i; + struct lustre_handle conn = { 0 }; + int i, rc = 0; ENTRY; if (IS_ERR(mds->mds_dt_obd)) @@ -247,7 +259,8 @@ int mds_dt_connect(struct obd_device *obd, char *lov_name) if (mds->mds_ost_sec) { rc = obd_set_info(mds->mds_dt_obd->obd_self_export, strlen("sec"), "sec", - strlen(mds->mds_ost_sec), mds->mds_ost_sec); + strlen(mds->mds_ost_sec), + mds->mds_ost_sec); if (rc) { mds->mds_dt_obd = ERR_PTR(rc); RETURN(rc); @@ -290,10 +303,13 @@ int mds_dt_connect(struct obd_device *obd, char *lov_name) GOTO(err_reg, rc); } - /* If we're mounting this code for the first time on an existing FS, - * we need to populate the objids array from the real OST values */ + /* + * If we're mounting this code for the first time on an existing FS, we + * need to populate the objids array from the real 
OST values. + */ if (!mds->mds_dt_objids_valid) { __u32 size = sizeof(obd_id) * mds->mds_dt_desc.ld_tgt_count; + rc = obd_get_info(mds->mds_dt_exp, strlen("last_id"), "last_id", &size, mds->mds_dt_objids); if (!rc) { @@ -307,12 +323,12 @@ int mds_dt_connect(struct obd_device *obd, char *lov_name) "writing objids file: %d\n", rc); } } - - /* I want to see a callback happen when the OBD moves to a - * "For General Use" state, and that's when we'll call - * set_nextid(). The class driver can help us here, because - * it can use the obd_recovering flag to determine when the - * the OBD is full available. */ + /* + * I want to see a callback happen when the OBD moves to a "For General + * Use" state, and that's when we'll call set_nextid(). The class driver + * can help us here, because it can use the obd_recovering flag to + * determine when the the OBD is full available. + */ if (!obd->obd_recovering) { CDEBUG(D_OTHER, "call mds_postrecov_common()\n"); rc = mds_postrecov_common(obd); @@ -325,8 +341,8 @@ err_reg: obd_register_observer(mds->mds_dt_obd, NULL); err_discon: obd_disconnect(mds->mds_dt_exp, 0); - mds->mds_dt_exp = NULL; mds->mds_dt_obd = ERR_PTR(rc); + mds->mds_dt_exp = NULL; return rc; } @@ -665,9 +681,9 @@ int mds_dt_synchronize(void *data) CWARN("MDS %s: %s now active, resetting orphans\n", obd->obd_name, uuid->uuid); - rc = mds_dt_clearorphans(&obd->u.mds, uuid); + rc = mds_dt_clear_orphans(&obd->u.mds, uuid); if (rc != 0) { - CERROR("%s: failed at mds_dt_clearorphans(): %d\n", + CERROR("%s: failed at mds_dt_clear_orphans(): %d\n", obd->obd_name, rc); GOTO(cleanup, rc); } diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index fac6e9312a2c8ad0d82d20150d62b293b0b01e39..2f45a66a8290d902c441cd641ba30bd9b7c23680 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -247,35 +247,34 @@ static struct mds_file_data *mds_dentry_open(struct dentry *dentry, struct mds_obd *mds = mds_req2mds(req); struct mds_file_data *mfd; struct mds_body 
*body; - int error; + int rc = 0; ENTRY; mfd = mds_mfd_new(); if (mfd == NULL) { CERROR("mds: out of memory\n"); - GOTO(cleanup_dentry, error = -ENOMEM); + GOTO(cleanup_dentry, rc = -ENOMEM); } body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body)); if (flags & FMODE_WRITE) { /* FIXME: in recovery, need to pass old epoch here */ - error = mds_get_write_access(mds, dentry->d_inode, 0); - if (error) - GOTO(cleanup_mfd, error); + rc = mds_get_write_access(mds, dentry->d_inode, 0); + if (rc) + GOTO(cleanup_mfd, rc); #ifdef IFILTERDATA_ACTUALLY_USED body->io_epoch = MDS_FILTERDATA(dentry->d_inode)->io_epoch; -#endif /*IFILTERDATA_ACTUALLY_USED*/ +#endif } else if (flags & FMODE_EXEC) { - error = mds_deny_write_access(mds, dentry->d_inode); - if (error) - GOTO(cleanup_mfd, error); + rc = mds_deny_write_access(mds, dentry->d_inode); + if (rc) + GOTO(cleanup_mfd, rc); } dget(dentry); - /* Mark the file as open to handle open-unlink. */ - + /* mark the file as open to handle open-unlink. */ DOWN_WRITE_I_ALLOC_SEM(dentry->d_inode); mds_orphan_open_inc(dentry->d_inode); UP_WRITE_I_ALLOC_SEM(dentry->d_inode); @@ -290,41 +289,50 @@ static struct mds_file_data *mds_dentry_open(struct dentry *dentry, mds_mfd_put(mfd); body->handle.cookie = mfd->mfd_handle.h_cookie; - RETURN(mfd); - cleanup_mfd: mds_mfd_put(mfd); mds_mfd_destroy(mfd); cleanup_dentry: - return ERR_PTR(error); + return ERR_PTR(rc); } -static void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm, - struct lov_desc *desc) +/* this is object id allocation callback */ +static int mds_obj_alloc(obd_id *objid) +{ + ENTRY; + LASSERT(objid != NULL); + RETURN(++(*objid)); +} + +static inline void +mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm, + struct lov_desc *desc) { int i; + for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) { ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] = le64_to_cpu(lmm->lmm_objects[i].l_object_id); } } -/* Must be called with i_sem held */ -static int 
mds_create_objects(struct ptlrpc_request *req, int offset, - struct mds_update_record *rec, - struct mds_obd *mds, struct obd_device *obd, - struct dentry *dchild, void **handle, - obd_id **ids) +/* must be called with i_sem held */ +int +mds_create_object(struct obd_device *obd, struct ptlrpc_request *req, + int offset, struct mds_update_record *rec, + struct dentry *dchild, void **handle, + obd_id *ids) { - struct obdo *oa = NULL; + struct inode *inode = dchild->d_inode; + struct mds_obd *mds = &obd->u.mds; struct obd_trans_info oti = { 0 }; - struct mds_body *body; struct lov_stripe_md *lsm = NULL; struct lov_mds_md *lmm = NULL; - struct inode *inode = dchild->d_inode; - void *lmm_buf; int rc, lmm_bufsize, lmm_size; + struct obdo *oa = NULL; + struct mds_body *body; + void *lmm_buf; ENTRY; if (rec->ur_flags & MDS_OPEN_DELAY_CREATE || @@ -338,11 +346,8 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, if (body->valid & OBD_MD_FLEASIZE) RETURN(0); - OBD_ALLOC(*ids, mds->mds_dt_desc.ld_tgt_count * sizeof(**ids)); - if (*ids == NULL) - RETURN(-ENOMEM); - oti.oti_objid = *ids; - + oti.oti_objid = ids; + /* replay case */ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { LASSERT(id_ino(rec->ur_id2)); @@ -352,19 +357,25 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, LASSERT(lmm); if (*handle == NULL) - *handle = fsfilt_start(obd,inode,FSFILT_OP_CREATE,NULL); + *handle = fsfilt_start(obd, inode, FSFILT_OP_CREATE, NULL); if (IS_ERR(*handle)) { rc = PTR_ERR(*handle); *handle = NULL; - GOTO(out_ids, rc); + RETURN(rc); } - mds_objids_from_lmm(*ids, lmm, &mds->mds_dt_desc); + /* + * FIXME: this is evil layering violation, all things related to + * stripping should be done by LOV. --umka. 
+ */ + mds_objids_from_lmm(ids, lmm, &mds->mds_dt_desc); lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0); lmm_bufsize = req->rq_repmsg->buflens[offset]; - LASSERT(lmm_buf); + + LASSERT(lmm_buf != NULL); LASSERT(lmm_bufsize >= lmm_size); + memcpy(lmm_buf, lmm, lmm_size); rc = fsfilt_set_md(obd, inode, *handle, lmm, lmm_size, EA_LOV); @@ -374,11 +385,11 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, } if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_ALLOC_OBDO)) - GOTO(out_ids, rc = -ENOMEM); + RETURN(-ENOMEM); oa = obdo_alloc(); if (oa == NULL) - GOTO(out_ids, rc = -ENOMEM); + RETURN(-ENOMEM); oa->o_mode = S_IFREG | 0600; oa->o_id = inode->i_ino; oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num; @@ -389,8 +400,8 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP; oa->o_size = 0; - obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME|OBD_MD_FLMTIME| - OBD_MD_FLCTIME); + obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); if (!(rec->ur_flags & MDS_OPEN_HAS_OBJS)) { /* check if things like lfs setstripe are sending us the ea */ @@ -415,15 +426,25 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, OBD_FREE(lmm, mds->mds_max_mdsize); if (rc) GOTO(out_oa, rc); - } + } + + /* + * create with CROW flag and base ids for allocating new ids on + * them. + */ + oti.oti_flags |= OBD_MODE_CROW; + oti.oti_obj_alloc = mds_obj_alloc; + LASSERT(oa->o_gr >= FILTER_GROUP_FIRST_MDS); rc = obd_create(mds->mds_dt_exp, oa, NULL, 0, &lsm, &oti); + if (rc) { int level = D_ERROR; if (rc == -ENOSPC) level = D_INODE; - CDEBUG(level, "error creating objects for " - "inode %lu: rc = %d\n", + CDEBUG((rc == -ENOSPC ? 
D_INODE : D_ERROR), + "error creating objects for " + "inode %lu: rc = %d\n", inode->i_ino, rc); if (rc > 0) { CERROR("obd_create returned invalid " @@ -435,16 +456,17 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, } else { rc = obd_iocontrol(OBD_IOC_LOV_SETEA, mds->mds_dt_exp, 0, &lsm, rec->ur_eadata); - if (rc) { + if (rc) GOTO(out_oa, rc); - } + lsm->lsm_object_id = oa->o_id; lsm->lsm_object_gr = oa->o_gr; } if (inode->i_size) { oa->o_size = inode->i_size; - obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME| - OBD_MD_FLMTIME| OBD_MD_FLCTIME| OBD_MD_FLSIZE); + obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLSIZE); + rc = obd_setattr(mds->mds_dt_exp, oa, lsm, &oti); if (rc) { CERROR("error setting attrs for inode %lu: rc %d\n", @@ -465,7 +487,11 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, rc = obd_packmd(mds->mds_dt_exp, &lmm, lsm); if (!id_ino(rec->ur_id2)) obd_free_memmd(mds->mds_dt_exp, &lsm); - LASSERT(rc >= 0); + if (rc < 0) { + CERROR("cannot pack lsm, err = %d\n", rc); + GOTO(out_oa, rc); + } + lmm_size = rc; body->eadatasize = rc; @@ -487,22 +513,18 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, memcpy(lmm_buf, lmm, lmm_size); obd_free_diskmd(mds->mds_dt_exp, &lmm); - out_oa: +out_oa: oti_free_cookies(&oti); obdo_free(oa); - out_ids: - if (rc) { - OBD_FREE(*ids, mds->mds_dt_desc.ld_tgt_count * sizeof(**ids)); - *ids = NULL; - } - if(lsm) + + if (lsm) obd_free_memmd(mds->mds_dt_exp, &lsm); RETURN(rc); } int -mds_destroy_objects(struct obd_device *obd, - struct inode *inode, int async) +mds_destroy_object(struct obd_device *obd, + struct inode *inode, int async) { struct mds_obd *mds = &obd->u.mds; struct lov_mds_md *lmm = NULL; @@ -512,8 +534,8 @@ mds_destroy_objects(struct obd_device *obd, LASSERT(inode != NULL); if (inode->i_nlink != 0) { - CWARN("attempt to destroy OSS object when " - "i_nlink == %d\n", 
(int)inode->i_nlink); + CDEBUG(D_INODE, "attempt to destroy OSS object when " + "i_nlink == %d\n", (int)inode->i_nlink); RETURN(0); } @@ -696,10 +718,10 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, struct mds_body *body, int flags, void **handle, struct mds_update_record *rec, struct ldlm_reply *rep) { - struct mds_obd *mds = mds_req2mds(req); struct obd_device *obd = req->rq_export->exp_obd; + struct mds_obd *mds = mds_req2mds(req); struct mds_file_data *mfd = NULL; - obd_id *ids = NULL; /* object IDs created */ + obd_id *ids = NULL; unsigned mode; int rc = 0; ENTRY; @@ -707,6 +729,7 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, /* atomically create objects if necessary */ down(&dchild->d_inode->i_sem); mode = dchild->d_inode->i_mode; + if ((S_ISREG(mode) && !(body->valid & OBD_MD_FLEASIZE)) || (S_ISDIR(mode) && !(body->valid & OBD_MD_FLDIREA))) { rc = mds_pack_md(obd, req->rq_repmsg, 2, body, @@ -716,6 +739,7 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, RETURN(rc); } } + if (rec != NULL) { /* no EA: create objects */ if ((body->valid & OBD_MD_FLEASIZE) && @@ -723,16 +747,49 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, up(&dchild->d_inode->i_sem); RETURN(-EEXIST); } + if (!(body->valid & OBD_MD_FLEASIZE)) { - /* no EA: create objects */ - rc = mds_create_objects(req, 2, rec, mds, obd, - dchild, handle, &ids); + int ids_size = mds->mds_dt_desc.ld_tgt_count * sizeof(*ids); + + OBD_ALLOC(ids, ids_size); + if (ids == NULL) { + up(&dchild->d_inode->i_sem); + RETURN(-ENOMEM); + } + + /* + * synchronizing object creating to prevent another + * threads take the same base objid values. + */ + down(&mds->mds_create_sem); + + /* preparing base ids */ + mds_dt_save_objids(obd, ids); + + /* + * create objects, @ids will contain new allocated obj + * ids. 
+ */ + rc = mds_create_object(obd, req, 2, rec, + dchild, handle, ids); if (rc) { - CERROR("mds_create_objects: rc = %d\n", rc); + CERROR("mds_create_object: rc = %d\n", rc); + up(&mds->mds_create_sem); up(&dchild->d_inode->i_sem); + OBD_FREE(ids, ids_size); RETURN(rc); } + + /* + * update MDS objids by new ones allocated in + * mds_create_object(). + */ + mds_dt_update_objids(obd, ids); + OBD_FREE(ids, ids_size); + + up(&mds->mds_create_sem); } + if (S_ISREG(dchild->d_inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) { rc = mds_revalidate_lov_ea(obd, dchild->d_inode, @@ -746,6 +803,7 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, } } } + rc = mds_pack_acl(obd, req->rq_repmsg, 3, body, dchild->d_inode); if (rc < 0) { CERROR("mds_pack_acl: rc = %d\n", rc); @@ -759,6 +817,7 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME | OBD_MD_FLMTIME); } + up(&dchild->d_inode->i_sem); intent_set_disposition(rep, DISP_OPEN_OPEN); @@ -768,12 +827,6 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild, CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd, mfd->mfd_handle.h_cookie); - if (ids != NULL) { - mds_dt_update_objids(obd, ids); - OBD_FREE(ids, sizeof(*ids) * mds->mds_dt_desc.ld_tgt_count); - } - //if (rc) - // mds_mfd_destroy(mfd); RETURN(rc); } @@ -949,8 +1002,8 @@ int mds_open(struct mds_update_record *rec, int offset, LASSERT(id_ino(rec->ur_id2)); - rc = mds_open_by_id(req, rec->ur_id2, body, rec->ur_flags, - rec, rep); + rc = mds_open_by_id(req, rec->ur_id2, body, + rec->ur_flags, rec, rep); if (rc != -ENOENT) { mds_body_do_reverse_map(med, body); RETURN(rc); @@ -1511,7 +1564,7 @@ int mds_mfd_close(struct ptlrpc_request *req, int offset, reply_body->valid |= OBD_MD_FLCOOKIE; } - rc = mds_destroy_objects(obd, inode, 1); + rc = mds_destroy_object(obd, inode, 1); if (rc) { CERROR("cannot destroy OSS object on close, err 
%d\n", rc); diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 88eef7b98926c08f7d224226455fe51ff458d976..4b769273d1a18a191fc15820276f8d21143404ee 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -499,35 +499,38 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, } else if (rec->ur_iattr.ia_valid & ATTR_EA_RM) { rc = -EOPNOTSUPP; if (inode->i_op && inode->i_op->removexattr) - rc = inode->i_op->removexattr(de, - rec->ur_eadata); - } else if ((S_ISREG(inode->i_mode) || - S_ISDIR(inode->i_mode)) && rec->ur_eadata != NULL) { + rc = inode->i_op->removexattr(de, rec->ur_eadata); + } else if (S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode)) { struct lov_stripe_md *lsm = NULL; struct lov_user_md *lum = NULL; - - rc = ll_permission(inode, MAY_WRITE, NULL); - if (rc < 0) - GOTO(cleanup, rc); - lum = rec->ur_eadata; - /* if lmm_stripe_size is -1 delete default stripe from dir */ - if (S_ISDIR(inode->i_mode) && - lum->lmm_stripe_size == (typeof(lum->lmm_stripe_size))(-1)){ - rc = fsfilt_set_md(obd, inode, handle, NULL, 0, EA_LOV); - if (rc) - GOTO(cleanup, rc); - } else { - rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, mds->mds_dt_exp, - 0, &lsm, rec->ur_eadata); - if (rc) + if (rec->ur_eadata != NULL) { + rc = ll_permission(inode, MAY_WRITE, NULL); + if (rc < 0) GOTO(cleanup, rc); + + lum = rec->ur_eadata; + + /* if lmm_stripe_size is -1 delete default + * stripe from dir */ + if (S_ISDIR(inode->i_mode) && + lum->lmm_stripe_size == (typeof(lum->lmm_stripe_size))(-1)){ + rc = fsfilt_set_md(obd, inode, handle, NULL, 0, EA_LOV); + if (rc) + GOTO(cleanup, rc); + } else { + rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, + mds->mds_dt_exp, 0, + &lsm, rec->ur_eadata); + if (rc) + GOTO(cleanup, rc); - obd_free_memmd(mds->mds_dt_exp, &lsm); - rc = fsfilt_set_md(obd, inode, handle, rec->ur_eadata, - rec->ur_eadatalen, EA_LOV); - if (rc) - GOTO(cleanup, rc); + obd_free_memmd(mds->mds_dt_exp, &lsm); + rc = fsfilt_set_md(obd, inode, handle, 
rec->ur_eadata, + rec->ur_eadatalen, EA_LOV); + if (rc) + GOTO(cleanup, rc); + } } } } @@ -2249,7 +2252,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, body->valid |= OBD_MD_FLCOOKIE; } - rc = mds_destroy_objects(obd, child_inode, 1); + rc = mds_destroy_object(obd, child_inode, 1); if (rc) { CERROR("can't remove OST object, err %d\n", rc); @@ -3507,7 +3510,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, body->valid |= OBD_MD_FLCOOKIE; } - rc = mds_destroy_objects(obd, old_inode, 1); + rc = mds_destroy_object(obd, old_inode, 1); if (rc) { CERROR("can't remove OST object, err %d\n", rc); diff --git a/lustre/mds/mds_unlink_open.c b/lustre/mds/mds_unlink_open.c index b1a171e74e98c47a6198eb41cd6f5d168f99abfb..c48088588ecee405151c2bdaa3b3138cdd6e0342 100644 --- a/lustre/mds/mds_unlink_open.c +++ b/lustre/mds/mds_unlink_open.c @@ -83,8 +83,10 @@ mds_unlink_object(struct mds_obd *mds, struct inode *inode, CDEBUG(D_INODE, "destroy OSS object %d/%d\n", (int)oa->o_id, (int)oa->o_gr); + + if (async) + oti.oti_flags |= OBD_MODE_ASYNC; - oti.oti_async = async; rc = obd_destroy(mds->mds_dt_exp, oa, lsm, &oti); obdo_free(oa); out_free_memmd: diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 1f9aa77e366e4827aab9e2d10a1e73879ed1572c..05c54f867e41eae67db225fca09de304fb0a3655 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1388,6 +1388,7 @@ static int filter_post_fs_cleanup(struct obd_device *obd) RETURN(rc); } + #if 0 static int filter_group_set_fs_flags(struct obd_device *obd, int group) { @@ -1409,6 +1410,7 @@ static int filter_group_set_fs_flags(struct obd_device *obd, int group) RETURN(rc); } #endif + static int filter_post_fs_setup(struct obd_device *obd) { struct filter_obd *filter = &obd->u.filter; @@ -1686,7 +1688,7 @@ static int filter_connect_post(struct obd_export *exp, unsigned initial, char str[PTL_NALFMT_SIZE]; struct obd_llogs *llog; struct llog_ctxt *ctxt; - int 
rc; + int rc = 0; ENTRY; fed = &exp->exp_filter_data; @@ -1701,8 +1703,10 @@ static int filter_connect_post(struct obd_export *exp, unsigned initial, LASSERT(ctxt != NULL); rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse); + portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number, exp->exp_connection->c_peer.peer_id.nid, str); + CDEBUG(D_OTHER, "%s: init llog ctxt for export "LPX64"/%s, group %d\n", obd->obd_name, exp->exp_connection->c_peer.peer_id.nid, str, fed->fed_group); @@ -2045,6 +2049,7 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd, { struct dentry *dchild = NULL; obd_gr group = 0; + ENTRY; if (oa->o_valid & OBD_MD_FLGROUP) group = oa->o_gr; @@ -2058,13 +2063,13 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd, } if (dchild->d_inode == NULL) { - CERROR("%s: %s on non-existent object: "LPU64"\n", - obd->obd_name, what, oa->o_id); + CDEBUG(D_INFO, "%s: %s on non-existent object: " + LPU64"\n", obd->obd_name, what, oa->o_id); f_dput(dchild); RETURN(ERR_PTR(-ENOENT)); } - return dchild; + RETURN(dchild); } static int filter_getattr(struct obd_export *exp, struct obdo *oa, @@ -2094,79 +2099,106 @@ static int filter_getattr(struct obd_export *exp, struct obdo *oa, RETURN(rc); } -/* this is called from filter_truncate() until we have filter_punch() */ -static int filter_setattr(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, struct obd_trans_info *oti) +int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, + struct obdo *oa, struct obd_trans_info *oti) { - struct lvfs_run_ctxt saved; struct filter_obd *filter; - struct dentry *dentry; struct iattr iattr; - struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } }; - struct ldlm_resource *res; void *handle; - int rc, rc2; + int rc, err; ENTRY; - LASSERT(oti != NULL); - - dentry = filter_oa2dentry(exp->exp_obd, oa); - if (IS_ERR(dentry)) - RETURN(PTR_ERR(dentry)); - + LASSERT(dentry != NULL); + LASSERT(!IS_ERR(dentry)); + 
LASSERT(dentry->d_inode != NULL); + filter = &exp->exp_obd->u.filter; - iattr_from_obdo(&iattr, oa, oa->o_valid); - push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - lock_kernel(); - if (iattr.ia_valid & ATTR_SIZE) down(&dentry->d_inode->i_sem); - handle = fsfilt_start(exp->exp_obd, dentry->d_inode, FSFILT_OP_SETATTR, - oti); + handle = fsfilt_start(exp->exp_obd, dentry->d_inode, + FSFILT_OP_SETATTR, oti); if (IS_ERR(handle)) GOTO(out_unlock, rc = PTR_ERR(handle)); /* XXX this could be a rwsem instead, if filter_preprw played along */ if (iattr.ia_valid & ATTR_ATTR_FLAG) - rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL, - EXT3_IOC_SETFLAGS, + rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, + NULL, EXT3_IOC_SETFLAGS, (long)&iattr.ia_attr_flags); else - rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1); + rc = fsfilt_setattr(exp->exp_obd, dentry, handle, + &iattr, 1); + rc = filter_finish_transno(exp, oti, rc); - rc2 = fsfilt_commit(exp->exp_obd, filter->fo_sb, dentry->d_inode, - handle, exp->exp_sync); - if (rc2) { - CERROR("error on commit, err = %d\n", rc2); + + err = fsfilt_commit(exp->exp_obd, filter->fo_sb, + dentry->d_inode, handle, + exp->exp_sync); + if (err) { + CERROR("error on commit, err = %d\n", err); if (!rc) - rc = rc2; + rc = err; } + EXIT; +out_unlock: + if (iattr.ia_valid & ATTR_SIZE) + up(&dentry->d_inode->i_sem); + return rc; +} + +/* this is called from filter_truncate() until we have filter_punch() */ +int filter_setattr(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md *md, struct obd_trans_info *oti) +{ + struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } }; + struct ldlm_valblock_ops *ns_lvbo; + struct lvfs_run_ctxt saved; + struct filter_obd *filter; + struct ldlm_resource *res; + struct dentry *dentry; + int rc; + ENTRY; + + LASSERT(oti != NULL); + + filter = &exp->exp_obd->u.filter; + push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + + /* make sure that object is 
allocated. */ + dentry = filter_crow_object(exp->exp_obd, + oa->o_gr, oa->o_id); + if (IS_ERR(dentry)) + GOTO(out_pop, rc = PTR_ERR(dentry)); + + lock_kernel(); + + /* setting objects attributes (including owner/group) */ + rc = filter_setattr_internal(exp, dentry, oa, oti); + if (rc) + GOTO(out_unlock, rc); res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL, res_id, LDLM_EXTENT, 0); + if (res != NULL) { - if (res->lr_namespace->ns_lvbo && - res->lr_namespace->ns_lvbo->lvbo_update) - rc = res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, - 0, 0); + ns_lvbo = res->lr_namespace->ns_lvbo; + if (ns_lvbo && ns_lvbo->lvbo_update) + rc = ns_lvbo->lvbo_update(res, NULL, 0, 0); ldlm_resource_putref(res); - } else if (iattr.ia_valid & ATTR_SIZE) { - /* called from MDS. */ } - + oa->o_valid = OBD_MD_FLID; obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS); + EXIT; out_unlock: - if (iattr.ia_valid & ATTR_SIZE) - up(&dentry->d_inode->i_sem); unlock_kernel(); - pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - f_dput(dentry); - RETURN(rc); +out_pop: + pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + return rc; } /* XXX identical to osc_unpackmd */ @@ -2219,111 +2251,6 @@ static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, RETURN(lsm_size); } -static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa, - struct filter_obd *filter) -{ - struct obdo *doa = NULL; - __u64 last, id; - ENTRY; - - LASSERT(oa); - LASSERT(oa->o_gr != 0); - LASSERT(oa->o_valid & OBD_MD_FLGROUP); - - doa = obdo_alloc(); - if (doa == NULL) { - CERROR("cannot allocate doa, error %d\n", - -ENOMEM); - EXIT; - return; - } - - doa->o_mode = S_IFREG; - doa->o_gr = oa->o_gr; - doa->o_valid = oa->o_valid & (OBD_MD_FLGROUP | OBD_MD_FLID); - - set_bit(doa->o_gr, &filter->fo_destroys_in_progress); - down(&filter->fo_create_locks[doa->o_gr]); - if (!test_bit(doa->o_gr, &filter->fo_destroys_in_progress)) { - CERROR("%s:["LPU64"] destroy_in_progress 
already cleared\n", - exp->exp_obd->obd_name, doa->o_gr); - up(&filter->fo_create_locks[doa->o_gr]); - GOTO(out_free_doa, 0); - } - - last = filter_last_id(filter, doa->o_gr); - CWARN("%s:["LPU64"] deleting orphan objects from "LPU64" to "LPU64"\n", - exp->exp_obd->obd_name, doa->o_gr, oa->o_id + 1, last); - for (id = oa->o_id + 1; id <= last; id++) { - doa->o_id = id; - filter_destroy(exp, doa, NULL, NULL); - } - - CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = "LPU64"\n", - exp->exp_obd->obd_name, doa->o_gr, oa->o_id); - - filter_set_last_id(filter, doa->o_gr, oa->o_id); - - clear_bit(doa->o_gr, &filter->fo_destroys_in_progress); - up(&filter->fo_create_locks[doa->o_gr]); - - EXIT; -out_free_doa: - obdo_free(doa); -} - -/* returns a negative error or a nonnegative number of files to create */ -static int filter_should_precreate(struct obd_export *exp, struct obdo *oa, - obd_gr group) -{ - struct obd_device *obd = exp->exp_obd; - struct filter_obd *filter = &obd->u.filter; - int diff, rc; - ENTRY; - - diff = oa->o_id - filter_last_id(filter, oa->o_gr); - CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n", - filter_last_id(filter, oa->o_gr), diff); - - /* delete orphans request */ - if ((oa->o_valid & OBD_MD_FLFLAGS) && - (oa->o_flags & OBD_FL_DELORPHAN)) { - if (diff >= 0) - RETURN(diff); - if (-diff > OST_MAX_PRECREATE) { - CERROR("ignoring bogus orphan destroy request: obdid " - LPU64" last_id "LPU64"\n", - oa->o_id, filter_last_id(filter, oa->o_gr)); - RETURN(-EINVAL); - } - filter_destroy_precreated(exp, oa, filter); - rc = filter_update_last_objid(obd, group, 0); - if (rc) - CERROR("unable to write lastobjid, but orphans" - "were deleted\n"); - RETURN(0); - } else { - /* only precreate if group == 0 and o_id is specfied */ - if (!(oa->o_valid & OBD_FL_DELORPHAN) && - (/*group != 0 ||*/ oa->o_id == 0)) - RETURN(1); - - LASSERTF(diff >= 0, LPU64" - "LPU64" = %d\n", oa->o_id, - filter_last_id(filter, oa->o_gr), diff); - RETURN(diff); - } -} 
-static int filter_precreate_rec(struct obd_device *obd, struct dentry *dentry, - int *number, struct obdo *oa) -{ - int rc; - ENTRY; - - rc = fsfilt_precreate_rec(obd, dentry, number, oa); - - RETURN(rc); -} - static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, unsigned long max_age) { @@ -2355,188 +2282,252 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, RETURN(rc); } -/* We rely on the fact that only one thread will be creating files in a given - * group at a time, which is why we don't need an atomic filter_get_new_id. - * Even if we had that atomic function, the following race would exist: - * - * thread 1: gets id x from filter_next_id - * thread 2: gets id (x + 1) from filter_next_id - * thread 2: creates object (x + 1) - * thread 1: tries to create object x, gets -ENOSPC - */ -static int filter_precreate(struct obd_device *obd, struct obdo *oa, - obd_gr group, int *num) +int filter_create_object(struct obd_device *obd, struct obdo *oa, + obd_gr group) { - struct dentry *dchild = NULL, *dparent = NULL; - int err = 0, rc = 0, recreate_obj = 0, i; + struct dentry *dparent = NULL; + struct dentry *dchild = NULL; struct filter_obd *filter; + struct obd_statfs *osfs; + int cleanup_phase = 0; + int err = 0, rc = 0; void *handle = NULL; void *lock = NULL; - struct obd_statfs *osfs; - unsigned long enough_time = jiffies + (obd_timeout * HZ) / 3; - __u64 next_id; ENTRY; filter = &obd->u.filter; - if ((oa->o_valid & OBD_MD_FLFLAGS) && - (oa->o_flags & OBD_FL_RECREATE_OBJS)) { - recreate_obj = 1; - } else { - OBD_ALLOC(osfs, sizeof(*osfs)); - if (osfs == NULL) - RETURN(-ENOMEM); - rc = filter_statfs(obd, osfs, jiffies-HZ); - if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) { - CDEBUG(D_HA, "OST out of space! 
avail "LPU64"\n", - osfs->os_bavail<<filter->fo_sb->s_blocksize_bits); - *num = 0; - rc = -ENOSPC; - } - OBD_FREE(osfs, sizeof(*osfs)); - if (rc) { - RETURN(rc); - } + OBD_ALLOC(osfs, sizeof(*osfs)); + if (osfs == NULL) + RETURN(-ENOMEM); + rc = filter_statfs(obd, osfs, jiffies - HZ); + if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) { + CDEBUG(D_HA, "OST out of space! avail "LPU64"\n", + osfs->os_bavail << filter->fo_sb->s_blocksize_bits); + rc = -ENOSPC; } - - CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num); + OBD_FREE(osfs, sizeof(*osfs)); + if (rc) + RETURN(rc); down(&filter->fo_create_locks[group]); - for (i = 0; i < *num && err == 0; i++) { - int cleanup_phase = 0; + if (test_bit(group, &filter->fo_destroys_in_progress)) { + CWARN("%s: precreate aborted by destroy\n", + obd->obd_name); + GOTO(out, rc = -EALREADY); + } - if (test_bit(group, &filter->fo_destroys_in_progress)) { - CWARN("%s: precreate aborted by destroy\n", - obd->obd_name); - break; - } + CDEBUG(D_INFO, "precreate objid "LPU64"\n", oa->o_id); - if (recreate_obj) { - __u64 last_id; - next_id = oa->o_id; - last_id = filter_last_id(filter, group); - if (next_id > last_id) { - CERROR("Error: Trying to recreate obj greater" - "than last id "LPD64" > "LPD64"\n", - next_id, last_id); - GOTO(cleanup, rc = -EINVAL); - } - } else { - next_id = filter_last_id(filter, group) + 1; - } + dparent = filter_parent_lock(obd, group, oa->o_id, &lock); + if (IS_ERR(dparent)) + GOTO(cleanup, rc = PTR_ERR(dparent)); + cleanup_phase = 1; - CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id); - - dparent = filter_parent_lock(obd, group, next_id, &lock); - if (IS_ERR(dparent)) - GOTO(cleanup, rc = PTR_ERR(dparent)); - cleanup_phase = 1; - - /* precreate objects are not logged */ - fsfilt_set_fs_flags(obd, dparent->d_inode, SM_PRECREATE); - - dchild = filter_id2dentry(obd, dparent, group, next_id); - if (IS_ERR(dchild)) - GOTO(cleanup, rc = PTR_ERR(dchild)); - cleanup_phase = 2; - - if 
(dchild->d_inode != NULL) { - /* This would only happen if lastobjid was bad on disk*/ - /* Could also happen if recreating missing obj but - * already exists - */ - if (recreate_obj) { - CERROR("%s: recreating existing object %.*s?\n", - obd->obd_name, dchild->d_name.len, - dchild->d_name.name); - } else { - CERROR("%s: Serious error: objid %.*s already " - "exists; is this filesystem corrupt?\n", - obd->obd_name, dchild->d_name.len, - dchild->d_name.name); - LBUG(); - } - GOTO(cleanup, rc = -EEXIST); - } + dchild = filter_id2dentry(obd, dparent, group, oa->o_id); + if (IS_ERR(dchild)) + GOTO(cleanup, rc = PTR_ERR(dchild)); + cleanup_phase = 2; + + if (dchild->d_inode != NULL) { + CERROR("%s: serious error: objid %.*s already " + "exists; is this filesystem corrupted?\n", + obd->obd_name, dchild->d_name.len, + dchild->d_name.name); + GOTO(cleanup, rc = -EEXIST); + } + + handle = fsfilt_start_log(obd, dparent->d_inode, + FSFILT_OP_CREATE, NULL, 1); + if (IS_ERR(handle)) + GOTO(cleanup, rc = PTR_ERR(handle)); + cleanup_phase = 3; - handle = fsfilt_start_log(obd, dparent->d_inode, - FSFILT_OP_CREATE, NULL, 1); - if (IS_ERR(handle)) - GOTO(cleanup, rc = PTR_ERR(handle)); - cleanup_phase = 3; + rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL); + if (rc) { + CERROR("create failed rc = %d\n", rc); + GOTO(cleanup, rc); + } + + fsfilt_set_fs_flags(obd, dparent->d_inode, SM_DO_REC); + + if (oa->o_id > filter_last_id(filter, group)) { + /* + * saving last created object id, it will be needed in recovery + * for deleting orphanes. 
+ */ + filter_set_last_id(filter, group, oa->o_id); - rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL); + rc = filter_update_last_objid(obd, group, 0); if (rc) { - CERROR("create failed rc = %d\n", rc); - GOTO(cleanup, rc); + CERROR("unable to write lastobjid, but " + "orphans were deleted, err = %d\n", + rc); + rc = 0; } - - if (!recreate_obj) { - filter_set_last_id(filter, group, next_id); - err = filter_update_last_objid(obd, group, 0); - if (err) - CERROR("unable to write lastobjid " - "but file created\n"); + } +cleanup: + switch(cleanup_phase) { + case 3: + err = fsfilt_commit(obd, filter->fo_sb, + dparent->d_inode, handle, 0); + if (err) { + CERROR("error on commit, err = %d\n", err); + if (!rc) + rc = err; } - fsfilt_set_fs_flags(obd, dparent->d_inode, SM_DO_REC); + case 2: + f_dput(dchild); + case 1: + filter_parent_unlock(dparent, lock); + case 0: + break; + } + + if (rc) + GOTO(out, rc); + +out: + up(&filter->fo_create_locks[group]); + RETURN(rc); +} + +struct dentry *filter_crow_object(struct obd_device *obd, + __u64 ogr, __u64 oid) +{ + struct dentry *dentry; + struct obdo *oa; + int rc = 0; + ENTRY; + + /* check if object is already allocated */ + dentry = filter_id2dentry(obd, NULL, ogr, oid); + if (IS_ERR(dentry)) + RETURN(dentry); + + if (dentry->d_inode) + RETURN(dentry); + + f_dput(dentry); - cleanup: - switch(cleanup_phase) { - case 3: - err = fsfilt_commit(obd, filter->fo_sb, - dparent->d_inode, handle, 0); - if (err) { - CERROR("error on commit, err = %d\n", err); - if (!rc) - rc = err; - } - case 2: - f_dput(dchild); - case 1: - filter_parent_unlock(dparent, lock); - case 0: - break; - } + /* allocate object as it does not exist */ + oa = obdo_alloc(); + if (oa == NULL) + RETURN(ERR_PTR(-ENOMEM)); - if (rc) - break; - if (time_after(jiffies, enough_time)) { - CDEBUG(D_INODE,"%s: precreate slow - want %d got %d \n", - obd->obd_name, *num, i); - break; - } + oa->o_id = oid; + oa->o_gr = ogr; + oa->o_valid = OBD_MD_FLID | 
OBD_MD_FLGROUP; + + CDEBUG(D_INODE, "OSS object "LPU64"/"LPU64 + " does not exists - allocate now\n", + oid, ogr); + + rc = filter_create_object(obd, oa, oa->o_gr); + if (rc) { + CERROR("cannot create OSS object "LPU64"/"LPU64 + ", err = %d\n", oa->o_id, oa->o_gr, rc); + GOTO(out_free_oa, dentry = ERR_PTR(rc)); } - *num = i; + /* lookup for just created object and return it to caller */ + dentry = filter_id2dentry(obd, NULL, ogr, oid); + if (IS_ERR(dentry)) + GOTO(out_free_oa, dentry); + + if (dentry->d_inode == NULL) { + f_dput(dentry); + dentry = ERR_PTR(-ENOENT); + CERROR("cannot find just created OSS object " + LPU64"/"LPU64" err = %d\n", oid, + ogr, (int)PTR_ERR(dentry)); + GOTO(out_free_oa, dentry); + } - /* check if we have an error after ll_vfs_create(). It is possible that - * there will be say -ENOSPC and we will leak it. */ - if (rc == 0) - rc = filter_precreate_rec(obd, dparent, num, oa); + EXIT; +out_free_oa: + obdo_free(oa); + return dentry; +} - up(&filter->fo_create_locks[group]); +static int +filter_clear_orphans(struct obd_export *exp, struct obdo *oa) +{ + struct obd_device *obd = NULL; + struct filter_obd *filter; + struct obdo *doa = NULL; + int rc = 0, orphans; + __u64 last, id; + ENTRY; + + LASSERT(oa); + LASSERT(oa->o_gr != 0); + LASSERT(oa->o_valid & OBD_MD_FLGROUP); + + obd = exp->exp_obd; + filter = &obd->u.filter; - CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n", - obd->obd_name, group, filter->fo_last_objids[group]); + last = filter_last_id(filter, oa->o_gr); + orphans = last - oa->o_id; + + if (orphans <= 0) + RETURN(0); + + doa = obdo_alloc(); + if (doa == NULL) + RETURN(-ENOMEM); - CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n", - obd->obd_name, i); + doa->o_gr = oa->o_gr; + doa->o_mode = S_IFREG; + doa->o_valid = oa->o_valid & (OBD_MD_FLGROUP | OBD_MD_FLID); - RETURN(rc); + set_bit(doa->o_gr, &filter->fo_destroys_in_progress); + down(&filter->fo_create_locks[doa->o_gr]); + if (!test_bit(doa->o_gr, 
&filter->fo_destroys_in_progress)) { + CERROR("%s:["LPU64"] destroy_in_progress already cleared\n", + exp->exp_obd->obd_name, doa->o_gr); + up(&filter->fo_create_locks[doa->o_gr]); + GOTO(out_free_doa, 0); + } + + CWARN("%s:["LPU64"] deleting orphan objects from "LPU64" to " + LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, + oa->o_id + 1, last); + + for (id = oa->o_id + 1; id <= last; id++) { + doa->o_id = id; + filter_destroy(exp, doa, NULL, NULL); + } + + CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = " + LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id); + + filter_set_last_id(filter, oa->o_gr, oa->o_id); + clear_bit(doa->o_gr, &filter->fo_destroys_in_progress); + up(&filter->fo_create_locks[oa->o_gr]); + + EXIT; +out_free_doa: + obdo_free(doa); + return rc; } -static int filter_create(struct obd_export *exp, struct obdo *oa, - void *acl, int acl_size, - struct lov_stripe_md **ea, struct obd_trans_info *oti) +/* + * by now this function is only needed as entry point for deleting orphanes on + * OSS as objects are created on first write attempt. 
--umka + */ +static int +filter_create(struct obd_export *exp, struct obdo *oa, void *acl, + int acl_size, struct lov_stripe_md **ea, + struct obd_trans_info *oti) { + struct filter_export_data *fed; struct obd_device *obd = NULL; - struct filter_obd *filter; + int group = oa->o_gr, rc = 0; struct lvfs_run_ctxt saved; - struct lov_stripe_md *lsm = NULL; - struct filter_export_data *fed; + struct filter_obd *filter; char str[PTL_NALFMT_SIZE]; - int group = oa->o_gr, rc = 0, diff, recreate_objs = 0; ENTRY; LASSERT(acl == NULL && acl_size == 0); @@ -2549,19 +2540,14 @@ static int filter_create(struct obd_export *exp, struct obdo *oa, RETURN(-EINVAL); } - if ((oa->o_valid & OBD_MD_FLFLAGS) && - (oa->o_flags & OBD_FL_RECREATE_OBJS)) - recreate_objs = 1; - obd = exp->exp_obd; fed = &exp->exp_filter_data; filter = &obd->u.filter; - if (fed->fed_group != group && !recreate_objs && - !(oa->o_valid & OBD_MD_REINT)) { + if (fed->fed_group != group) { portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number, exp->exp_connection->c_peer.peer_id.nid, str); - CERROR("!!! This export (nid "LPX64"/%s) used object group %d " + CERROR("!!! this export (nid "LPX64"/%s) used object group %d " "earlier; now it's trying to use group %d! This could " "be a bug in the MDS. 
Tell CFS.\n", exp->exp_connection->c_peer.peer_id.nid, str, @@ -2571,54 +2557,28 @@ static int filter_create(struct obd_export *exp, struct obdo *oa, CDEBUG(D_INFO, "filter_create(od->o_gr=%d,od->o_id="LPU64")\n", group, oa->o_id); - if (ea != NULL) { - lsm = *ea; - if (lsm == NULL) { - rc = obd_alloc_memmd(exp, &lsm); - if (rc < 0) - RETURN(rc); - } - } obd = exp->exp_obd; push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - if (oa->o_valid & OBD_MD_REINT) { - int num = *((int*)oa->o_inline); - rc = filter_precreate(obd, oa, oa->o_gr, &num); - } else if (recreate_objs) { - if (oa->o_id > filter_last_id(&obd->u.filter, group)) { - CERROR("recreate objid "LPU64" > last id "LPU64"\n", - oa->o_id, filter_last_id(&obd->u.filter, group)); - rc = -EINVAL; - } else { - diff = 1; - rc = filter_precreate(obd, oa, group, &diff); - } + LASSERT((oa->o_valid & OBD_MD_FLFLAGS) && + (oa->o_flags == OBD_FL_DELORPHAN)); + + rc = filter_clear_orphans(exp, oa); + if (rc) { + CERROR("cannot clear orphanes starting from " + LPU64", err = %d\n", oa->o_id, rc); } else { - diff = filter_should_precreate(exp, oa, group); - if (diff > 0) { - oa->o_id = filter_last_id(&obd->u.filter, group); - rc = filter_precreate(obd, oa, group, &diff); - oa->o_id = filter_last_id(&obd->u.filter, oa->o_gr); - oa->o_valid = OBD_MD_FLID; + rc = filter_update_last_objid(obd, group, 0); + if (rc) { + CERROR("unable to write lastobjid, but " + "orphans were deleted, err = %d\n", + rc); } } - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - if (rc && ea != NULL && *ea != lsm) { - obd_free_memmd(exp, &lsm); - } else if (rc == 0 && ea != NULL) { - /* XXX LOV STACKING: the lsm that is passed to us from - * LOV does not have valid lsm_oinfo data structs, so - * don't go touching that. This needs to be fixed in a - * big way. 
*/ - lsm->lsm_object_id = oa->o_id; - lsm->lsm_object_gr = oa->o_gr; - *ea = lsm; - } - - RETURN(rc); + + RETURN(0); } static int filter_destroy(struct obd_export *exp, struct obdo *oa, diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 06b802ab6cbcfdbb8797895c6829355dfc4407a7..95583aeeb6ff83d8c0dcb7d4fe94afa78d5dba69 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -111,6 +111,18 @@ int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync); int filter_common_setup(struct obd_device *, obd_count len, void *buf, char *option); +struct dentry *filter_crow_object(struct obd_device *obd, __u64 ogr, + __u64 oid); + +int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, + struct obdo *oa, struct obd_trans_info *oti); + +int filter_setattr(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md *md, struct obd_trans_info *oti); + +int filter_create_object(struct obd_device *obd, struct obdo *oa, + obd_gr group); + /* filter_lvb.c */ extern struct ldlm_valblock_ops filter_lvbo; diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index 21d7464b0367c4a6750fa7541064bddda8d262f4..1c9cd4d2f64db09e3fc9f9290f6dd97508df7583 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -297,16 +297,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, if (rc) GOTO(cleanup, rc); - dentry = filter_oa2dentry(obd, oa); + dentry = filter_id2dentry(obd, NULL, oa->o_gr, oa->o_id); if (IS_ERR(dentry)) GOTO(cleanup, rc = PTR_ERR(dentry)); - if (dentry->d_inode == NULL) { - CERROR("trying to BRW to non-existent file "LPU64"\n", - obj->ioo_id); - GOTO(cleanup, rc = -ENOENT); - } - inode = dentry->d_inode; fsfilt_check_slow(now, obd_timeout, "preprw_read setup"); @@ -318,13 +312,14 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, lnb->len = rnb->len; lnb->flags = 
rnb->flags; - if (inode->i_size <= rnb->offset) - /* If there's no more data, abort early. - * lnb->page == NULL and lnb->rc == 0, so it's - * easy to detect later. */ + if ((inode && inode->i_size <= rnb->offset) || inode == NULL) + /* + * if there's no more data, abort early. lnb->page == * + * NULL and lnb->rc == 0, so it's easy to detect later. + */ break; - else - rc = filter_alloc_dio_page(obd, inode, lnb); + + rc = filter_alloc_dio_page(obd, inode, lnb); if (rc) { CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"LPU64" %u/%u %p: rc %d\n", @@ -345,33 +340,37 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, fsfilt_check_slow(now, obd_timeout, "start_page_read"); - rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp, - NULL, NULL, NULL); - if (rc) - GOTO(cleanup, rc); - - lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes); + if (inode != NULL) { + rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, + exp, NULL, NULL, NULL); + if (rc) + GOTO(cleanup, rc); + } + lprocfs_counter_add(obd->obd_stats, + LPROC_FILTER_READ_BYTES, tot_bytes); filter_tally_read(&exp->exp_obd->u.filter, res, niocount); EXIT; - cleanup: - if (rc != 0) { - filter_free_dio_pages(objcount, obj, niocount, res); - - if (dentry != NULL) - f_dput(dentry); - else - CERROR("NULL dentry in cleanup -- tell CFS\n"); + if (rc) { + filter_free_dio_pages(objcount, obj, + niocount, res); + /* + * in other cases (no errors) dentry is released in + * filter_commitrw_read(). 
+ */ + f_dput(dentry); } if (iobuf != NULL) filter_free_iobuf(iobuf); pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + if (rc) CERROR("io error %d\n", rc); + return rc; } @@ -498,15 +497,17 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, struct niobuf_local *res, struct obd_trans_info *oti) { + int rc = 0, i, tot_bytes = 0, cleanup_phase = 0; + struct niobuf_local *lnb = res; + struct dentry *dentry = NULL; + unsigned long now = jiffies; struct lvfs_run_ctxt saved; struct niobuf_remote *rnb; - struct niobuf_local *lnb = res; struct fsfilt_objinfo fso; - struct dentry *dentry = NULL; - void *iobuf; + struct obd_device *obd; obd_size left; - unsigned long now = jiffies; - int rc = 0, i, tot_bytes = 0, cleanup_phase = 0; + void *iobuf; + ENTRY; LASSERT(objcount == 1); LASSERT(obj->ioo_bufcnt > 0); @@ -518,26 +519,36 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, GOTO(cleanup, rc); cleanup_phase = 1; - push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - dentry = filter_id2dentry(exp->exp_obd, NULL, obj->ioo_gr, - obj->ioo_id); + obd = exp->exp_obd; + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + + /* make sure that object is already allocated */ + dentry = filter_crow_object(obd, obj->ioo_gr, + obj->ioo_id); + if (IS_ERR(dentry)) GOTO(cleanup, rc = PTR_ERR(dentry)); - + cleanup_phase = 2; - - if (dentry->d_inode == NULL) { - CERROR("trying to BRW to non-existent file "LPU64"\n", - obj->ioo_id); - GOTO(cleanup, rc = -ENOENT); - } + /* + * setting attrs passed along with write requests (owner/group). We + * goind it here as object should not exist with wrong owner/group as + * this may break quotas. 
--umka + */ + rc = filter_setattr_internal(exp, dentry, oa, NULL); + if (rc) { + CERROR("cannot set attrs on write, err %d\n", + rc); + GOTO(cleanup, rc); + } + fso.fso_dentry = dentry; fso.fso_bufcnt = obj->ioo_bufcnt; fsfilt_check_slow(now, obd_timeout, "preprw_write setup"); - spin_lock(&exp->exp_obd->obd_osfs_lock); + spin_lock(&obd->obd_osfs_lock); if (oa) filter_grant_incoming(exp, oa); @@ -554,7 +565,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, * o_valid here. */ oa->o_valid = 0; - spin_unlock(&exp->exp_obd->obd_osfs_lock); + spin_unlock(&obd->obd_osfs_lock); if (rc) GOTO(cleanup, rc); @@ -569,7 +580,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, lnb->len = rnb->len; lnb->flags = rnb->flags; - rc = filter_alloc_dio_page(exp->exp_obd, dentry->d_inode,lnb); + rc = filter_alloc_dio_page(obd, dentry->d_inode,lnb); if (rc) { CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset, @@ -586,8 +597,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, * asked to read unmapped blocks -- brw_kiovec() does this. 
*/ if (lnb->len != PAGE_SIZE) { if (lnb->offset + lnb->len < dentry->d_inode->i_size) { - filter_iobuf_add_page(exp->exp_obd, iobuf, - dentry->d_inode, + filter_iobuf_add_page(obd, iobuf, dentry->d_inode, lnb->page); } else { memset(kmap(lnb->page) + lnb->len, 0, @@ -604,7 +614,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, fsfilt_check_slow(now, obd_timeout, "start_page_write"); - lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES, + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES, tot_bytes); EXIT; cleanup: @@ -613,18 +623,18 @@ cleanup: if (rc) filter_free_dio_pages(objcount, obj, niocount, res); case 3: - pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); filter_free_iobuf(iobuf); case 2: - if (rc) + if (rc && dentry && !IS_ERR(dentry)) f_dput(dentry); break; case 1: - spin_lock(&exp->exp_obd->obd_osfs_lock); + spin_lock(&obd->obd_osfs_lock); if (oa) filter_grant_incoming(exp, oa); - spin_unlock(&exp->exp_obd->obd_osfs_lock); - pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + spin_unlock(&obd->obd_osfs_lock); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); filter_free_iobuf(iobuf); break; default:; diff --git a/lustre/obdfilter/filter_lvb.c b/lustre/obdfilter/filter_lvb.c index c54b1d7d54a9718785c1aa53863a6b47b4fb2ebf..500ff0730e62021762e5b5eebe832198a670be90 100644 --- a/lustre/obdfilter/filter_lvb.c +++ b/lustre/obdfilter/filter_lvb.c @@ -39,11 +39,12 @@ /* Called with res->lr_lvb_sem held */ static int filter_lvbo_init(struct ldlm_resource *res) { - int rc = 0; - struct obdo *oa = NULL; struct ost_lvb *lvb = NULL; + struct filter_obd *filter; struct obd_device *obd; struct dentry *dentry; + __u64 ogr, oid; + int rc = 0; ENTRY; LASSERT(res); @@ -65,38 +66,34 @@ static int filter_lvbo_init(struct ldlm_resource *res) res->lr_lvb_len = sizeof(*lvb); obd = res->lr_namespace->ns_lvbp; + filter = &obd->u.filter; LASSERT(obd != NULL); - oa = 
obdo_alloc(); - if (oa == NULL) - GOTO(out, rc = -ENOMEM); - - oa->o_id = res->lr_name.name[0]; - oa->o_gr = res->lr_name.name[2]; - oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; + oid = res->lr_name.name[0]; + ogr = res->lr_name.name[2]; - dentry = filter_oa2dentry(obd, oa); + dentry = filter_id2dentry(obd, NULL, ogr, oid); if (IS_ERR(dentry)) GOTO(out, rc = PTR_ERR(dentry)); - /* Limit the valid bits in the return data to what we actually use */ - oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; - obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS); - f_dput(dentry); - - lvb->lvb_size = dentry->d_inode->i_size; - lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime); - lvb->lvb_blocks = dentry->d_inode->i_blocks; + if (dentry->d_inode == NULL) { + lvb->lvb_size = 0; + lvb->lvb_blocks = 0; + lvb->lvb_mtime = LTIME_S(CURRENT_TIME); + } else { + lvb->lvb_size = dentry->d_inode->i_size; + lvb->lvb_blocks = dentry->d_inode->i_blocks; + lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime); + } CDEBUG(D_DLMTRACE, "res: "LPU64" initial lvb size: "LPU64", " - "mtime: "LPU64", blocks: "LPU64"\n", - res->lr_name.name[0], lvb->lvb_size, - lvb->lvb_mtime, lvb->lvb_blocks); + "mtime: "LPU64", blocks: "LPU64"\n", res->lr_name.name[0], + lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_blocks); - out: - if (oa) - obdo_free(oa); - /* Don't free lvb data on lookup error */ + f_dput(dentry); + EXIT; +out: + /* don't free lvb data on lookup error */ return rc; } @@ -110,11 +107,11 @@ static int filter_lvbo_init(struct ldlm_resource *res) static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m, int buf_idx, int increase) { - int rc = 0; - struct obdo *oa = NULL; struct ost_lvb *lvb = res->lr_lvb_data; struct obd_device *obd; + struct obdo *oa = NULL; struct dentry *dentry; + int rc = 0; ENTRY; LASSERT(res); diff --git a/lustre/osc/lproc_osc.c b/lustre/osc/lproc_osc.c index 4f0035d9173f3bfe6da75ea2072d1bbb30acb30a..cda20af048aa928324ea604cab3c6048b7ba7e9d 100644 --- 
a/lustre/osc/lproc_osc.c +++ b/lustre/osc/lproc_osc.c @@ -166,63 +166,6 @@ int osc_rd_cur_grant_bytes(char *page, char **start, off_t off, int count, return rc; } -int osc_rd_create_count(char *page, char **start, off_t off, int count, - int *eof, void *data) -{ - struct obd_device *obd = data; - - if (obd == NULL) - return 0; - - return snprintf(page, count, "%d\n", - obd->u.cli.cl_oscc.oscc_grow_count); -} - -int osc_wr_create_count(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct obd_device *obd = data; - int val, rc; - - if (obd == NULL) - return 0; - - rc = lprocfs_write_helper(buffer, count, &val); - if (rc) - return rc; - - if (val < 0) - return -ERANGE; - - obd->u.cli.cl_oscc.oscc_grow_count = val; - - return count; -} - -int osc_rd_prealloc_next_id(char *page, char **start, off_t off, int count, - int *eof, void *data) -{ - struct obd_device *obd = data; - - if (obd == NULL) - return 0; - - return snprintf(page, count, LPU64"\n", - obd->u.cli.cl_oscc.oscc_next_id); -} - -int osc_rd_prealloc_last_id(char *page, char **start, off_t off, int count, - int *eof, void *data) -{ - struct obd_device *obd = data; - - if (obd == NULL) - return 0; - - return snprintf(page, count, LPU64"\n", - obd->u.cli.cl_oscc.oscc_last_id); -} - static struct lprocfs_vars lprocfs_obd_vars[] = { { "uuid", lprocfs_rd_uuid, 0, 0 }, { "blocksize", lprocfs_rd_blksize, 0, 0 }, @@ -241,9 +184,6 @@ static struct lprocfs_vars lprocfs_obd_vars[] = { { "max_dirty_mb", osc_rd_max_dirty_mb, osc_wr_max_dirty_mb, 0 }, { "cur_dirty_bytes", osc_rd_cur_dirty_bytes, 0, 0 }, { "cur_grant_bytes", osc_rd_cur_grant_bytes, 0, 0 }, - { "create_count", osc_rd_create_count, osc_wr_create_count, 0 }, - { "prealloc_next_id", osc_rd_prealloc_next_id, 0, 0 }, - { "prealloc_last_id", osc_rd_prealloc_last_id, 0, 0 }, { 0 } }; diff --git a/lustre/osc/osc_create.c b/lustre/osc/osc_create.c index a80cd1adf6f198f1a6cf9b3276ed4eba1f026491..722cfc70b984345480816f835f4b3be973f98649 
100644 --- a/lustre/osc/osc_create.c +++ b/lustre/osc/osc_create.c @@ -56,213 +56,21 @@ #include <linux/obd_class.h> #include "osc_internal.h" -static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc) -{ - struct osc_creator *oscc; - struct ost_body *body = NULL; - ENTRY; - - if (req->rq_repmsg) { - body = lustre_swab_repbuf(req, 0, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL && rc == 0) - rc = -EPROTO; - } - - oscc = req->rq_async_args.pointer_arg[0]; - spin_lock(&oscc->oscc_lock); - oscc->oscc_flags &= ~OSCC_FLAG_CREATING; - if (rc == -ENOSPC || rc == -EROFS) { - oscc->oscc_flags |= OSCC_FLAG_NOSPC; - if (body && rc == -ENOSPC) { - oscc->oscc_grow_count = OST_MIN_PRECREATE; - oscc->oscc_last_id = body->oa.o_id; - } - spin_unlock(&oscc->oscc_lock); - DEBUG_REQ(D_INODE, req, "OST out of space, flagging"); - } else if (rc != 0 && rc != -EIO) { - oscc->oscc_flags |= OSCC_FLAG_RECOVERING; - oscc->oscc_grow_count = OST_MIN_PRECREATE; - spin_unlock(&oscc->oscc_lock); - DEBUG_REQ(D_ERROR, req, - "unknown rc %d from async create: failing oscc", rc); - ptlrpc_fail_import(req->rq_import, req->rq_import_generation); - } else { - if (rc == 0) { - oscc->oscc_flags &= ~OSCC_FLAG_LOW; - if (body) { - int diff = body->oa.o_id - oscc->oscc_last_id; - if (diff != oscc->oscc_grow_count) - oscc->oscc_grow_count = - max(diff/3, OST_MIN_PRECREATE); - oscc->oscc_last_id = body->oa.o_id; - } - } - spin_unlock(&oscc->oscc_lock); - } - - CDEBUG(D_HA, "preallocated through id "LPU64" (last used "LPU64")\n", - oscc->oscc_last_id, oscc->oscc_next_id); - - wake_up(&oscc->oscc_waitq); - RETURN(rc); -} - -static int oscc_internal_create(struct osc_creator *oscc) -{ - struct ptlrpc_request *request; - struct ost_body *body; - int size = sizeof(*body); - ENTRY; - - spin_lock(&oscc->oscc_lock); - if (oscc->oscc_grow_count < OST_MAX_PRECREATE && - !(oscc->oscc_flags & (OSCC_FLAG_LOW | OSCC_FLAG_RECOVERING)) && - (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) 
<= - (oscc->oscc_grow_count / 4 + 1)) { - oscc->oscc_flags |= OSCC_FLAG_LOW; - oscc->oscc_grow_count *= 2; - } - - if (oscc->oscc_grow_count > OST_MAX_PRECREATE / 2) - oscc->oscc_grow_count = OST_MAX_PRECREATE / 2; - - if (oscc->oscc_flags & OSCC_FLAG_CREATING || - oscc->oscc_flags & OSCC_FLAG_RECOVERING) { - spin_unlock(&oscc->oscc_lock); - RETURN(0); - } - oscc->oscc_flags |= OSCC_FLAG_CREATING; - spin_unlock(&oscc->oscc_lock); - - request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import, - LUSTRE_OBD_VERSION, OST_CREATE, - 1, &size, NULL); - if (request == NULL) { - spin_lock(&oscc->oscc_lock); - oscc->oscc_flags &= ~OSCC_FLAG_CREATING; - spin_unlock(&oscc->oscc_lock); - RETURN(-ENOMEM); - } - - request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249 - body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body)); - - spin_lock(&oscc->oscc_lock); - body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count; - /* probably we should take frequence of request into account? -bzzz */ - if (oscc->oscc_grow_count < oscc->oscc_max_grow_count) { - oscc->oscc_grow_count *= 2; - if (oscc->oscc_grow_count > oscc->oscc_max_grow_count) - oscc->oscc_grow_count = oscc->oscc_max_grow_count; - } - body->oa.o_gr = oscc->oscc_gr; - LASSERT(body->oa.o_gr > 0); - body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP; - spin_unlock(&oscc->oscc_lock); - CDEBUG(D_INFO, "preallocating through id "LPU64" (last used "LPU64")\n", - body->oa.o_id, oscc->oscc_next_id); - - request->rq_replen = lustre_msg_size(1, &size); - - request->rq_async_args.pointer_arg[0] = oscc; - request->rq_interpret_reply = osc_interpret_create; - ptlrpcd_add_req(request); - - RETURN(0); -} - -static int oscc_has_objects(struct osc_creator *oscc, int count) -{ - int have_objs; - spin_lock(&oscc->oscc_lock); - have_objs = ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count); - spin_unlock(&oscc->oscc_lock); - - if (!have_objs) - oscc_internal_create(oscc); - - return have_objs; -} - -static int 
oscc_wait_for_objects(struct osc_creator *oscc, int count) -{ - int have_objs; - int ost_full; - int osc_invalid; - - have_objs = oscc_has_objects(oscc, count); - - spin_lock(&oscc->oscc_lock); - ost_full = (oscc->oscc_flags & OSCC_FLAG_NOSPC); - spin_unlock(&oscc->oscc_lock); - - osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid; - - return have_objs || ost_full || osc_invalid; -} - -static int oscc_precreate(struct osc_creator *oscc, int wait) -{ - struct l_wait_info lwi = { 0 }; - int rc = 0; - ENTRY; - - if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2)) - RETURN(0); - - if (!wait) - RETURN(0); - - /* no rc check -- a no-INTR, no-TIMEOUT wait can't fail */ - l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi); - - if (!oscc_has_objects(oscc, 1) && (oscc->oscc_flags & OSCC_FLAG_NOSPC)) - rc = -ENOSPC; - - if (oscc->oscc_obd->u.cli.cl_import->imp_invalid) - rc = -EIO; - - RETURN(rc); -} - -int oscc_recovering(struct osc_creator *oscc) -{ - int recov = 0; - - spin_lock(&oscc->oscc_lock); - recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING; - spin_unlock(&oscc->oscc_lock); - - return recov; -} - +/* this only is used now for deleting orphanes */ int osc_create(struct obd_export *exp, struct obdo *oa, - void *acl, int acl_size, - struct lov_stripe_md **ea, struct obd_trans_info *oti) + void *acl, int acl_size, struct lov_stripe_md **ea, + struct obd_trans_info *oti) { - struct lov_stripe_md *lsm; struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc; - int try_again = 1, rc = 0; + int rc = 0; ENTRY; + LASSERT(oa); LASSERT(ea); - LASSERT(oa->o_valid & OBD_MD_FLGROUP); LASSERT(oa->o_gr > 0); + LASSERT(oa->o_valid & OBD_MD_FLGROUP); LASSERT(acl == NULL && acl_size == 0); - if ((oa->o_valid & OBD_MD_FLFLAGS) && - oa->o_flags == OBD_FL_RECREATE_OBJS) { - /* Exceptional case where we are trying to repair missing - * objects for various groups. We have already validated that - * this is a valid group for the file. Don't set oscc->oscc_gr. 
- */ - RETURN(osc_real_create(exp, oa, ea, oti)); - } - - LASSERT(oscc->oscc_gr == 0 || oscc->oscc_gr == oa->o_gr); - oscc->oscc_gr = oa->o_gr; - if (oa->o_gr == FILTER_GROUP_LLOG || oa->o_gr == FILTER_GROUP_ECHO) RETURN(osc_real_create(exp, oa, ea, oti)); @@ -284,10 +92,6 @@ int osc_create(struct obd_export *exp, struct obdo *oa, exp->exp_obd->obd_name); LASSERT(oscc->oscc_flags & OSCC_FLAG_RECOVERING); - /* delete from next_id on up */ - oa->o_valid |= OBD_MD_FLID; - oa->o_id = oscc->oscc_next_id - 1; - CDEBUG(D_HA, "%s: deleting to next_id: "LPU64"\n", oscc->oscc_obd->obd_name, oa->o_id); @@ -303,83 +107,18 @@ int osc_create(struct obd_export *exp, struct obdo *oa, if (rc == -ENOSPC) oscc->oscc_flags |= OSCC_FLAG_NOSPC; oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING; - oscc->oscc_last_id = oa->o_id; - CDEBUG(D_HA, "%s: oscc recovery finished: %d\n", oscc->oscc_obd->obd_name, rc); - wake_up(&oscc->oscc_waitq); - } else { CDEBUG(D_ERROR, "%s: oscc recovery failed: %d\n", oscc->oscc_obd->obd_name, rc); } spin_unlock(&oscc->oscc_lock); - RETURN(rc); } - lsm = *ea; - if (lsm == NULL) { - rc = obd_alloc_memmd(exp, &lsm); - if (rc < 0) - RETURN(rc); - } - - while (try_again) { - /* If orphans are being recovered, then we must wait until - it is finished before we can continue with create. 
*/ - if (oscc_recovering(oscc)) { - struct l_wait_info lwi; - - CDEBUG(D_HA,"%p: oscc recovery in progress, waiting\n", - oscc); - lwi = LWI_TIMEOUT(MAX(obd_timeout*HZ/4, 1), NULL, NULL); - rc = l_wait_event(oscc->oscc_waitq, - !oscc_recovering(oscc), &lwi); - - LASSERT(rc == 0 || rc == -ETIMEDOUT); - if (rc == -ETIMEDOUT) { - CDEBUG(D_HA,"%p: timeout waiting on recovery\n", - oscc); - RETURN(rc); - } - CDEBUG(D_HA, "%s: oscc recovery over, waking up\n", - exp->exp_obd->obd_name); - } - - spin_lock(&oscc->oscc_lock); - if (oscc->oscc_flags & OSCC_FLAG_EXITING) { - spin_unlock(&oscc->oscc_lock); - break; - } - - if (oscc->oscc_last_id >= oscc->oscc_next_id) { - memcpy(oa, &oscc->oscc_oa, sizeof(*oa)); - oa->o_id = oscc->oscc_next_id; - oa->o_gr = oscc->oscc_gr; - lsm->lsm_object_id = oscc->oscc_next_id; - lsm->lsm_object_gr = oscc->oscc_gr; - *ea = lsm; - oscc->oscc_next_id++; - try_again = 0; - } else if (oscc->oscc_flags & OSCC_FLAG_NOSPC) { - rc = -ENOSPC; - spin_unlock(&oscc->oscc_lock); - break; - } - spin_unlock(&oscc->oscc_lock); - rc = oscc_precreate(oscc, try_again); - if (rc) - break; - } - - if (rc == 0) - CDEBUG(D_HA, "%s: returning objid "LPU64"\n", - oscc->oscc_obd->u.cli.cl_import->imp_target_uuid.uuid, - lsm->lsm_object_id); - else if (*ea == NULL) - obd_free_memmd(exp, &lsm); - RETURN(rc); + LBUG(); + RETURN(0); } void oscc_init(struct obd_device *obd) @@ -390,19 +129,9 @@ void oscc_init(struct obd_device *obd) return; oscc = &obd->u.cli.cl_oscc; - memset(oscc, 0, sizeof(*oscc)); - INIT_LIST_HEAD(&oscc->oscc_list); - init_waitqueue_head(&oscc->oscc_waitq); - spin_lock_init(&oscc->oscc_lock); - oscc->oscc_obd = obd; - oscc->oscc_kick_barrier = 100; - oscc->oscc_max_grow_count = 2000; - oscc->oscc_grow_count = OST_MIN_PRECREATE; - oscc->oscc_next_id = 2; - oscc->oscc_last_id = 1; + oscc->oscc_obd = obd; + spin_lock_init(&oscc->oscc_lock); oscc->oscc_flags |= OSCC_FLAG_RECOVERING; - /* XXX the export handle should give the oscc the last object */ - /* 
oed->oed_oscc.oscc_last_id = exph->....; */ } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index a2deb85435a1bd035518cd4e79bfb7b2c8c85b03..bd0924c990db3d953eb89fea2ba2802d8834a84e 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -268,17 +268,21 @@ static int osc_setattr(struct obd_export *exp, struct obdo *oa, request->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(request); - if (rc) - GOTO(out, rc); - - body = lustre_swab_repbuf(request, 0, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) - GOTO(out, rc = -EPROTO); + if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) { + ptlrpcd_add_req(request); + rc = 0; + } else { + rc = ptlrpc_queue_wait(request); + if (rc) + GOTO(out, rc); - memcpy(oa, &body->oa, sizeof(*oa)); + body = lustre_swab_repbuf(request, 0, sizeof(*body), + lustre_swab_ost_body); + if (body == NULL) + GOTO(out, rc = -EPROTO); + memcpy(oa, &body->oa, sizeof(*oa)); + } EXIT; out: ptlrpc_req_finished(request); @@ -417,7 +421,8 @@ static int osc_punch(struct obd_export *exp, struct obdo *oa, } static int osc_sync(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, obd_size start, obd_size end) + struct lov_stripe_md *md, obd_size start, + obd_size end) { struct ptlrpc_request *request; struct ost_body *body; @@ -492,8 +497,7 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa, memcpy(&body->oa, oa, sizeof(*oa)); request->rq_replen = lustre_msg_size(1, &size); - if (oti != NULL && oti->oti_async) { - /* asynchrounous destroy */ + if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) { ptlrpcd_add_req(request); rc = 0; } else { @@ -1264,7 +1268,6 @@ static int brw_interpret_oap(struct ptlrpc_request *request, osc_wake_cache_waiters(cli); osc_check_rpcs(cli); - spin_unlock(&cli->cl_loi_list_lock); obdo_free(aa->aa_oa); @@ -2867,26 +2870,6 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen, int rc = 0; ENTRY; - if (keylen == 
strlen("next_id") && - memcmp(key, "next_id", strlen("next_id")) == 0) { - if (vallen != sizeof(obd_id)) - RETURN(-EINVAL); - obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1; - CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n", - exp->exp_obd->obd_name, - obd->u.cli.cl_oscc.oscc_next_id); - - RETURN(0); - } - - if (keylen == strlen("growth_count") && - memcmp(key, "growth_count", strlen("growth_count")) == 0) { - if (vallen != sizeof(int)) - RETURN(-EINVAL); - obd->u.cli.cl_oscc.oscc_max_grow_count = *((int*)val); - RETURN(0); - } - if (keylen == strlen("unlinked") && memcmp(key, "unlinked", keylen) == 0) { struct osc_creator *oscc = &obd->u.cli.cl_oscc; @@ -2915,7 +2898,8 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen, RETURN(0); } - if (keylen == strlen("async") && memcmp(key, "async", keylen) == 0) { + if (keylen == strlen("async") && + memcmp(key, "async", keylen) == 0) { struct client_obd *cl = &obd->u.cli; if (vallen != sizeof(int)) RETURN(-EINVAL); @@ -2951,16 +2935,18 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen, RETURN(-EINVAL); } - if (keylen < strlen("mds_conn") || memcmp(key, "mds_conn", keylen) != 0) + if (keylen < strlen("mds_conn") || + memcmp(key, "mds_conn", keylen) != 0) RETURN(-EINVAL); - ctxt = llog_get_context(&exp->exp_obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT); + ctxt = llog_get_context(&exp->exp_obd->obd_llogs, + LLOG_UNLINK_ORIG_CTXT); if (ctxt) { if (rc == 0) rc = llog_initiator_connect(ctxt); else - CERROR("cannot establish the connect for ctxt %p: %d\n", - ctxt, rc); + CERROR("cannot establish the connect for " + "ctxt %p: %d\n", ctxt, rc); } imp->imp_server_timeout = 1; @@ -2976,6 +2962,7 @@ static struct llog_operations osc_size_repl_logops = { }; static struct llog_operations osc_unlink_orig_logops; + static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs, struct obd_device *tgt, int count, struct llog_catid *catid) @@ -3013,7 +3000,6 @@ static int osc_llog_finish(struct 
obd_device *obd, RETURN(rc); } - static int osc_connect(struct lustre_handle *exph, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data, diff --git a/lustre/smfs/fsfilt.c b/lustre/smfs/fsfilt.c index a8c7254ea6efda96b7fdd5d5ea3103829feb3ce7..be036c878b1791d3feac18388b40e8df796d3e54 100644 --- a/lustre/smfs/fsfilt.c +++ b/lustre/smfs/fsfilt.c @@ -565,10 +565,6 @@ static int fsfilt_smfs_set_fs_flags(struct inode *inode, int flags) if (flags & SM_ALL_PLG) /* enable all plugins */ SMFS_SET(I2SMI(inode)->smi_flags, SMFS_PLG_ALL); - if (flags & SM_PRECREATE) /* disable logs for precreated objs */ - SMFS_CLEAR(I2SMI(inode)->smi_flags, SMFS_PLG_ALL); - - #if 0 if (SMFS_DO_COW(S2SMI(inode->i_sb)) && (flags & SM_DO_COW)) SMFS_SET_INODE_COW(inode); @@ -588,9 +584,6 @@ static int fsfilt_smfs_clear_fs_flags(struct inode *inode, int flags) */ if(flags & SM_ALL_PLG) /* disable all plugins */ SMFS_CLEAR(I2SMI(inode)->smi_flags, SMFS_PLG_ALL); - if (flags & SM_PRECREATE) /* enable log again */ - SMFS_SET(I2SMI(inode)->smi_flags, SMFS_PLG_ALL); - RETURN(rc); } diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index bce9e8471f0e0c0f460de784ea80122967b4b856..ccfbf750f364d098b810d253f092128fbff0f83d 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -1057,8 +1057,11 @@ test_51b() { mkdir -p $DIR/$tdir-2 multiop $DIR/$tdir-1/f O_c & pid=$! 
+ # give multiop a chance to open - sleep 1 + # 1 second has proven not to be enough; we have already hit such cases + # --umka + sleep 5 do_facet $SINGLEMDS "sysctl -w lustre.fail_loc=0x80000107" touch $DIR/${tdir}-2/f & diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 47e73c30d0e0918ee4c8027f95b8aba6bdb70189..48f9c8aece7f4c2516b31a5123222977271dc2f2 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -1921,7 +1921,7 @@ test_54a() { $SOCKETCLIENT $DIR/socket || error $MUNLINK $DIR/socket } -run_test 54a "unix damain socket test ==========================" +run_test 54a "unix domain socket test ==========================" test_54b() { f="$DIR/f54b"