diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 5b708d2bd2bfb15167aa1030441de52bb1f7349d..216a81a4495bb9c2877c52e1b1223a1be5d855b2 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -160,6 +160,21 @@ Details : When MGC is disconnected from MGS long enough, MGS will evict the MGC, and late on MGC cannot successfully connect to MGS and a lot of the error messages complaining that MGS is not connected. +Severity : major +Bugzilla : 15027 +Frequency : on network error +Description: panic from a double-freed request on network error +Details : mdc_finish_enqueue finishes the request on any network error, + but that is correct only for synchronous enqueue; for async + enqueue (via ptlrpcd) it is wrong, because ptlrpcd wants to + finish the request itself. + +Severity : enhancement +Bugzilla : 11401 +Description: client-side metadata stat-ahead during readdir (directory readahead) +Details : perform client-side metadata stat-ahead when the client detects + readdir and sequential stat of dir entries therein + Severity : major Frequency : on start mds Bugzilla : 14884 diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 40f1caaf7d2088b439c2f58544e377733d2d88ad..710ee2e45b28f8a0e9c2a23d19b5c5076088353c 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -1006,6 +1006,18 @@ enum obd_cleanup_stage { struct lu_context; +static inline int it_to_lock_mode(struct lookup_intent *it) +{ + /* CREAT needs to be tested before open (both could be set) */ + if (it->it_op & IT_CREAT) + return LCK_CW; + else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP)) + return LCK_CR; + + LASSERTF(0, "Invalid it_op: %d\n", it->it_op); + return -EINVAL; +} + struct md_op_data { struct lu_fid op_fid1; /* operation fid1 (usualy parent) */ struct lu_fid op_fid2; /* operation fid2 (usualy child) */ @@ -1049,6 +1061,22 @@ struct md_op_data { __u32 op_opc; }; +struct md_enqueue_info; +/* metadata stat-ahead */ +typedef int (* md_enqueue_cb_t)(struct ptlrpc_request *req, + struct md_enqueue_info *minfo, + int rc); + +struct md_enqueue_info { + struct md_op_data mi_data; + struct lookup_intent mi_it; + struct lustre_handle mi_lockh; + struct dentry *mi_dentry; + md_enqueue_cb_t mi_cb; + unsigned int mi_generation; + void *mi_cbdata; +}; + struct obd_ops { struct module *o_owner; int (*o_iocontrol)(unsigned int cmd, struct obd_export *exp, int len, @@ -1368,6 +1396,14 @@ struct md_ops { struct obd_capa *, __u32, struct ptlrpc_request **); + int (*m_intent_getattr_async)(struct obd_export *, + struct md_enqueue_info *, + struct ldlm_enqueue_info *); + + int (*m_revalidate_lock)(struct obd_export *, + struct lookup_intent *, + struct lu_fid *); + /* * NOTE: If adding ops, add another LPROCFS_MD_OP_INIT() line to * lprocfs_alloc_md_stats() in obdclass/lprocfs_status.c.
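The md_enqueue_info block above is the public face of the asynchronous getattr path: the caller fills mi_data and mi_it, picks a lock mode with it_to_lock_mode(), and supplies an md_enqueue_cb_t that runs in ptlrpcd context when the intent reply arrives. A minimal sketch of such a caller follows; the my_* names are invented for illustration, error handling is trimmed, and a real user fills mi_data through ll_prep_md_op_data() exactly as statahead.c does later in this patch.

static int my_getattr_cb(struct ptlrpc_request *req,
                         struct md_enqueue_info *minfo, int rc)
{
        /* called from ptlrpcd once the IT_GETATTR intent completes */
        if (rc == 0)
                CDEBUG(D_READA, "async getattr for %.*s done\n",
                       minfo->mi_dentry->d_name.len,
                       minfo->mi_dentry->d_name.name);
        ll_intent_release(&minfo->mi_it);
        OBD_FREE_PTR(minfo);
        return rc;
}

static int my_getattr_async(struct obd_export *exp, struct dentry *dentry)
{
        struct ldlm_enqueue_info einfo = { 0 };
        struct md_enqueue_info *minfo;

        OBD_ALLOC_PTR(minfo);
        if (minfo == NULL)
                return -ENOMEM;

        minfo->mi_it.it_op = IT_GETATTR;
        minfo->mi_dentry   = dentry;
        minfo->mi_cb       = my_getattr_cb;
        /* mi_data would be filled via ll_prep_md_op_data() here */

        einfo.ei_type  = LDLM_IBITS;
        einfo.ei_mode  = it_to_lock_mode(&minfo->mi_it);   /* LCK_CR here */
        einfo.ei_cb_bl = ll_md_blocking_ast;
        einfo.ei_cb_cp = ldlm_completion_ast;

        return md_intent_getattr_async(exp, minfo, &einfo);
}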
Also, add a diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 76ce36bf271c9d7e489f279ea3296ec2155ade2f..f6ea1348fa44686df50c888ae9e49e7e0bd99d33 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -332,7 +332,7 @@ do { \ if (!OBT((exp)->exp_obd) || !MDP((exp)->exp_obd, op)) { \ CERROR("obd_" #op ": dev %s/%d no operation\n", \ (exp)->exp_obd->obd_name, \ - (exp)->exp_obd->obd_minor); \ + (exp)->exp_obd->obd_minor); \ RETURN(-EOPNOTSUPP); \ } \ } while (0) @@ -2000,6 +2000,31 @@ static inline int md_renew_capa(struct obd_export *exp, struct obd_capa *ocapa, RETURN(rc); } +static inline int md_intent_getattr_async(struct obd_export *exp, + struct md_enqueue_info *minfo, + struct ldlm_enqueue_info *einfo) +{ + int rc; + ENTRY; + EXP_CHECK_MD_OP(exp, intent_getattr_async); + EXP_MD_COUNTER_INCREMENT(exp, intent_getattr_async); + rc = MDP(exp->exp_obd, intent_getattr_async)(exp, minfo, einfo); + RETURN(rc); +} + +static inline int md_revalidate_lock(struct obd_export *exp, + struct lookup_intent *it, + struct lu_fid *fid) +{ + int rc; + ENTRY; + EXP_CHECK_MD_OP(exp, revalidate_lock); + EXP_MD_COUNTER_INCREMENT(exp, revalidate_lock); + rc = MDP(exp->exp_obd, revalidate_lock)(exp, it, fid); + RETURN(rc); +} + + /* OBD Metadata Support */ extern int obd_init_caches(void); diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 71c5046830d414ab51a85903cacc8697d815bb39..26e5eb76dcbea1b853d3084519aa88c94d927d41 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -246,6 +246,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_MDC_REVALIDATE_PAUSE 0x800 #define OBD_FAIL_MDC_ENQUEUE_PAUSE 0x801 +#define OBD_FAIL_MDC_GETATTR_ENQUEUE 0x803 #define OBD_FAIL_MGS 0x900 #define OBD_FAIL_MGS_ALL_REQUEST_NET 0x901 diff --git a/lustre/kernel_patches/patches/vfs_intent-2.6-rhel4.patch b/lustre/kernel_patches/patches/vfs_intent-2.6-rhel4.patch index ee655c32d0e3811647ab731bb909d83eb9128477..66e65fb09a82769cdea2055c7bb2b5ed712ba9b4 100644 --- a/lustre/kernel_patches/patches/vfs_intent-2.6-rhel4.patch +++ b/lustre/kernel_patches/patches/vfs_intent-2.6-rhel4.patch @@ -1205,8 +1205,9 @@ diff -rup RH_2_6_9_55.orig/fs/stat.c RH_2_6_9_55/fs/stat.c int error; + intent_init(&nd.intent, IT_GETATTR); - error = user_path_walk(name, &nd); - if (!error) { +- error = user_path_walk(name, &nd); ++ error = user_path_walk_it(name, &nd); + if (!error) { - error = vfs_getattr64(nd.mnt, nd.dentry, stat); + error = vfs_getattr64_it(nd.mnt, nd.dentry, &nd.intent, stat); path_release(&nd); @@ -1218,8 +1219,9 @@ diff -rup RH_2_6_9_55.orig/fs/stat.c RH_2_6_9_55/fs/stat.c int error; + intent_init(&nd.intent, IT_GETATTR); - error = user_path_walk_link(name, &nd); - if (!error) { +- error = user_path_walk_link(name, &nd); ++ error = user_path_walk_link_it(name, &nd); + if (!error) { - error = vfs_getattr64(nd.mnt, nd.dentry, stat); + error = vfs_getattr64_it(nd.mnt, nd.dentry, &nd.intent, stat); path_release(&nd); diff --git a/lustre/llite/Makefile.in b/lustre/llite/Makefile.in index 5cfb43168e7b5b043881063ff7dd2523249b1982..8d02c85f1addbd6beba271b7b2326dcc65f24843 100644 --- a/lustre/llite/Makefile.in +++ b/lustre/llite/Makefile.in @@ -2,7 +2,7 @@ MODULES := lustre llite_lloop lustre-objs := dcache.o dir.o file.o llite_close.o llite_lib.o llite_nfs.o lustre-objs += llite_fid.o rw.o lproc_llite.o namei.o symlink.o llite_mmap.o lustre-objs += xattr.o remote_perm.o llite_rmtacl.o llite_capa.o -lustre-objs 
+= rw26.o super25.o +lustre-objs += rw26.o super25.o statahead.o llite_lloop-objs := lloop.o diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 878fd8e46c1235adaa3e59e072435d6aaf2a56d1..4d8b687796b65ec9f78d639cfc24f0f06fe74374 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -120,11 +120,21 @@ void ll_set_dd(struct dentry *de) CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n", de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode, atomic_read(&de->d_count)); - lock_kernel(); + if (de->d_fsdata == NULL) { - OBD_ALLOC(de->d_fsdata, sizeof(struct ll_dentry_data)); + struct ll_dentry_data *lld; + + OBD_ALLOC_PTR(lld); + if (likely(lld != NULL)) { + cfs_waitq_init(&lld->lld_waitq); + lock_dentry(de); + if (likely(de->d_fsdata == NULL)) + de->d_fsdata = lld; + else + OBD_FREE_PTR(lld); + unlock_dentry(de); + } } - unlock_kernel(); EXIT; } @@ -332,12 +342,12 @@ void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft) int ll_revalidate_it(struct dentry *de, int lookup_flags, struct lookup_intent *it) { - int rc; struct md_op_data *op_data; struct ptlrpc_request *req = NULL; struct lookup_intent lookup_it = { .it_op = IT_LOOKUP }; struct obd_export *exp; struct inode *parent; + int rc, first = 0; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name, @@ -359,7 +369,7 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, rc = ll_have_md_lock(de->d_parent->d_inode, MDS_INODELOCK_UPDATE); - RETURN(rc); + GOTO(out_sa, rc); } exp = ll_i2mdexp(de->d_inode); @@ -367,12 +377,12 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, /* Never execute intents for mount points. * Attributes will be fixed up in ll_inode_revalidate_it */ if (d_mountpoint(de)) - RETURN(1); + GOTO(out_sa, rc = 1); /* Root of the lustre tree. Always valid. * Attributes will be fixed up in ll_inode_revalidate_it */ if (de == de->d_sb->s_root) - RETURN(1); + GOTO(out_sa, rc = 1); OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5); ll_frob_intent(&it, &lookup_it); @@ -434,6 +444,9 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, } } + if (it->it_op == IT_GETATTR) + first = ll_statahead_enter(de->d_parent->d_inode, &de, 0); + do_lock: it->it_create_mode &= ~current->fs->umask; it->it_flags |= O_CHECK_STALE; @@ -442,6 +455,9 @@ do_lock: &req, ll_md_blocking_ast, 0); it->it_flags &= ~O_CHECK_STALE; ll_finish_md_op_data(op_data); + if (it->it_op == IT_GETATTR && !first) + ll_statahead_exit(de, rc); + /* If req is NULL, then md_intent_lock only tried to do a lock match; * if all was well, it will return 1 if it found locks, 0 otherwise. */ if (req == NULL && rc >= 0) { @@ -564,6 +580,19 @@ do_lookup: } ll_finish_md_op_data(op_data); GOTO(out, rc = 0); + +out_sa: + /* + * For rc == 1 case, should not return directly to prevent losing + * statahead windows; for rc == 0 case, the "lookup" will be done later. 
+ */ + if (it && it->it_op == IT_GETATTR && rc == 1) { + first = ll_statahead_enter(de->d_parent->d_inode, &de, 0); + if (!first) + ll_statahead_exit(de, rc); + } + + return rc; } /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag) @@ -747,3 +776,45 @@ struct dentry_operations ll_d_ops = { .d_unpin = ll_unpin, #endif }; + +static int ll_fini_revalidate_nd(struct dentry *dentry, struct nameidata *nd) +{ + ENTRY; + /* need lookup */ + RETURN(0); +} + +struct dentry_operations ll_fini_d_ops = { + .d_revalidate = ll_fini_revalidate_nd, + .d_release = ll_release, +}; + +/* + * It is for the following race condition: + * When someone (maybe statahead thread) adds the dentry to the dentry hash + * table, the dentry's "d_op" maybe NULL, at the same time, another (maybe + * "ls -l") process finds such dentry by "do_lookup()" without "do_revalidate()" + * called. It causes statahead window lost, and maybe other issues. --Fan Yong + */ +static int ll_init_revalidate_nd(struct dentry *dentry, struct nameidata *nd) +{ + struct l_wait_info lwi = { 0 }; + struct ll_dentry_data *lld; + ENTRY; + + ll_set_dd(dentry); + lld = ll_d2d(dentry); + if (unlikely(lld == NULL)) + RETURN(-ENOMEM); + + l_wait_event(lld->lld_waitq, dentry->d_op != &ll_init_d_ops, &lwi); + if (likely(dentry->d_op == &ll_d_ops)) + RETURN(ll_revalidate_nd(dentry, nd)); + else + RETURN(dentry->d_op == &ll_fini_d_ops ? 0 : -EINVAL); +} + +struct dentry_operations ll_init_d_ops = { + .d_revalidate = ll_init_revalidate_nd, + .d_release = ll_release, +}; diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index f88f514f0b41a49cd680111e85895cc986a39d06..dc8a211b17c507c86742d1a66f1eaf3b4c558da1 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -27,7 +27,6 @@ */ #include <linux/fs.h> -#include <linux/ext2_fs.h> #include <linux/pagemap.h> #include <linux/mm.h> #include <linux/version.h> @@ -144,7 +143,7 @@ static int ll_dir_readpage(struct file *file, struct page *page) int rc; ENTRY; - hash = hash_x_index(page->index); + hash = (__u64)hash_x_index(page->index); CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) off %lu\n", inode->i_ino, inode->i_generation, inode, (unsigned long)hash); @@ -175,32 +174,33 @@ struct address_space_operations ll_dir_aops = { .readpage = ll_dir_readpage, }; -static inline unsigned long dir_pages(struct inode *inode) -{ - return (i_size_read(inode) + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT; -} - -static inline unsigned ll_chunk_size(struct inode *inode) -{ - return inode->i_sb->s_blocksize; -} - static void ll_check_page(struct inode *dir, struct page *page) { /* XXX: check page format later */ SetPageChecked(page); } -static inline void ll_put_page(struct page *page) +static void ll_release_page(struct page *page, __u64 hash, + __u64 start, __u64 end) { kunmap(page); + lock_page(page); + if (likely(page->mapping != NULL)) { + ll_truncate_complete_page(page); + unlock_page(page); + } else { + unlock_page(page); + CWARN("NULL mapping page %p, truncated by others: " + "hash(%#llx) | start(%#llx) | end(%#llx)\n", + page, hash, start, end); + } page_cache_release(page); } /* * Find, kmap and return page that contains given hash. 
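ll_release_page() above (kunmap, truncate from the cache, drop the reference) is the counterpart of ll_get_dir_page(), which this patch also exports so the stat-ahead code can iterate a directory by hash. The walk pattern, condensed from ll_statahead_thread() and is_first_dirent() further down in this patch, looks roughly like this; walk_dir_example is an invented name, and real callers also handle IS_ERR pages and hash-chain overflow.

static void walk_dir_example(struct inode *dir)
{
        struct ll_dir_chain chain;
        struct page *page;
        __u64 pos = 0;

        ll_dir_chain_init(&chain);
        page = ll_get_dir_page(dir, pos, 0, &chain);
        while (!IS_ERR(page)) {
                struct lu_dirpage *dp = page_address(page);
                struct lu_dirent *ent;

                for (ent = lu_dirent_start(dp); ent != NULL;
                     ent = lu_dirent_next(ent)) {
                        int namelen = le16_to_cpu(ent->lde_namelen);

                        if (namelen == 0)
                                continue;        /* dummy record */
                        /* ... consume ent->lde_name / namelen ... */
                }
                pos = le64_to_cpu(dp->ldp_hash_end);
                ll_put_page(page);               /* kunmap + page_cache_release */
                if (pos == DIR_END_OFF)          /* end of directory */
                        break;
                page = ll_get_dir_page(dir, pos, 1, &chain);
        }
        ll_dir_chain_fini(&chain);
}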
*/ -static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash, +static struct page *ll_dir_page_locate(struct inode *dir, __u64 hash, __u64 *start, __u64 *end) { struct address_space *mapping = dir->i_mapping; @@ -209,17 +209,17 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash, * radix_tree_gang_lookup() can be used to find a page with starting * hash _smaller_ than one we are looking for. */ - unsigned long offset = hash_x_index(hash); + unsigned long offset = hash_x_index((__u32)hash); struct page *page; int found; TREE_READ_LOCK_IRQ(mapping); - found = radix_tree_gang_lookup(&mapping->page_tree, + found = radix_tree_gang_lookup(&mapping->page_tree, (void **)&page, offset, 1); - if (found > 0) { + if (found > 0) { struct lu_dirpage *dp; - page_cache_get(page); + page_cache_get(page); TREE_READ_UNLOCK_IRQ(mapping); /* * In contrast to find_lock_page() we are sure that directory @@ -236,11 +236,7 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash, *end = le64_to_cpu(dp->ldp_hash_end); LASSERT(*start <= hash); if (hash > *end || (*end != *start && hash == *end)) { - kunmap(page); - lock_page(page); - ll_truncate_complete_page(page); - unlock_page(page); - page_cache_release(page); + ll_release_page(page, hash, *start, *end); page = NULL; } } else { @@ -248,15 +244,15 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash, page = ERR_PTR(-EIO); } - } else { + } else { TREE_READ_UNLOCK_IRQ(mapping); page = NULL; } return page; } -static struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact, - struct ll_dir_chain *chain) +struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact, + struct ll_dir_chain *chain) { ldlm_policy_data_t policy = {.l_inodebits = {MDS_INODELOCK_UPDATE} }; struct address_space *mapping = dir->i_mapping; @@ -278,7 +274,7 @@ static struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact, struct ptlrpc_request *request; struct md_op_data *op_data; - op_data = ll_prep_md_op_data(NULL, dir, NULL, NULL, 0, 0, + op_data = ll_prep_md_op_data(NULL, dir, NULL, NULL, 0, 0, LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) return (void *)op_data; @@ -328,17 +324,15 @@ static struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact, * entries with smaller hash values. Stale page should * be invalidated, and new one fetched. */ - CWARN("Stale readpage page %p: %#lx != %#lx\n", page, - (unsigned long)hash, (unsigned long)start); - lock_page(page); - ll_truncate_complete_page(page); - unlock_page(page); - page_cache_release(page); - } else + CWARN("Stale readpage page %p: %#llx != %#llx\n", + page, hash, start); + ll_release_page(page, hash, start, end); + } else { GOTO(hash_collision, page); + } } - page = read_cache_page(mapping, hash_x_index(hash), + page = read_cache_page(mapping, hash_x_index((__u32)hash), (filler_t*)mapping->a_ops->readpage, NULL); if (IS_ERR(page)) GOTO(out_unlock, page); @@ -411,9 +405,9 @@ int ll_readdir(struct file *filp, void *cookie, filldir_t filldir) struct lu_dirent *ent; if (!IS_ERR(page)) { - /* + /* * If page is empty (end of directoryis reached), - * use this value. + * use this value. 
*/ __u64 hash = DIR_END_OFF; __u64 next; @@ -610,8 +604,8 @@ end: return rc; } -int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmmp, - int *lmm_size, struct ptlrpc_request **request) +int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmmp, + int *lmm_size, struct ptlrpc_request **request) { struct ll_sb_info *sbi = ll_i2sbi(inode); struct mdt_body *body; @@ -619,7 +613,7 @@ int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmmp, struct ptlrpc_request *req = NULL; int rc, lmmsize; struct obd_capa *oc; - + rc = ll_get_max_mdsize(sbi, &lmmsize); if (rc) RETURN(rc); @@ -768,7 +762,7 @@ static int ll_dir_ioctl(struct inode *inode, struct file *file, if (IS_ERR(filename)) RETURN(PTR_ERR(filename)); - rc = ll_lov_getstripe_ea_info(inode, filename, &lmm, + rc = ll_lov_getstripe_ea_info(inode, filename, &lmm, &lmmsize, &request); } else { rc = ll_dir_getstripe(inode, &lmm, &lmmsize, &request); @@ -783,7 +777,7 @@ static int ll_dir_ioctl(struct inode *inode, struct file *file, } if (rc < 0) { - if (rc == -ENODATA && (cmd == IOC_MDC_GETFILEINFO || + if (rc == -ENODATA && (cmd == IOC_MDC_GETFILEINFO || cmd == LL_IOC_MDC_GETINFO)) GOTO(skip_lmm, rc = 0); else diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 32360dc1e655973f14d46b7a28cfbb018d67cfcd..7d1765aac858d677ef64ba5ab40a30bc9c93bff3 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -300,11 +300,17 @@ int ll_file_release(struct inode *inode, struct file *file) } #endif - ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1); + if (inode->i_sb->s_root != file->f_dentry) + ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1); fd = LUSTRE_FPRIVATE(file); LASSERT(fd != NULL); - /* don't do anything for / */ + /* The last ref on @file, maybe not the the owner pid of statahead. + * Different processes can open the same dir, "ll_opendir_key" means: + * it is me that should stop the statahead thread. */ + if (lli->lli_opendir_key == fd) + ll_stop_statahead(inode, fd); + if (inode->i_sb->s_root == file->f_dentry) { LUSTRE_FPRIVATE(file) = NULL; ll_file_data_put(fd); @@ -329,6 +335,7 @@ static int ll_intent_file_open(struct file *file, void *lmm, struct md_op_data *op_data; struct ptlrpc_request *req; int rc; + ENTRY; if (!parent) RETURN(-ENOENT); @@ -465,7 +472,7 @@ int ll_file_open(struct inode *inode, struct file *file) struct obd_client_handle **och_p; __u64 *och_usecount; struct ll_file_data *fd; - int rc = 0; + int rc = 0, opendir_set = 0; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino, @@ -482,7 +489,29 @@ int ll_file_open(struct inode *inode, struct file *file) if (fd == NULL) RETURN(-ENOMEM); - /* don't do anything for / */ + if (S_ISDIR(inode->i_mode)) { + spin_lock(&lli->lli_lock); + /* "lli->lli_opendir_pid != 0" means someone has set it. + * "lli->lli_sai != NULL" means the previous statahead has not + * been cleanup. */ + if (lli->lli_opendir_pid == 0 && lli->lli_sai == NULL) { + opendir_set = 1; + lli->lli_opendir_pid = cfs_curproc_pid(); + lli->lli_opendir_key = fd; + } else if (unlikely(lli->lli_opendir_pid == cfs_curproc_pid())) { + /* Two cases for this: + * (1) The same process open such directory many times. + * (2) The old process opened the directory, and exited + * before its children processes. Then new process + * with the same pid opens such directory before the + * old process's children processes exit. + * Change the owner to the latest one. 
*/ + opendir_set = 2; + lli->lli_opendir_key = fd; + } + spin_unlock(&lli->lli_lock); + } + if (inode->i_sb->s_root == file->f_dentry) { LUSTRE_FPRIVATE(file) = fd; RETURN(0); @@ -632,9 +661,13 @@ out_och_free: (*och_usecount)--; } up(&lli->lli_och_sem); -out_openerr: ;/* Looks weierd, eh? Just wait for statahead code to insert - a statement here <-- remove this comment after statahead - landing */ +out_openerr: + if (opendir_set == 1) { + lli->lli_opendir_key = NULL; + lli->lli_opendir_pid = 0; + } else if (unlikely(opendir_set == 2)) { + ll_stop_statahead(inode, fd); + } } return rc; @@ -686,7 +719,8 @@ int ll_inode_getattr(struct inode *inode, struct obdo *obdo) obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid); CDEBUG(D_INODE, "objid "LPX64" size %Lu, blocks %llu, blksize %lu\n", lli->lli_smd->lsm_object_id, i_size_read(inode), - (unsigned long long)inode->i_blocks, ll_inode_blksize(inode)); + (unsigned long long)inode->i_blocks, + (unsigned long)ll_inode_blksize(inode)); RETURN(0); } diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 6aed986919d70476c1e0a29190b9fdf899b92d9c..3b73342f174b7f71d995ebbd8db22094247bb742 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -5,7 +5,7 @@ #ifndef LLITE_INTERNAL_H #define LLITE_INTERNAL_H -# include <linux/lustre_acl.h> +#include <linux/lustre_acl.h> #ifdef CONFIG_FS_POSIX_ACL # include <linux/fs.h> @@ -42,11 +42,13 @@ struct ll_dentry_data { struct obd_client_handle lld_cwd_och; struct obd_client_handle lld_mnt_och; #ifndef HAVE_VFS_INTENT_PATCHES - struct lookup_intent *lld_it; + struct lookup_intent *lld_it; #endif + unsigned int lld_sa_generation; + cfs_waitq_t lld_waitq; }; -#define ll_d2d(de) ((struct ll_dentry_data*) de->d_fsdata) +#define ll_d2d(de) ((struct ll_dentry_data*)((de)->d_fsdata)) extern struct file_operations ll_pgcache_seq_fops; @@ -141,6 +143,19 @@ struct ll_inode_info { atomic_t lli_open_count; struct obd_capa *lli_mds_capa; struct list_head lli_oss_capas; + + /* metadata stat-ahead */ + /* + * "opendir_pid" is the token when lookup/revalid -- I am the owner of + * dir statahead. + */ + pid_t lli_opendir_pid; + /* + * since parent-child threads can share the same @file struct, + * "opendir_key" is the token when dir close for case of parent exit + * before child -- it is me should cleanup the dir readahead. 
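The two fields documented here implement a small ownership protocol between open and close of a directory; the summary below only restates the ll_file_open()/ll_file_release() logic shown above.

/*
 * Ownership protocol (summary of ll_file_open()/ll_file_release()):
 *
 *   open:  if lli_opendir_pid == 0 and lli_sai == NULL, the opener claims
 *          ownership: lli_opendir_pid = cfs_curproc_pid() (who may trigger
 *          statahead), lli_opendir_key = fd (who must stop it).
 *   close: pids can be reused, or the file struct shared between parent
 *          and child, so the key (the ll_file_data pointer), not the pid,
 *          decides who calls ll_stop_statahead() in ll_file_release().
 */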
*/ + void *lli_opendir_key; + struct ll_statahead_info *lli_sai; }; /* @@ -325,6 +340,18 @@ struct ll_sb_info { int ll_stats_track_id; int ll_rw_stats_on; + /* metadata stat-ahead */ + unsigned int ll_sa_max; /* max statahead RPCs */ + unsigned int ll_sa_wrong; /* statahead thread stopped for + * low hit ratio */ + unsigned int ll_sa_total; /* statahead thread started + * count */ + unsigned long long ll_sa_blocked; /* ls count waiting for + * statahead */ + unsigned long long ll_sa_cached; /* ls count got in cache */ + unsigned long long ll_sa_hit; /* hit count */ + unsigned long long ll_sa_miss; /* miss count */ + dev_t ll_sdev_orig; /* save s_dev before assign for * clustred nfs */ struct rmtacl_ctl_table ll_rct; @@ -529,21 +556,30 @@ static void lprocfs_llite_init_vars(struct lprocfs_static_vars *lvars) /* llite/dir.c */ +static inline void ll_put_page(struct page *page) +{ + kunmap(page); + page_cache_release(page); +} + extern struct file_operations ll_dir_operations; extern struct inode_operations ll_dir_inode_operations; - +struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact, + struct ll_dir_chain *chain); /* llite/namei.c */ int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir); struct inode *ll_iget(struct super_block *sb, ino_t hash, struct lustre_md *lic); -struct dentry *ll_find_alias(struct inode *, struct dentry *); int ll_md_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *, void *data, int flag); #ifndef HAVE_VFS_INTENT_PATCHES struct lookup_intent *ll_convert_intent(struct open_intent *oit, int lookup_flags); #endif +int ll_lookup_it_finish(struct ptlrpc_request *request, + struct lookup_intent *it, void *data); +void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry); /* llite/rw.c */ int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to); @@ -621,6 +657,9 @@ int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmm, int *lmm_size, struct ptlrpc_request **request); /* llite/dcache.c */ +extern struct dentry_operations ll_init_d_ops; +extern struct dentry_operations ll_d_ops; +extern struct dentry_operations ll_fini_d_ops; void ll_intent_drop_lock(struct lookup_intent *); void ll_intent_release(struct lookup_intent *); int ll_drop_dentry(struct dentry *dentry); @@ -856,6 +895,93 @@ void et_init(struct eacl_table *et); void et_fini(struct eacl_table *et); #endif +/* statahead.c */ + +#define LL_SA_RPC_MIN 2 +#define LL_SA_RPC_DEF 32 +#define LL_SA_RPC_MAX 8192 + +/* per inode struct, for dir only */ +struct ll_statahead_info { + struct inode *sai_inode; + unsigned int sai_generation; /* generation for statahead */ + atomic_t sai_refcount; /* when access this struct, hold + * refcount */ + unsigned int sai_sent; /* stat requests sent count */ + unsigned int sai_replied; /* stat requests which received + * reply */ + unsigned int sai_max; /* max ahead of lookup */ + unsigned int sai_index; /* index of statahead entry */ + unsigned int sai_hit; /* hit count */ + unsigned int sai_miss; /* miss count: + * for "ls -al" case, it includes + * hidden dentry miss; + * for "ls -l" case, it does not + * include hidden dentry miss. + * "sai_miss_hidden" is used for + * the later case. 
+ */ + unsigned int sai_consecutive_miss; /* consecutive miss */ + unsigned int sai_miss_hidden;/* "ls -al", but first dentry + * is not a hidden one */ + unsigned int sai_skip_hidden;/* skipped hidden dentry count */ + unsigned int sai_ls_all:1; /* "ls -al", do stat-ahead for + * hidden entries */ + cfs_waitq_t sai_waitq; /* stat-ahead wait queue */ + struct ptlrpc_thread sai_thread; /* stat-ahead thread */ + struct list_head sai_entries; /* stat-ahead entries */ +}; + +int do_statahead_enter(struct inode *dir, struct dentry **dentry, int lookup); +void ll_statahead_exit(struct dentry *dentry, int result); +void ll_stop_statahead(struct inode *inode, void *key); + +static inline +void ll_d_wakeup(struct dentry *dentry) +{ + struct ll_dentry_data *lld = ll_d2d(dentry); + + LASSERT(dentry->d_op != &ll_init_d_ops); + if (lld != NULL) + cfs_waitq_broadcast(&lld->lld_waitq); +} + +static inline +int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) +{ + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_dentry_data *ldd = ll_d2d(*dentryp); + + if (sbi->ll_sa_max == 0) + return -ENOTSUPP; + + /* not the same process, don't statahead */ + if (lli->lli_opendir_pid != cfs_curproc_pid()) + return -EBADF; + + /* + * When "ls" a dentry, the system trigger more than once "revalidate" or + * "lookup", for "getattr", for "getxattr", and maybe for others. + * Under patchless client mode, the operation intent is not accurate, + * it maybe misguide the statahead thread. For example: + * The "revalidate" call for "getattr" and "getxattr" of a dentry maybe + * have the same operation intent -- "IT_GETATTR". + * In fact, one dentry should has only one chance to interact with the + * statahead thread, otherwise the statahead windows will be confused. 
+ * The solution is as following: + * Assign "lld_sa_generation" with "sai_generation" when a dentry + * "IT_GETATTR" for the first time, and the subsequent "IT_GETATTR" + * will bypass interacting with statahead thread for checking: + * "lld_sa_generation == lli_sai->sai_generation" + */ + if (ldd && lli->lli_sai && + ldd->lld_sa_generation == lli->lli_sai->sai_generation) + return -EAGAIN; + + return do_statahead_enter(dir, dentryp, lookup); +} + /* llite ioctl register support rountine */ #ifdef __KERNEL__ enum llioc_iter { diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index e5f2380fefc36f13366836d4dc915a2444e69c61..aa14d11d68e2805af5795f197e1985f6bc2444f8 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -106,6 +106,9 @@ static struct ll_sb_info *ll_init_sbi(void) spin_lock_init(&sbi->ll_rw_extents_info.pp_extents[i].pp_w_hist.oh_lock); } + /* metadata statahead is enabled by default */ + sbi->ll_sa_max = LL_SA_RPC_DEF; + RETURN(sbi); } @@ -1105,6 +1108,13 @@ void ll_clear_inode(struct inode *inode) CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino, inode->i_generation, inode); + if (S_ISDIR(inode->i_mode)) { + /* these should have been cleared in ll_file_release */ + LASSERT(lli->lli_sai == NULL); + LASSERT(lli->lli_opendir_key == NULL); + LASSERT(lli->lli_opendir_pid == 0); + } + ll_i2info(inode)->lli_flags &= ~LLIF_MDS_SIZE_LOCK; md_change_cbdata(sbi->ll_md_exp, ll_inode2fid(inode), null_if_equal, inode); @@ -2239,6 +2249,7 @@ struct md_op_data * ll_prep_md_op_data(struct md_op_data *op_data, op_data->op_capa2 = ll_mdscapa_get(i2); } else { fid_zero(&op_data->op_fid2); + op_data->op_capa2 = NULL; } op_data->op_name = name; diff --git a/lustre/llite/llite_nfs.c b/lustre/llite/llite_nfs.c index 8892862fd939f946f4d00ff748a8c9e2ef303072..61f06d6f85c60deddc6c1d5726da301c7c30aa8d 100644 --- a/lustre/llite/llite_nfs.c +++ b/lustre/llite/llite_nfs.c @@ -78,8 +78,6 @@ static struct inode *search_inode_for_lustre(struct super_block *sb, RETURN(inode); } -extern struct dentry_operations ll_d_ops; - static struct dentry *ll_iget_for_nfs(struct super_block *sb, struct lu_fid *fid, umode_t mode) @@ -109,8 +107,20 @@ static struct dentry *ll_iget_for_nfs(struct super_block *sb, iput(inode); RETURN(ERR_PTR(-ENOMEM)); } + ll_set_dd(result); - result->d_op = &ll_d_ops; + + lock_dentry(result); + if (unlikely(result->d_op == &ll_init_d_ops)) { + result->d_op = &ll_d_ops; + unlock_dentry(result); + smp_wmb(); + ll_d_wakeup(result); + } else { + result->d_op = &ll_d_ops; + unlock_dentry(result); + } + RETURN(result); } diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c index 50a476e5d41ae50eeca6a177e2415c6adac64113..4e785a3eb1dd4746731f2ed04a6db7ed2bbc9e2f 100644 --- a/lustre/llite/lproc_llite.c +++ b/lustre/llite/lproc_llite.c @@ -460,6 +460,56 @@ static int ll_wr_track_gid(struct file *file, const char *buffer, return (ll_wr_track_id(buffer, count, data, STATS_TRACK_GID)); } +static int ll_rd_statahead_max(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct super_block *sb = data; + struct ll_sb_info *sbi = ll_s2sbi(sb); + + return snprintf(page, count, "%u\n", sbi->ll_sa_max); +} + +static int ll_wr_statahead_max(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct super_block *sb = data; + struct ll_sb_info *sbi = ll_s2sbi(sb); + int val, rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + + if (val >= 0 && val <= LL_SA_RPC_MAX) + 
sbi->ll_sa_max = val; + else + CERROR("Bad statahead_max value %d. Valid values are in the " + "range [0, %d]\n", val, LL_SA_RPC_MAX); + + return count; +} + +static int ll_rd_statahead_stats(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct super_block *sb = data; + struct ll_sb_info *sbi = ll_s2sbi(sb); + + return snprintf(page, count, + "statahead wrong: %u\n" + "statahead total: %u\n" + "ls blocked: %llu\n" + "ls cached: %llu\n" + "hit count: %llu\n" + "miss count: %llu\n", + sbi->ll_sa_wrong, + sbi->ll_sa_total, + sbi->ll_sa_blocked, + sbi->ll_sa_cached, + sbi->ll_sa_hit, + sbi->ll_sa_miss); +} + static int ll_rd_contention_time(char *page, char **start, off_t off, int count, int *eof, void *data) { @@ -523,6 +573,8 @@ static struct lprocfs_vars lprocfs_llite_obd_vars[] = { { "stats_track_pid", ll_rd_track_pid, ll_wr_track_pid, 0 }, { "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 }, { "stats_track_gid", ll_rd_track_gid, ll_wr_track_gid, 0 }, + { "statahead_max", ll_rd_statahead_max, ll_wr_statahead_max, 0 }, + { "statahead_stats", ll_rd_statahead_stats, 0, 0 }, { "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0}, { "lockless_truncate", ll_rd_lockless_truncate, ll_wr_lockless_truncate, 0}, diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index bfe181f071e4219b7aed6895e802d954fd27bdaa..2631bda709af8d95aad2eb3a45040bada7fd0cf9 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -37,9 +37,6 @@ #include <lustre_mdc.h> #include "llite_internal.h" -/* methods */ -extern struct dentry_operations ll_d_ops; - /* * Check if we have something mounted at the named dchild. * In such a case there would always be dentry present. @@ -317,7 +314,7 @@ static void ll_d_add(struct dentry *de, struct inode *inode) * in ll_revalidate_it. After revaliadate inode will be have hashed aliases * and it triggers BUG_ON in d_instantiate_unique (bug #10954). 
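The two handlers above are wired into the llite proc table a few lines below as "statahead_max" and "statahead_stats". A short usage note follows; the proc location is stated as an assumption, since the per-mount directory naming is not shown in this patch.

/*
 * Assumed location: /proc/fs/lustre/llite/<fsname>-<instance>/
 *
 *   statahead_max   - writable; maximum number of stat-ahead requests kept
 *                     in flight per directory.  0 disables stat-ahead
 *                     (ll_statahead_enter() then returns -ENOTSUPP); values
 *                     outside [0, LL_SA_RPC_MAX] are ignored with a CERROR.
 *   statahead_stats - read-only counters from ll_rd_statahead_stats():
 *                     "statahead wrong" counts threads stopped for a low
 *                     hit ratio, "ls blocked"/"ls cached" count stats that
 *                     had to wait for, or were satisfied by, stat-ahead.
 */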
*/ -struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) +static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) { struct list_head *tmp; struct dentry *dentry; @@ -387,25 +384,58 @@ struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) return de; } -static int lookup_it_finish(struct ptlrpc_request *request, - struct lookup_intent *it, - void *data) +static inline void ll_dop_init(struct dentry *de, int *set) +{ + lock_dentry(de); + if (likely(de->d_op != &ll_d_ops)) { + de->d_op = &ll_init_d_ops; + *set = 1; + } + unlock_dentry(de); +} + +static inline void ll_dop_fini(struct dentry *de, int succ) +{ + lock_dentry(de); + if (likely(de->d_op == &ll_init_d_ops)) { + if (succ) + de->d_op = &ll_d_ops; + else + de->d_op = &ll_fini_d_ops; + unlock_dentry(de); + smp_wmb(); + ll_d_wakeup(de); + } else { + if (succ) + de->d_op = &ll_d_ops; + unlock_dentry(de); + } +} + +int ll_lookup_it_finish(struct ptlrpc_request *request, + struct lookup_intent *it, void *data) { struct it_cb_data *icbd = data; struct dentry **de = icbd->icbd_childp; struct inode *parent = icbd->icbd_parent; struct ll_sb_info *sbi = ll_i2sbi(parent); struct inode *inode = NULL; - int rc; + int set = 0, rc; + ENTRY; + + ll_dop_init(*de, &set); /* NB 1 request reference will be taken away by ll_intent_lock() * when I return */ if (!it_disposition(it, DISP_LOOKUP_NEG)) { - ENTRY; + struct dentry *save = *de; rc = ll_prep_inode(&inode, request, (*de)->d_sb); - if (rc) + if (rc) { + if (set) + ll_dop_fini(*de, 0); RETURN(rc); + } CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n", inode, inode->i_ino, inode->i_generation); @@ -422,8 +452,9 @@ static int lookup_it_finish(struct ptlrpc_request *request, Also see bug 7198. */ *de = ll_find_alias(inode, *de); + if (set && *de != save) + ll_dop_fini(save, 0); } else { - ENTRY; /* Check that parent has UPDATE lock. 
If there is none, we cannot afford to hash this dentry (done by ll_d_add) as it might get picked up later when UPDATE lock will appear */ @@ -444,7 +475,8 @@ static int lookup_it_finish(struct ptlrpc_request *request, } ll_set_dd(*de); - (*de)->d_op = &ll_d_ops; + + ll_dop_fini(*de, 1); RETURN(0); } @@ -482,6 +514,15 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, RETURN(ERR_PTR(rc)); } + if (it->it_op == IT_GETATTR) { + rc = ll_statahead_enter(parent, &dentry, 1); + if (rc >= 0) { + ll_statahead_exit(dentry, rc); + if (rc == 1) + RETURN(retval = dentry); + } + } + icbd.icbd_childp = &dentry; icbd.icbd_parent = parent; @@ -505,7 +546,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, if (rc < 0) GOTO(out, retval = ERR_PTR(rc)); - rc = lookup_it_finish(req, it, &icbd); + rc = ll_lookup_it_finish(req, it, &icbd); if (rc != 0) { ll_intent_release(it); GOTO(out, retval = ERR_PTR(rc)); diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index 9818ec4038e042ee12a7626cfba771837e3a0eda..7cbc2930338225468dacecc1b6df983d49c18f34 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -76,7 +76,7 @@ static void ll_invalidatepage(struct page *page, unsigned long offset) ll_removepage(page); } #endif -static int ll_releasepage(struct page *page, int gfp_mask) +static int ll_releasepage(struct page *page, gfp_t gfp_mask) { if (PagePrivate(page)) ll_removepage(page); diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c new file mode 100644 index 0000000000000000000000000000000000000000..c2780caecd266e701d2128581f713d88622975ab --- /dev/null +++ b/lustre/llite/statahead.c @@ -0,0 +1,963 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (c) 2007 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
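Since statahead.c is new, a brief roadmap of how its pieces cooperate with the hooks added elsewhere in this patch may help; this summary is derived from the code below.

/*
 * Stat-ahead flow (summary of the code in this file):
 *
 *  1. ll_file_open() records the opening process in lli_opendir_pid and
 *     stores the ll_file_data pointer in lli_opendir_key.
 *  2. The first IT_GETATTR lookup/revalidate from that process calls
 *     ll_statahead_enter() -> do_statahead_enter(); if the name is the
 *     first directory entry, lli_sai is allocated and a per-directory
 *     thread (ll_statahead_thread) is started.
 *  3. The thread walks the directory with ll_get_dir_page() and, for each
 *     entry, issues an asynchronous getattr intent via
 *     md_intent_getattr_async() (do_sa_lookup/do_sa_revalidate).
 *  4. Replies land in ll_statahead_interpret(), which instantiates or
 *     revalidates the dentry and marks the matching ll_sai_entry stated.
 *  5. The stat()ing process waits on sai_waitq for its entry, while
 *     ll_statahead_exit() keeps hit/miss statistics and grows or stops
 *     the window (see sa_low_hit() and ll_stop_statahead()).
 */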
+ */ + +#include <linux/fs.h> +#include <linux/sched.h> +#include <linux/mm.h> +#include <linux/smp_lock.h> +#include <linux/highmem.h> +#include <linux/pagemap.h> + +#define DEBUG_SUBSYSTEM S_LLITE + +#include <obd_support.h> +#include <lustre_lite.h> +#include <lustre_dlm.h> +#include <linux/lustre_version.h> +#include "llite_internal.h" + +struct ll_sai_entry { + struct list_head se_list; + unsigned int se_index; + int se_stat; +}; + +enum { + SA_ENTRY_UNSTATED = 0, + SA_ENTRY_STATED +}; + +static unsigned int sai_generation = 0; +static spinlock_t sai_generation_lock = SPIN_LOCK_UNLOCKED; + +static struct ll_statahead_info *ll_sai_alloc(void) +{ + struct ll_statahead_info *sai; + + OBD_ALLOC_PTR(sai); + if (!sai) + return NULL; + + spin_lock(&sai_generation_lock); + sai->sai_generation = ++sai_generation; + if (unlikely(sai_generation == 0)) + sai->sai_generation = ++sai_generation; + spin_unlock(&sai_generation_lock); + atomic_set(&sai->sai_refcount, 1); + sai->sai_max = LL_SA_RPC_MIN; + cfs_waitq_init(&sai->sai_waitq); + cfs_waitq_init(&sai->sai_thread.t_ctl_waitq); + CFS_INIT_LIST_HEAD(&sai->sai_entries); + return sai; +} + +static inline +struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai) +{ + LASSERT(sai); + atomic_inc(&sai->sai_refcount); + return sai; +} + +static void ll_sai_put(struct ll_statahead_info *sai) +{ + struct inode *inode = sai->sai_inode; + struct ll_inode_info *lli = ll_i2info(inode); + ENTRY; + + if (atomic_dec_and_lock(&sai->sai_refcount, &lli->lli_lock)) { + struct ll_sai_entry *entry, *next; + + lli->lli_sai = NULL; + spin_unlock(&lli->lli_lock); + + LASSERT(sai->sai_thread.t_flags & SVC_STOPPED); + + if (sai->sai_sent > sai->sai_replied) + CDEBUG(D_READA,"statahead for dir "DFID" does not " + "finish: [sent:%u] [replied:%u]\n", + PFID(&lli->lli_fid), + sai->sai_sent, sai->sai_replied); + + list_for_each_entry_safe(entry, next, &sai->sai_entries, + se_list) { + list_del(&entry->se_list); + OBD_FREE_PTR(entry); + } + OBD_FREE_PTR(sai); + iput(inode); + } + EXIT; +} + +static struct ll_sai_entry * +ll_sai_entry_get(struct ll_statahead_info *sai, unsigned int index, int stat) +{ + struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + struct ll_sai_entry *entry; + ENTRY; + + OBD_ALLOC_PTR(entry); + if (entry == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + CDEBUG(D_READA, "alloc sai entry %p index %u, stat %d\n", + entry, index, stat); + entry->se_index = index; + entry->se_stat = stat; + + spin_lock(&lli->lli_lock); + list_add_tail(&entry->se_list, &sai->sai_entries); + spin_unlock(&lli->lli_lock); + + RETURN(entry); +} + +/* inside lli_lock + * return value: + * 0: can not find the entry with the index + * 1: it is the first entry + * 2: it is not the first entry */ +static int +ll_sai_entry_set(struct ll_statahead_info *sai, unsigned int index, int stat) +{ + struct ll_sai_entry *entry; + int rc = 0; + ENTRY; + + if (list_empty(&sai->sai_entries)) + RETURN(0); + + entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list); + if (entry->se_index == index) + GOTO(out, rc = 1); + + while (entry->se_list.next != &sai->sai_entries && + entry->se_index < index) { + entry = list_entry(entry->se_list.next, struct ll_sai_entry, + se_list); + if (entry->se_index == index) + GOTO(out, rc = 2); + } + + EXIT; + +out: + if (rc) { + LASSERT(entry->se_stat == SA_ENTRY_UNSTATED); + entry->se_stat = stat; + } + + return rc; +} + +/* Check whether first entry was stated already or not. 
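The helpers above and below follow a single-producer/single-consumer discipline on sai_entries, summarized here because the next function relies on it for its lockless check.

/*
 * Entry-list discipline (summary): the stat-ahead thread is the only
 * producer and appends entries in increasing se_index order
 * (ll_sai_entry_get); the interpret callback only flips entries to
 * SA_ENTRY_STATED (ll_sai_entry_set); and the stat()ing process is the
 * only one that examines, waits on, and removes the list head
 * (ll_sai_entry_stated()/ll_sai_entry_put()).
 */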
+ * No need to hold lli_lock, for: + * (1) it is me that remove entry from the list (ll_sai_entry_put) + * (2) the statahead thread only add new entry to the list tail */ +static int ll_sai_entry_stated(struct ll_statahead_info *sai) +{ + struct ll_sai_entry *entry; + int rc = 0; + ENTRY; + + if (!list_empty(&sai->sai_entries)) { + entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, + se_list); + rc = (entry->se_stat != SA_ENTRY_UNSTATED); + } + + RETURN(rc); +} + +static void ll_sai_entry_put(struct ll_statahead_info *sai) +{ + struct ll_inode_info *lli = ll_i2info(sai->sai_inode); + struct ll_sai_entry *entry; + ENTRY; + + spin_lock(&lli->lli_lock); + if (!list_empty(&sai->sai_entries)) { + entry = list_entry(sai->sai_entries.next, + struct ll_sai_entry, se_list); + list_del(&entry->se_list); + OBD_FREE_PTR(entry); + } + spin_unlock(&lli->lli_lock); + + EXIT; +} + +/* finish lookup/revalidate */ +static int ll_statahead_interpret(struct ptlrpc_request *req, + struct md_enqueue_info *minfo, + int rc) +{ + struct lookup_intent *it = &minfo->mi_it; + struct dentry *dentry = minfo->mi_dentry; + struct inode *dir = dentry->d_parent->d_inode; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai = NULL; + ENTRY; + + CDEBUG(D_READA, "interpret statahead %.*s rc %d\n", + dentry->d_name.len, dentry->d_name.name, rc); + + spin_lock(&lli->lli_lock); + if (unlikely(lli->lli_sai == NULL || + lli->lli_sai->sai_generation != minfo->mi_generation)) { + spin_unlock(&lli->lli_lock); + GOTO(out_free, rc = -ESTALE); + } else { + sai = ll_sai_get(lli->lli_sai); + spin_unlock(&lli->lli_lock); + } + + if (rc || dir == NULL) + GOTO(out, rc); + + if (dentry->d_inode == NULL) { + /* lookup */ + struct dentry *save = dentry; + struct it_cb_data icbd = { + .icbd_parent = dir, + .icbd_childp = &dentry + }; + + LASSERT(fid_is_zero(&minfo->mi_data.op_fid2)); + + rc = ll_lookup_it_finish(req, it, &icbd); + if (!rc) + /* Here dentry->d_inode might be NULL, + * because the entry may have been removed before + * we start doing stat ahead. */ + ll_lookup_finish_locks(it, dentry); + + if (dentry != save) + dput(save); + } else { + /* revalidate */ + struct mdt_body *body; + + body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, + sizeof(*body)); + if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) { + ll_unhash_aliases(dentry->d_inode); + GOTO(out, rc = -EAGAIN); + } + + rc = ll_revalidate_it_finish(req, it, dentry); + if (rc) { + ll_unhash_aliases(dentry->d_inode); + GOTO(out, rc); + } + + spin_lock(&dcache_lock); + lock_dentry(dentry); + __d_drop(dentry); +#ifdef DCACHE_LUSTRE_INVALID + dentry->d_flags &= ~DCACHE_LUSTRE_INVALID; +#endif + unlock_dentry(dentry); + d_rehash_cond(dentry, 0); + spin_unlock(&dcache_lock); + + ll_lookup_finish_locks(it, dentry); + } + EXIT; + +out: + if (sai != NULL) { + int first; + + sai->sai_replied++; + spin_lock(&lli->lli_lock); + first = ll_sai_entry_set(sai, + (unsigned int)(long)minfo->mi_cbdata, + SA_ENTRY_STATED); + spin_unlock(&lli->lli_lock); + if (first == 1) + /* wake up the "ls -l" process only when the first entry + * returned. 
*/ + cfs_waitq_signal(&sai->sai_waitq); + else if (first == 0) + CDEBUG(D_READA, "can't find sai entry for dir " + DFID" generation %u index %u\n", + PFID(&lli->lli_fid), + minfo->mi_generation, + (unsigned int)(long)minfo->mi_cbdata); + + ll_sai_put(sai); + } +out_free: + ll_intent_release(it); + OBD_FREE_PTR(minfo); + + dput(dentry); + return rc; +} + +static void sa_args_fini(struct md_enqueue_info *minfo, + struct ldlm_enqueue_info *einfo) +{ + LASSERT(minfo && einfo); + capa_put(minfo->mi_data.op_capa1); + capa_put(minfo->mi_data.op_capa2); + OBD_FREE_PTR(minfo); + OBD_FREE_PTR(einfo); +} + +/* There is race condition between "capa_put" and "ll_statahead_interpret" for + * accessing "op_data.op_capa[1,2]" as following: + * "capa_put" releases "op_data.op_capa[1,2]"'s reference count after calling + * "md_intent_getattr_async". But "ll_statahead_interpret" maybe run first, and + * fill "op_data.op_capa[1,2]" as POISON, then cause "capa_put" access invalid + * "ocapa". So here reserve "op_data.op_capa[1,2]" in "pcapa" before calling + * "md_intent_getattr_async". */ +static int sa_args_init(struct inode *dir, struct dentry *dentry, + struct md_enqueue_info **pmi, + struct ldlm_enqueue_info **pei, + struct obd_capa **pcapa) +{ + struct ll_inode_info *lli = ll_i2info(dir); + struct md_enqueue_info *minfo; + struct ldlm_enqueue_info *einfo; + struct md_op_data *op_data; + + OBD_ALLOC_PTR(einfo); + if (einfo == NULL) + return -ENOMEM; + + OBD_ALLOC_PTR(minfo); + if (minfo == NULL) { + OBD_FREE_PTR(einfo); + return -ENOMEM; + } + + op_data = ll_prep_md_op_data(&minfo->mi_data, dir, dentry->d_inode, + dentry->d_name.name, dentry->d_name.len, + 0, LUSTRE_OPC_ANY, NULL); + if (IS_ERR(op_data)) { + OBD_FREE_PTR(einfo); + OBD_FREE_PTR(minfo); + return PTR_ERR(op_data); + } + + minfo->mi_it.it_op = IT_GETATTR; + minfo->mi_dentry = dentry; + minfo->mi_cb = ll_statahead_interpret; + minfo->mi_generation = lli->lli_sai->sai_generation; + minfo->mi_cbdata = (void *)(long)lli->lli_sai->sai_index; + + einfo->ei_type = LDLM_IBITS; + einfo->ei_mode = it_to_lock_mode(&minfo->mi_it); + einfo->ei_cb_bl = ll_md_blocking_ast; + einfo->ei_cb_cp = ldlm_completion_ast; + einfo->ei_cb_gl = NULL; + einfo->ei_cbdata = NULL; + + *pmi = minfo; + *pei = einfo; + pcapa[0] = op_data->op_capa1; + pcapa[1] = op_data->op_capa2; + + return 0; +} + +/* similar to ll_lookup_it(). */ +static int do_sa_lookup(struct inode *dir, struct dentry *dentry) +{ + struct md_enqueue_info *minfo; + struct ldlm_enqueue_info *einfo; + struct obd_capa *capas[2]; + int rc; + ENTRY; + + rc = sa_args_init(dir, dentry, &minfo, &einfo, capas); + if (rc) + RETURN(rc); + + rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo); + if (!rc) { + capa_put(capas[0]); + capa_put(capas[1]); + } else { + sa_args_fini(minfo, einfo); + } + + RETURN(rc); +} + +/* similar to ll_revalidate_it(). 
+ * return value: + * 1 -- dentry valid + * 0 -- will send stat-ahead request + * others -- prepare stat-ahead request failed */ +static int do_sa_revalidate(struct dentry *dentry) +{ + struct inode *inode = dentry->d_inode; + struct inode *dir = dentry->d_parent->d_inode; + struct lookup_intent it = { .it_op = IT_GETATTR }; + struct md_enqueue_info *minfo; + struct ldlm_enqueue_info *einfo; + struct obd_capa *capas[2]; + int rc; + ENTRY; + + if (inode == NULL) + RETURN(1); + + if (d_mountpoint(dentry)) + RETURN(1); + + if (dentry == dentry->d_sb->s_root) + RETURN(1); + + rc = md_revalidate_lock(ll_i2mdexp(dir), &it, ll_inode2fid(inode)); + if (rc == 1) { + ll_intent_release(&it); + RETURN(1); + } + + rc = sa_args_init(dir, dentry, &minfo, &einfo, capas); + if (rc) + RETURN(rc); + + rc = md_intent_getattr_async(ll_i2mdexp(dir), minfo, einfo); + if (!rc) { + capa_put(capas[0]); + capa_put(capas[1]); + } else { + sa_args_fini(minfo, einfo); + } + + RETURN(rc); +} + +static inline void ll_name2qstr(struct qstr *this, const char *name, int namelen) +{ + unsigned long hash = init_name_hash(); + unsigned int c; + + this->name = name; + this->len = namelen; + for (; namelen > 0; namelen--, name++) { + c = *(const unsigned char *)name; + hash = partial_name_hash(c, hash); + } + this->hash = end_name_hash(hash); +} + +static int ll_statahead_one(struct dentry *parent, const char* entry_name, + int entry_name_len) +{ + struct inode *dir = parent->d_inode; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai = lli->lli_sai; + struct qstr name; + struct dentry *dentry; + struct ll_sai_entry *se; + int rc; + ENTRY; + +#ifdef DCACHE_LUSTRE_INVALID + if (parent->d_flags & DCACHE_LUSTRE_INVALID) { +#else + if (d_unhashed(parent)) { +#endif + CDEBUG(D_READA, "parent dentry@%p %.*s is " + "invalid, skip statahead\n", + parent, parent->d_name.len, parent->d_name.name); + RETURN(-EINVAL); + } + + se = ll_sai_entry_get(sai, sai->sai_index, SA_ENTRY_UNSTATED); + if (IS_ERR(se)) + RETURN(PTR_ERR(se)); + + ll_name2qstr(&name, entry_name, entry_name_len); + dentry = d_lookup(parent, &name); + if (!dentry) { + dentry = d_alloc(parent, &name); + if (dentry) { + rc = do_sa_lookup(dir, dentry); + if (rc) + dput(dentry); + } else { + GOTO(out, rc = -ENOMEM); + } + } else { + rc = do_sa_revalidate(dentry); + if (rc) + dput(dentry); + } + + EXIT; + +out: + if (rc) { + CDEBUG(D_READA, "set sai entry %p index %u stat %d rc %d\n", + se, se->se_index, se->se_stat, rc); + se->se_stat = rc; + cfs_waitq_signal(&sai->sai_waitq); + } else { + sai->sai_sent++; + } + + sai->sai_index++; + return rc; +} + +static inline int sa_check_stop(struct ll_statahead_info *sai) +{ + return !!(sai->sai_thread.t_flags & SVC_STOPPING); +} + +static inline int sa_not_full(struct ll_statahead_info *sai) +{ + return sai->sai_index < sai->sai_hit + sai->sai_miss + sai->sai_max; +} + +/* (1) hit ratio less than 80% + * or + * (2) consecutive miss more than 8 */ +static inline int sa_low_hit(struct ll_statahead_info *sai) +{ + return ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) || + (sai->sai_consecutive_miss > 8)); +} + +struct ll_sa_thread_args { + struct dentry *sta_parent; + pid_t sta_pid; +}; + +static int ll_statahead_thread(void *arg) +{ + struct ll_sa_thread_args *sta = arg; + struct dentry *parent = dget(sta->sta_parent); + struct inode *dir = parent->d_inode; + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai); + 
struct ptlrpc_thread *thread = &sai->sai_thread; + struct page *page; + __u64 pos = 0; + int first = 0; + int rc = 0; + struct ll_dir_chain chain; + ENTRY; + + { + char pname[16]; + snprintf(pname, 15, "ll_sa_%u", sta->sta_pid); + cfs_daemonize(pname); + } + + sbi->ll_sa_total++; + spin_lock(&lli->lli_lock); + thread->t_flags = SVC_RUNNING; + spin_unlock(&lli->lli_lock); + cfs_waitq_signal(&thread->t_ctl_waitq); + CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name); + + ll_dir_chain_init(&chain); + page = ll_get_dir_page(dir, pos, 0, &chain); + + while (1) { + struct lu_dirpage *dp; + struct lu_dirent *ent; + + if (IS_ERR(page)) { + rc = PTR_ERR(page); + CERROR("error reading dir "DFID" at %llu/%u: rc %d\n", + PFID(ll_inode2fid(dir)), pos, + sai->sai_index, rc); + break; + } + + dp = page_address(page); + for (ent = lu_dirent_start(dp); ent != NULL; + ent = lu_dirent_next(ent)) { + struct l_wait_info lwi = { 0 }; + char *name = ent->lde_name; + int namelen = le16_to_cpu(ent->lde_namelen); + + if (namelen == 0) + /* Skip dummy record. */ + continue; + + if (name[0] == '.') { + if (namelen == 1) { + /* skip . */ + continue; + } else if (name[1] == '.' && namelen == 2) { + /* skip .. */ + continue; + } else if (!sai->sai_ls_all) { + /* skip hidden files */ + sai->sai_skip_hidden++; + continue; + } + } + + /* don't stat-ahead first entry */ + if (unlikely(!first)) { + first++; + continue; + } + + l_wait_event(thread->t_ctl_waitq, + sa_check_stop(sai) || sa_not_full(sai), + &lwi); + + if (unlikely(sa_check_stop(sai))) { + ll_put_page(page); + GOTO(out, rc); + } + + rc = ll_statahead_one(parent, name, namelen); + if (rc < 0) { + ll_put_page(page); + GOTO(out, rc); + } + } + pos = le64_to_cpu(dp->ldp_hash_end); + ll_put_page(page); + if (pos == DIR_END_OFF) { + /* End of directory reached. */ + break; + } else if (1 /* chain is exhausted*/) { + /* Normal case: continue to the next page. */ + page = ll_get_dir_page(dir, pos, 1, &chain); + } else { + /* go into overflow page. */ + } + } + EXIT; + +out: + ll_dir_chain_fini(&chain); + spin_lock(&lli->lli_lock); + thread->t_flags = SVC_STOPPED; + spin_unlock(&lli->lli_lock); + cfs_waitq_signal(&sai->sai_waitq); + cfs_waitq_signal(&thread->t_ctl_waitq); + ll_sai_put(sai); + dput(parent); + CDEBUG(D_READA, "statahead thread stopped, pid %d\n", + cfs_curproc_pid()); + return rc; +} + +/* called in ll_file_release() */ +void ll_stop_statahead(struct inode *inode, void *key) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct ptlrpc_thread *thread; + + spin_lock(&lli->lli_lock); + if (lli->lli_opendir_pid == 0 || + unlikely(lli->lli_opendir_key != key)) { + spin_unlock(&lli->lli_lock); + return; + } + + lli->lli_opendir_key = NULL; + lli->lli_opendir_pid = 0; + + if (lli->lli_sai) { + struct l_wait_info lwi = { 0 }; + + thread = &lli->lli_sai->sai_thread; + if (!(thread->t_flags & SVC_STOPPED)) { + thread->t_flags = SVC_STOPPING; + spin_unlock(&lli->lli_lock); + cfs_waitq_signal(&thread->t_ctl_waitq); + + CDEBUG(D_READA, "stopping statahead thread, pid %d\n", + cfs_curproc_pid()); + l_wait_event(thread->t_ctl_waitq, + thread->t_flags & SVC_STOPPED, + &lwi); + } else { + spin_unlock(&lli->lli_lock); + } + + /* Put the ref which was held when first statahead_enter. + * It maybe not the last ref for some statahead requests + * maybe inflight. */ + ll_sai_put(lli->lli_sai); + return; + } + spin_unlock(&lli->lli_lock); +} + +enum { + LS_NONE_FIRST_DE = 0, /* not first dirent, or is "." 
*/ + LS_FIRST_DE, /* the first non-hidden dirent */ + LS_FIRST_DOT_DE /* the first hidden dirent, that is ".xxx" */ +}; + +static int is_first_dirent(struct inode *dir, struct dentry *dentry) +{ + struct ll_dir_chain chain; + struct qstr *target = &dentry->d_name; + struct page *page; + __u64 pos = 0; + int dot_de; + int rc = LS_NONE_FIRST_DE; + ENTRY; + + ll_dir_chain_init(&chain); + page = ll_get_dir_page(dir, pos, 0, &chain); + + while (1) { + struct lu_dirpage *dp; + struct lu_dirent *ent; + + if (IS_ERR(page)) { + rc = PTR_ERR(page); + CERROR("error reading dir "DFID" at %llu: rc %d\n", + PFID(ll_inode2fid(dir)), pos, rc); + break; + } + + dp = page_address(page); + for (ent = lu_dirent_start(dp); ent != NULL; + ent = lu_dirent_next(ent)) { + char *name = ent->lde_name; + int namelen = le16_to_cpu(ent->lde_namelen); + + if (namelen == 0) + /* Skip dummy record. */ + continue; + + if (name[0] == '.') { + if (namelen == 1) + /* skip . */ + continue; + else if (name[1] == '.' && namelen == 2) + /* skip .. */ + continue; + else + dot_de = 1; + } else { + dot_de = 0; + } + + if (dot_de && target->name[0] != '.') { + CDEBUG(D_READA, "%.*s skip hidden file %.*s\n", + target->len, target->name, + namelen, name); + continue; + } + + if (target->len == namelen && + !strncmp(target->name, name, target->len)) + rc = LS_FIRST_DE + dot_de; + else + rc = LS_NONE_FIRST_DE; + ll_put_page(page); + GOTO(out, rc); + } + pos = le64_to_cpu(dp->ldp_hash_end); + ll_put_page(page); + if (pos == DIR_END_OFF) { + /* End of directory reached. */ + break; + } else if (1 /* chain is exhausted*/) { + /* Normal case: continue to the next page. */ + page = ll_get_dir_page(dir, pos, 1, &chain); + } else { + /* go into overflow page. */ + } + } + EXIT; + +out: + ll_dir_chain_fini(&chain); + return rc; +} + +/* Start statahead thread if this is the first dir entry. + * Otherwise if a thread is started already, wait it until it is ahead of me. + * Return value: + * 0 -- stat ahead thread process such dentry, for lookup, it miss + * 1 -- stat ahead thread process such dentry, for lookup, it hit + * -EEXIST -- stat ahead thread started, and this is the first dentry + * -EBADFD -- statahead thread exit and not dentry available + * others -- error */ +int do_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) +{ + struct ll_sb_info *sbi = ll_i2sbi(dir); + struct ll_inode_info *lli = ll_i2info(dir); + struct ll_statahead_info *sai = lli->lli_sai; + struct ll_sa_thread_args sta; + struct l_wait_info lwi = { 0 }; + int rc; + ENTRY; + + LASSERT(lli->lli_opendir_pid == cfs_curproc_pid()); + + if (sai) { + if (unlikely(sai->sai_thread.t_flags & SVC_STOPPED && + list_empty(&sai->sai_entries))) + RETURN(-EBADFD); + + if ((*dentryp)->d_name.name[0] == '.') { + if (likely(sai->sai_ls_all || + sai->sai_miss_hidden >= sai->sai_skip_hidden)) { + /* Hidden dentry is the first one, or statahead + * thread does not skip so many hidden dentries + * before "sai_ls_all" enabled as below. */ + } else { + if (!sai->sai_ls_all) + /* It maybe because hidden dentry is not + * the first one, "sai_ls_all" was not + * set, then "ls -al" missed. Enable + * "sai_ls_all" for such case. */ + sai->sai_ls_all = 1; + + /* Such "getattr" has been skipped before + * "sai_ls_all" enabled as above. 
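Putting the hidden-entry counters together, a worked scenario, derived from is_first_dirent() and the branch here, reads roughly as follows.

/*
 * Worked scenario (illustrative):
 *   "ls -l":  the first name stat()ed is not hidden, so is_first_dirent()
 *             returns LS_FIRST_DE, sai_ls_all stays 0 and the thread skips
 *             ".xxx" entries, bumping sai_skip_hidden.
 *   "ls -al": the first name stat()ed is hidden, is_first_dirent() returns
 *             LS_FIRST_DOT_DE and sai_ls_all is set, so hidden entries are
 *             prefetched as well.
 *   If a hidden name is stat()ed while sai_ls_all is still 0, the branch
 *   here turns sai_ls_all on, counts the miss in sai_miss_hidden and
 *   returns -ENOENT for this round, since the thread already skipped that
 *   entry.
 */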
*/ + sai->sai_miss_hidden++; + RETURN(-ENOENT); + } + } + + if (ll_sai_entry_stated(sai)) { + sbi->ll_sa_cached++; + } else { + sbi->ll_sa_blocked++; + /* thread started already, avoid double-stat */ + l_wait_event(sai->sai_waitq, + ll_sai_entry_stated(sai) || + sai->sai_thread.t_flags & SVC_STOPPED, + &lwi); + } + + if (lookup) { + struct dentry *result; + + result = d_lookup((*dentryp)->d_parent, + &(*dentryp)->d_name); + if (result) { + LASSERT(result != *dentryp); + dput(*dentryp); + *dentryp = result; + RETURN(1); + } + } + /* do nothing for revalidate */ + RETURN(0); + } + + /* I am the "lli_opendir_pid" owner, only me can set "lli_sai". */ + LASSERT(lli->lli_sai == NULL); + + rc = is_first_dirent(dir, *dentryp); + if (rc == LS_NONE_FIRST_DE) { + /* It is not "ls -{a}l" operation, no need statahead for it */ + spin_lock(&lli->lli_lock); + lli->lli_opendir_key = NULL; + lli->lli_opendir_pid = 0; + spin_unlock(&lli->lli_lock); + RETURN(-EBADF); + } + + sai = ll_sai_alloc(); + if (sai == NULL) + RETURN(-ENOMEM); + + sai->sai_inode = igrab(dir); + sai->sai_ls_all = (rc == LS_FIRST_DOT_DE); + + sta.sta_parent = (*dentryp)->d_parent; + sta.sta_pid = cfs_curproc_pid(); + + lli->lli_sai = sai; + rc = cfs_kernel_thread(ll_statahead_thread, &sta, 0); + if (rc < 0) { + CERROR("can't start ll_sa thread, rc: %d\n", rc); + sai->sai_thread.t_flags = SVC_STOPPED; + ll_sai_put(sai); + LASSERT(lli->lli_sai == NULL); + RETURN(rc); + } + + l_wait_event(sai->sai_thread.t_ctl_waitq, + sai->sai_thread.t_flags & (SVC_RUNNING | SVC_STOPPED), + &lwi); + + /* We don't stat-ahead for the first dirent since we are already in + * lookup, and -EEXIST also indicates that this is the first dirent. */ + RETURN(-EEXIST); +} + +/* update hit/miss count */ +void ll_statahead_exit(struct dentry *dentry, int result) +{ + struct dentry *parent = dentry->d_parent; + struct ll_inode_info *lli = ll_i2info(parent->d_inode); + struct ll_sb_info *sbi = ll_i2sbi(parent->d_inode); + struct ll_dentry_data *ldd = ll_d2d(dentry); + + if (lli->lli_opendir_pid != cfs_curproc_pid()) + return; + + if (lli->lli_sai) { + struct ll_statahead_info *sai = lli->lli_sai; + + if (result == 1) { + sbi->ll_sa_hit++; + sai->sai_hit++; + sai->sai_consecutive_miss = 0; + sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max); + } else { + sbi->ll_sa_miss++; + sai->sai_miss++; + sai->sai_consecutive_miss++; + if (sa_low_hit(sai) && + sai->sai_thread.t_flags & SVC_RUNNING) { + sbi->ll_sa_wrong++; + CDEBUG(D_READA, "statahead for dir %.*s hit " + "ratio too low: hit/miss %u/%u, " + "sent/replied %u/%u. 
stopping statahead " + "thread: pid %d\n", + parent->d_name.len, parent->d_name.name, + sai->sai_hit, sai->sai_miss, + sai->sai_sent, sai->sai_replied, + cfs_curproc_pid()); + spin_lock(&lli->lli_lock); + if (!(sai->sai_thread.t_flags & SVC_STOPPED)) + sai->sai_thread.t_flags = SVC_STOPPING; + spin_unlock(&lli->lli_lock); + } + } + + cfs_waitq_signal(&sai->sai_thread.t_ctl_waitq); + ll_sai_entry_put(sai); + + if (likely(ldd != NULL)) + ldd->lld_sa_generation = sai->sai_generation; + } +} diff --git a/lustre/llite/xattr.c b/lustre/llite/xattr.c index 933834f87f6990d34c095290d4cd6688e0fc9490..ffbec6e93ad6dc1a3e216e3c8871822d011e4875 100644 --- a/lustre/llite/xattr.c +++ b/lustre/llite/xattr.c @@ -309,6 +309,8 @@ int ll_getxattr_common(struct inode *inode, const char *name, posix_acl_release(acl); RETURN(rc); } + if (xattr_type == XATTR_ACL_DEFAULT_T && !S_ISDIR(inode->i_mode)) + RETURN(-ENODATA); #endif do_getxattr: diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index f87d2f5651aabf162578dcf729856a5566eac4a1..fe3ffe75fa6ee60af8fe0c9aea19460f89ceb150 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -1009,8 +1009,7 @@ release_lock: body->valid = OBD_MD_FLSIZE; } if (master_valid == 0) { - memcpy(&oit->d.lustre.it_lock_handle, - &master_lockh, sizeof(master_lockh)); + oit->d.lustre.it_lock_handle = master_lockh.cookie; oit->d.lustre.it_lock_mode = master_lock_mode; } rc = 0; diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index fed558fe4b262b5339aa22428b032a0d2ea3e4fb..1374c1fea228e62bf872002b91e7bac03ad450bd 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -1061,7 +1061,7 @@ static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf) GOTO(out, rc = -EINVAL); obd_str2uuid(&tgt_uuid, lustre_cfg_string(lcfg, 1)); - rc = lmv_add_target(obd, &tgt_uuid); + rc = lmv_add_target(obd, &tgt_uuid); GOTO(out, rc); default: { CERROR("Unknown command: %d\n", lcfg->lcfg_command); @@ -1660,8 +1660,8 @@ lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid, ENTRY; rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); + if (rc) + RETURN(rc); repeat: ++loop; @@ -1755,7 +1755,7 @@ static int lmv_early_cancel(struct lmv_obd *lmv, struct obd_export *tgt_exp, if (!fid_is_sane(fid)) RETURN(0); - + if (fid_exp == NULL) fid_exp = lmv_find_export(lmv, fid); @@ -1796,7 +1796,7 @@ static int lmv_early_cancel_stripes(struct obd_export *exp, ldlm_policy_data_t policy = {{0}}; struct lu_fid *st_fid; int i; - + policy.l_inodebits.bits = bits; for (i = 0; i < obj->lo_objcount; i++) { st_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); @@ -1839,8 +1839,8 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data, ENTRY; rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); + if (rc) + RETURN(rc); repeat: ++loop; @@ -1929,8 +1929,8 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data, newlen, new, PFID(&op_data->op_fid2)); rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); + if (rc) + RETURN(rc); if (oldlen == 0) { /* @@ -2070,8 +2070,8 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data, ENTRY; rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); + if (rc) + RETURN(rc); obj = lmv_obj_grab(obd, &op_data->op_fid1); @@ -2128,8 +2128,8 @@ static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid, ENTRY; rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); + if (rc) + RETURN(rc); tgt_exp = lmv_find_export(lmv, fid); if (IS_ERR(tgt_exp)) @@ -2230,8 +2230,8 @@ static 
int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid, offset = offset64; rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); + if (rc) + RETURN(rc); CDEBUG(D_INFO, "READPAGE at %llx from "DFID"\n", offset, PFID(&rid)); @@ -2408,9 +2408,9 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, int rc, loop = 0; ENTRY; - rc = lmv_check_connect(obd); - if (rc) - RETURN(rc); + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); if (op_data->op_namelen == 0 && op_data->op_mea1 != NULL) { /* mds asks to remove slave objects */ @@ -2902,6 +2902,54 @@ static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc, RETURN(rc); } +int lmv_intent_getattr_async(struct obd_export *exp, + struct md_enqueue_info *minfo, + struct ldlm_enqueue_info *einfo) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct obd_export *tgt_exp; + int rc; + ENTRY; + + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); + + if (fid_is_zero(&minfo->mi_data.op_fid2)) + tgt_exp = lmv_find_export(lmv, &minfo->mi_data.op_fid1); + else + tgt_exp = lmv_find_export(lmv, &minfo->mi_data.op_fid2); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); + + rc = md_intent_getattr_async(tgt_exp, minfo, einfo); + RETURN(rc); +} + +int lmv_revalidate_lock(struct obd_export *exp, + struct lookup_intent *it, + struct lu_fid *fid) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct obd_export *tgt_exp; + int rc; + ENTRY; + + rc = lmv_check_connect(obd); + if (rc) + RETURN(rc); + + tgt_exp = lmv_find_export(lmv, fid); + if (IS_ERR(tgt_exp)) + RETURN(PTR_ERR(tgt_exp)); + + rc = md_revalidate_lock(tgt_exp, it, fid); + RETURN(rc); +} + + struct obd_ops lmv_obd_ops = { .o_owner = THIS_MODULE, .o_setup = lmv_setup, @@ -2948,8 +2996,10 @@ struct md_ops lmv_md_ops = { .m_free_lustre_md = lmv_free_lustre_md, .m_set_open_replay_data = lmv_set_open_replay_data, .m_clear_open_replay_data = lmv_clear_open_replay_data, + .m_renew_capa = lmv_renew_capa, .m_get_remote_perm = lmv_get_remote_perm, - .m_renew_capa = lmv_renew_capa + .m_intent_getattr_async = lmv_intent_getattr_async, + .m_revalidate_lock = lmv_revalidate_lock }; int __init lmv_init(void) diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index bcf08ba7020d8df9b3358400cab2691a1c57ee22..5483a6d26985fadea6768246c14ea89881ac7c26 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -100,7 +100,7 @@ int mdc_intent_lock(struct obd_export *exp, ldlm_blocking_callback cb_blocking, int extra_lock_flags); int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, struct lookup_intent *it, struct md_op_data *op_data, - struct lustre_handle *lockh, void *lmm, int lmmlen, + struct lustre_handle *lockh, void *lmm, int lmmsize, int extra_lock_flags); int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid, @@ -158,6 +158,14 @@ static inline void mdc_set_capa_size(struct ptlrpc_request *req, ; } +int mdc_revalidate_lock(struct obd_export *exp, + struct lookup_intent *it, + struct lu_fid *fid); + +int mdc_intent_getattr_async(struct obd_export *exp, + struct md_enqueue_info *minfo, + struct ldlm_enqueue_info *einfo); + ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags, const struct lu_fid *fid, ldlm_type_t type, ldlm_policy_data_t *policy, ldlm_mode_t mode, diff --git a/lustre/mdc/mdc_lib.c b/lustre/mdc/mdc_lib.c index 818a621b4439c39b9bf536588bc5c1abacddeb6c..a6bc50f3bcc650dbadeb0af6c3887dead78f1232 100644 --- 
a/lustre/mdc/mdc_lib.c +++ b/lustre/mdc/mdc_lib.c @@ -458,6 +458,7 @@ void mdc_getattr_pack(struct ptlrpc_request *req, __u64 valid, int flags, if (op_data->op_name) { char *tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME); LOGL0(op_data->op_name, op_data->op_namelen, tmp); + } } diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index b18f6421a2d766b698995001029cacac9582b7bc..fdf6fe978d829433708bcf5854516330333b5019 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -62,20 +62,6 @@ void it_clear_disposition(struct lookup_intent *it, int flag) } EXPORT_SYMBOL(it_clear_disposition); -static int it_to_lock_mode(struct lookup_intent *it) -{ - ENTRY; - - /* CREAT needs to be tested before open (both could be set) */ - if (it->it_op & IT_CREAT) - return LCK_PW; - else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP)) - return LCK_PR; - - LBUG(); - RETURN(-EINVAL); -} - int it_open_error(int phase, struct lookup_intent *it) { if (it_disposition(it, DISP_OPEN_OPEN)) { @@ -151,13 +137,11 @@ ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags, ldlm_policy_data_t *policy, ldlm_mode_t mode, struct lustre_handle *lockh) { - struct ldlm_res_id res_id = - { .name = {fid_seq(fid), - fid_oid(fid), - fid_ver(fid)} }; + struct ldlm_res_id res_id; ldlm_mode_t rc; ENTRY; + fid_build_reg_res_name(fid, &res_id); rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags, &res_id, type, policy, mode, lockh); RETURN(rc); @@ -168,15 +152,13 @@ int mdc_cancel_unused(struct obd_export *exp, ldlm_policy_data_t *policy, ldlm_mode_t mode, int flags, void *opaque) { - struct ldlm_res_id res_id = - { .name = {fid_seq(fid), - fid_oid(fid), - fid_ver(fid)} }; + struct ldlm_res_id res_id; struct obd_device *obd = class_exp2obd(exp); int rc; ENTRY; + fid_build_reg_res_name(fid, &res_id); rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id, policy, mode, flags, opaque); RETURN(rc); @@ -186,13 +168,10 @@ int mdc_change_cbdata(struct obd_export *exp, const struct lu_fid *fid, ldlm_iterator_t it, void *data) { - struct ldlm_res_id res_id = { .name = {0} }; + struct ldlm_res_id res_id; ENTRY; - res_id.name[0] = fid_seq(fid); - res_id.name[1] = fid_oid(fid); - res_id.name[2] = fid_ver(fid); - + fid_build_reg_res_name(fid, &res_id); ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id, it, data); @@ -226,7 +205,7 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) * but this is incredibly unlikely, and questionable whether the client * could do MDS recovery under OOM anyways... */ static void mdc_realloc_openmsg(struct ptlrpc_request *req, - struct mdt_body *body) + struct mdt_body *body) { int rc; @@ -380,7 +359,7 @@ static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it, - struct md_op_data *op_data) + struct md_op_data *op_data) { struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); @@ -444,69 +423,19 @@ static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp) RETURN(req); } -/* We always reserve enough space in the reply packet for a stripe MD, because - * we don't know in advance the file type. 
*/ -int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, - struct lookup_intent *it, struct md_op_data *op_data, - struct lustre_handle *lockh, void *lmm, int lmmsize, - int extra_lock_flags) +static int mdc_finish_enqueue(struct obd_export *exp, + struct ptlrpc_request *req, + struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, + struct lustre_handle *lockh, + int rc) { - struct obd_device *obddev = class_exp2obd(exp); - struct ptlrpc_request *req; - struct req_capsule *pill; - struct ldlm_request *lockreq; - struct ldlm_reply *lockrep; - int flags = extra_lock_flags | LDLM_FL_HAS_INTENT; - int rc; - struct ldlm_res_id res_id = - { .name = {fid_seq(&op_data->op_fid1), - fid_oid(&op_data->op_fid1), - fid_ver(&op_data->op_fid1)} }; - ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; + struct req_capsule *pill = &req->rq_pill; + struct ldlm_request *lockreq; + struct ldlm_reply *lockrep; ENTRY; - LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type); - - if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR)) - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - - if (it->it_op & IT_OPEN) { - int joinfile = !!((it->it_flags & O_JOIN_FILE) && - op_data->op_data); - - req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize, - einfo->ei_cbdata); - if (!joinfile) { - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - einfo->ei_cbdata = NULL; - lmm = NULL; - } else - it->it_flags &= ~O_JOIN_FILE; - } else if (it->it_op & IT_UNLINK) - req = mdc_intent_unlink_pack(exp, it, op_data); - else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) - req = mdc_intent_getattr_pack(exp, it, op_data); - else if (it->it_op == IT_READDIR) - req = ldlm_enqueue_pack(exp); - else { - LBUG(); - RETURN(-EINVAL); - } - - if (IS_ERR(req)) - RETURN(PTR_ERR(req)); - pill = &req->rq_pill; - - /* It is important to obtain rpc_lock first (if applicable), so that - * threads that are serialised with rpc_lock are not polluting our - * rpcs in flight counter */ - mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); - mdc_enter_request(&obddev->u.cli); - rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, - 0, NULL, lockh, 0); - mdc_exit_request(&obddev->u.cli); - mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); - + LASSERT(rc >= 0); /* Similarly, if we're going to replay this request, we don't want to * actually get a lock, just perform the intent. */ if (req->rq_transno || req->rq_replay) { @@ -518,12 +447,6 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, einfo->ei_mode = 0; memset(lockh, 0, sizeof(*lockh)); rc = 0; - } else if (rc != 0) { - CERROR("ldlm_cli_enqueue: %d\n", rc); - LASSERTF(rc < 0, "rc %d\n", rc); - mdc_clear_replay_flag(req, rc); - ptlrpc_req_finished(req); - RETURN(rc); } else { /* rc = 0 */ struct ldlm_lock *lock = ldlm_handle2lock(lockh); LASSERT(lock); @@ -597,6 +520,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, RETURN(-EPROTO); if (body->valid & OBD_MD_FLMODEASIZE) { + struct obd_device *obddev = class_exp2obd(exp); + if (obddev->u.cli.cl_max_mds_easize < body->max_mdsize) { obddev->u.cli.cl_max_mds_easize = @@ -623,6 +548,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, * (for example error one). 
*/ if ((it->it_op & IT_OPEN) && req->rq_replay) { + void *lmm; if (req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT) < body->eadatasize) { @@ -671,6 +597,193 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, RETURN(rc); } + +/* We always reserve enough space in the reply packet for a stripe MD, because + * we don't know in advance the file type. */ +int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, struct md_op_data *op_data, + struct lustre_handle *lockh, void *lmm, int lmmsize, + int extra_lock_flags) +{ + struct obd_device *obddev = class_exp2obd(exp); + struct ptlrpc_request *req; + struct req_capsule *pill; + int flags = extra_lock_flags | LDLM_FL_HAS_INTENT; + int rc; + struct ldlm_res_id res_id; + ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; + ENTRY; + + LASSERTF(einfo->ei_type == LDLM_IBITS, "lock type %d\n", einfo->ei_type); + + fid_build_reg_res_name(&op_data->op_fid1, &res_id); + + if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR)) + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + + if (it->it_op & IT_OPEN) { + int joinfile = !!((it->it_flags & O_JOIN_FILE) && + op_data->op_data); + + req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize, + einfo->ei_cbdata); + if (!joinfile) { + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + einfo->ei_cbdata = NULL; + lmm = NULL; + } else + it->it_flags &= ~O_JOIN_FILE; + } else if (it->it_op & IT_UNLINK) + req = mdc_intent_unlink_pack(exp, it, op_data); + else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) + req = mdc_intent_getattr_pack(exp, it, op_data); + else if (it->it_op == IT_READDIR) + req = ldlm_enqueue_pack(exp); + else { + LBUG(); + RETURN(-EINVAL); + } + + if (IS_ERR(req)) + RETURN(PTR_ERR(req)); + pill = &req->rq_pill; + + /* It is important to obtain rpc_lock first (if applicable), so that + * threads that are serialised with rpc_lock are not polluting our + * rpcs in flight counter */ + mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + mdc_enter_request(&obddev->u.cli); + rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, + 0, NULL, lockh, 0); + mdc_exit_request(&obddev->u.cli); + mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + if (rc < 0) { + CERROR("ldlm_cli_enqueue: %d\n", rc); + mdc_clear_replay_flag(req, rc); + ptlrpc_req_finished(req); + RETURN(rc); + } + rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); + + RETURN(rc); +} + +static int mdc_finish_intent_lock(struct obd_export *exp, + struct ptlrpc_request *request, + struct md_op_data *op_data, + struct lookup_intent *it, + struct lustre_handle *lockh) +{ + struct lustre_handle old_lock; + struct mdt_body *mdt_body; + struct ldlm_lock *lock; + int rc; + + + LASSERT(request != NULL); + LASSERT(request != LP_POISON); + LASSERT(request->rq_repmsg != LP_POISON); + + if (!it_disposition(it, DISP_IT_EXECD)) { + /* The server failed before it even started executing the + * intent, i.e. because it couldn't unpack the request. 
 */
+        LASSERT(it->d.lustre.it_status != 0);
+        RETURN(it->d.lustre.it_status);
+    }
+    rc = it_open_error(DISP_IT_EXECD, it);
+    if (rc)
+        RETURN(rc);
+
+    mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
+    LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
+
+    /* If we were revalidating a fid/name pair, mark the intent in
+     * case we fail and get called again from lookup */
+    if (fid_is_sane(&op_data->op_fid2) &&
+        it->it_flags & O_CHECK_STALE &&
+        it->it_op != IT_GETATTR) {
+        it_set_disposition(it, DISP_ENQ_COMPLETE);
+
+        /* Also: did we find the same inode? */
+        /* server can return one of two fids:
+         * op_fid2 - newly allocated fid - if the file is created.
+         * op_fid3 - existing fid - if the file is only opened.
+         * op_fid3 is saved in lmv_intent_open */
+        if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
+            (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
+            CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
+                   "\n", PFID(&op_data->op_fid2),
+                   PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
+            RETURN(-ESTALE);
+        }
+    }
+
+    rc = it_open_error(DISP_LOOKUP_EXECD, it);
+    if (rc)
+        RETURN(rc);
+
+    /* Keep requests around for the multiple phases of the call;
+     * this shows that the DISP_XX bits must guarantee we make it into
+     * the call.
+     */
+    if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
+        it_disposition(it, DISP_OPEN_CREATE) &&
+        !it_open_error(DISP_OPEN_CREATE, it)) {
+        it_set_disposition(it, DISP_ENQ_CREATE_REF);
+        ptlrpc_request_addref(request); /* balanced in ll_create_node */
+    }
+    if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
+        it_disposition(it, DISP_OPEN_OPEN) &&
+        !it_open_error(DISP_OPEN_OPEN, it)) {
+        it_set_disposition(it, DISP_ENQ_OPEN_REF);
+        ptlrpc_request_addref(request); /* balanced in ll_file_open */
+        /* BUG 11546 - eviction in the middle of open rpc processing */
+        OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
+    }
+
+    if (it->it_op & IT_CREAT) {
+        /* XXX this belongs in ll_create_it */
+    } else if (it->it_op == IT_OPEN) {
+        LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
+    } else {
+        LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
+    }
+
+    /* If we already have a matching lock, then cancel the new
+     * one.  We have to set the data here instead of in
+     * mdc_enqueue, because we need to use the child's inode as
+     * the l_ast_data to match, and that's not available until
+     * intent_finish has performed the iget().)
*/ + lock = ldlm_handle2lock(lockh); + if (lock) { + ldlm_policy_data_t policy = lock->l_policy_data; + LDLM_DEBUG(lock, "matching against this"); + + LASSERTF(fid_res_name_eq(&mdt_body->fid1, + &lock->l_resource->lr_name), + "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n", + (unsigned long)lock->l_resource->lr_name.name[0], + (unsigned long)lock->l_resource->lr_name.name[1], + (unsigned long)lock->l_resource->lr_name.name[2], + (unsigned long)fid_seq(&mdt_body->fid1), + (unsigned long)fid_oid(&mdt_body->fid1), + (unsigned long)fid_ver(&mdt_body->fid1)); + LDLM_LOCK_PUT(lock); + + memcpy(&old_lock, lockh, sizeof(*lockh)); + if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, + LDLM_IBITS, &policy, LCK_NL, &old_lock)) { + ldlm_lock_decref_and_cancel(lockh, + it->d.lustre.it_lock_mode); + memcpy(lockh, &old_lock, sizeof(old_lock)); + it->d.lustre.it_lock_handle = lockh->cookie; + } + } + CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n", + op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op), + it->d.lustre.it_status, it->d.lustre.it_disposition, rc); + RETURN(rc); +} + /* * This long block is all about fixing up the lock and request state * so that it is correct as of the moment _before_ the operation was @@ -704,11 +817,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, ldlm_blocking_callback cb_blocking, int extra_lock_flags) { - struct ptlrpc_request *request; - struct lustre_handle old_lock; struct lustre_handle lockh; - struct mdt_body *mdt_body; - struct ldlm_lock *lock; int rc = 0; ENTRY; LASSERT(it); @@ -742,8 +851,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, &op_data->op_fid2, LDLM_IBITS, &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh); if (mode) { - memcpy(&it->d.lustre.it_lock_handle, &lockh, - sizeof(lockh)); + it->d.lustre.it_lock_handle = lockh.cookie; it->d.lustre.it_lock_mode = mode; } @@ -778,7 +886,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, lmm, lmmsize, extra_lock_flags); if (rc < 0) RETURN(rc); - memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh)); + it->d.lustre.it_lock_handle = lockh.cookie; } else if (!fid_is_sane(&op_data->op_fid2) || !(it->it_flags & O_CHECK_STALE)) { /* DISP_ENQ_COMPLETE set means there is extra reference on @@ -787,109 +895,125 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, * lookup, so we clear DISP_ENQ_COMPLETE */ it_clear_disposition(it, DISP_ENQ_COMPLETE); } - request = *reqp = it->d.lustre.it_data; - LASSERT(request != NULL); - LASSERT(request != LP_POISON); - LASSERT(request->rq_repmsg != LP_POISON); + *reqp = it->d.lustre.it_data; + rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh); + RETURN(rc); +} - if (!it_disposition(it, DISP_IT_EXECD)) { - /* The server failed before it even started executing the - * intent, i.e. because it couldn't unpack the request. 
*/ - LASSERT(it->d.lustre.it_status != 0); - RETURN(it->d.lustre.it_status); - } - rc = it_open_error(DISP_IT_EXECD, it); - if (rc) - RETURN(rc); +static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req, + void *unused, int rc) +{ + struct obd_export *exp = req->rq_async_args.pointer_arg[0]; + struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1]; + struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2]; + struct lookup_intent *it; + struct lustre_handle *lockh; + struct obd_device *obddev; + int flags = LDLM_FL_HAS_INTENT; + ENTRY; - mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY); - LASSERT(mdt_body != NULL); /* mdc_enqueue checked */ + it = &minfo->mi_it; + lockh = &minfo->mi_lockh; - /* If we were revalidating a fid/name pair, mark the intent in - * case we fail and get called again from lookup */ - if (fid_is_sane(&op_data->op_fid2) && - (it->it_flags & O_CHECK_STALE) && - it->it_op != IT_GETATTR) { - it_set_disposition(it, DISP_ENQ_COMPLETE); + obddev = class_exp2obd(exp); - /* Also: did we find the same inode? */ - /* sever can return one of two fids: - * op_fid2 - new allocated fid - if file is created. - * op_fid3 - existent fid - if file only open. - * op_fid3 is saved in lmv_intent_open */ - if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) && - (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) { - CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID - "\n", PFID(&op_data->op_fid2), - PFID(&op_data->op_fid2), PFID(&mdt_body->fid1)); - RETURN(-ESTALE); - } + mdc_exit_request(&obddev->u.cli); + if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE)) + rc = -ETIMEDOUT; + + rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode, + &flags, NULL, 0, NULL, lockh, rc); + if (rc < 0) { + CERROR("ldlm_cli_enqueue_fini: %d\n", rc); + mdc_clear_replay_flag(req, rc); + GOTO(out, rc); } - rc = it_open_error(DISP_LOOKUP_EXECD, it); + rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); if (rc) - RETURN(rc); + GOTO(out, rc); - /* keep requests around for the multiple phases of the call - * this shows the DISP_XX must guarantee we make it into the call - */ - if (!it_disposition(it, DISP_ENQ_CREATE_REF) && - it_disposition(it, DISP_OPEN_CREATE) && - !it_open_error(DISP_OPEN_CREATE, it)) { - it_set_disposition(it, DISP_ENQ_CREATE_REF); - ptlrpc_request_addref(request); /* balanced in ll_create_node */ - } - if (!it_disposition(it, DISP_ENQ_OPEN_REF) && - it_disposition(it, DISP_OPEN_OPEN) && - !it_open_error(DISP_OPEN_OPEN, it)) { - it_set_disposition(it, DISP_ENQ_OPEN_REF); - ptlrpc_request_addref(request); /* balanced in ll_file_open */ - /* BUG 11546 - eviction in the middle of open rpc processing */ - OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout); - } + it->d.lustre.it_lock_handle = lockh->cookie; - if (it->it_op & IT_CREAT) { - /* XXX this belongs in ll_create_it */ - } else if (it->it_op == IT_OPEN) { - LASSERT(!it_disposition(it, DISP_OPEN_CREATE)); - } else { - LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP)); + rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh); + EXIT; + +out: + OBD_FREE_PTR(einfo); + minfo->mi_cb(req, minfo, rc); + return 0; +} + +int mdc_intent_getattr_async(struct obd_export *exp, + struct md_enqueue_info *minfo, + struct ldlm_enqueue_info *einfo) +{ + struct md_op_data *op_data = &minfo->mi_data; + struct lookup_intent *it = &minfo->mi_it; + struct ptlrpc_request *req; + struct obd_device *obddev = class_exp2obd(exp); + struct ldlm_res_id res_id; + 
ldlm_policy_data_t policy = { + .l_inodebits = { MDS_INODELOCK_LOOKUP } + }; + int rc; + int flags = LDLM_FL_HAS_INTENT; + ENTRY; + + CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n", + op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1), + ldlm_it2str(it->it_op), it->it_flags); + + fid_build_reg_res_name(&op_data->op_fid1, &res_id); + req = mdc_intent_getattr_pack(exp, it, op_data); + if (!req) + RETURN(-ENOMEM); + + mdc_enter_request(&obddev->u.cli); + rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, + 0, NULL, &minfo->mi_lockh, 1); + if (rc < 0) { + mdc_exit_request(&obddev->u.cli); + RETURN(rc); } - /* If we already have a matching lock, then cancel the new - * one. We have to set the data here instead of in - * mdc_enqueue, because we need to use the child's inode as - * the l_ast_data to match, and that's not available until - * intent_finish has performed the iget().) */ - lock = ldlm_handle2lock(&lockh); - if (lock) { - ldlm_policy_data_t policy = lock->l_policy_data; - LDLM_DEBUG(lock, "matching against this"); + req->rq_async_args.pointer_arg[0] = exp; + req->rq_async_args.pointer_arg[1] = minfo; + req->rq_async_args.pointer_arg[2] = einfo; + req->rq_interpret_reply = mdc_intent_getattr_async_interpret; + ptlrpcd_add_req(req); - LASSERTF(fid_res_name_eq(&mdt_body->fid1, - &lock->l_resource->lr_name), - "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n", - (unsigned long)lock->l_resource->lr_name.name[0], - (unsigned long)lock->l_resource->lr_name.name[1], - (unsigned long)lock->l_resource->lr_name.name[2], - (unsigned long)fid_seq(&mdt_body->fid1), - (unsigned long)fid_oid(&mdt_body->fid1), - (unsigned long)fid_ver(&mdt_body->fid1)); - LDLM_LOCK_PUT(lock); + RETURN(0); +} - memcpy(&old_lock, &lockh, sizeof(lockh)); - if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, - LDLM_IBITS, &policy, LCK_NL, &old_lock)) { - ldlm_lock_decref_and_cancel(&lockh, - it->d.lustre.it_lock_mode); - memcpy(&lockh, &old_lock, sizeof(old_lock)); - memcpy(&it->d.lustre.it_lock_handle, &lockh, - sizeof(lockh)); - } +int mdc_revalidate_lock(struct obd_export *exp, + struct lookup_intent *it, + struct lu_fid *fid) +{ + /* We could just return 1 immediately, but since we should only + * be called in revalidate_it if we already have a lock, let's + * verify that. */ + struct ldlm_res_id res_id; + struct lustre_handle lockh; + ldlm_policy_data_t policy; + ldlm_mode_t mode; + ENTRY; + + fid_build_reg_res_name(fid, &res_id); + /* As not all attributes are kept under update lock, e.g. + owner/group/acls are under lookup lock, we need both + ibits for GETATTR. */ + policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ? 
+ MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP : + MDS_INODELOCK_LOOKUP; + + mode = ldlm_lock_match(exp->exp_obd->obd_namespace, + LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS, + &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh); + if (mode) { + it->d.lustre.it_lock_handle = lockh.cookie; + it->d.lustre.it_lock_mode = mode; } - CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n", - op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op), - it->d.lustre.it_status, it->d.lustre.it_disposition, rc); - RETURN(rc); + RETURN(!!mode); } diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index b58bdcf937c54a0c18988378643782a7c62862fe..7148cf9401e8edb82939dc4c1c40d60919241944 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -136,11 +136,11 @@ int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid, /* * This function now is known to always saying that it will receive 4 buffers * from server. Even for cases when acl_size and md_size is zero, RPC header - * willcontain 4 fields and RPC itself will contain zero size fields. This is + * will contain 4 fields and RPC itself will contain zero size fields. This is * because mdt_getattr*() _always_ returns 4 fields, but if acl is not needed * and thus zero, it shirinks it, making zero size. The same story about * md_size. And this is course of problem when client waits for smaller number - * of fields. This issue will be fixed later when client gets awar of RPC + * of fields. This issue will be fixed later when client gets aware of RPC * layouts. --umka */ static int mdc_getattr_common(struct obd_export *exp, @@ -1831,8 +1831,10 @@ struct md_ops mdc_md_ops = { .m_free_lustre_md = mdc_free_lustre_md, .m_set_open_replay_data = mdc_set_open_replay_data, .m_clear_open_replay_data = mdc_clear_open_replay_data, + .m_renew_capa = mdc_renew_capa, .m_get_remote_perm = mdc_get_remote_perm, - .m_renew_capa = mdc_renew_capa + .m_intent_getattr_async = mdc_intent_getattr_async, + .m_revalidate_lock = mdc_revalidate_lock }; extern quota_interface_t mdc_quota_interface; diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 293c90d0a7d01e9f4ad43766fa0b2b7999dd45c2..f57b55c4c515ea7068b589e850e79da4fd0ac6ac 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -1200,6 +1200,8 @@ int lprocfs_alloc_md_stats(struct obd_device *obd, LPROCFS_MD_OP_INIT(num_private_stats, stats, cancel_unused); LPROCFS_MD_OP_INIT(num_private_stats, stats, renew_capa); LPROCFS_MD_OP_INIT(num_private_stats, stats, get_remote_perm); + LPROCFS_MD_OP_INIT(num_private_stats, stats, intent_getattr_async); + LPROCFS_MD_OP_INIT(num_private_stats, stats, revalidate_lock); for (i = num_private_stats; i < num_stats; i++) { if (stats->ls_percpu[0]->lp_cntr[i].lc_name == NULL) { diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 2b567ad9d95b41586c5fe7b6f25c24e36b7b0830..4e7ee5436c39e5493ce1c0c9daa19c8bf9072c3f 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -80,6 +80,7 @@ FAIL_ON_ERROR=false cleanup() { echo -n "cln.." + pgrep ll_sa > /dev/null && { echo "There are ll_sa thread not exit!"; exit 20; } cleanupall ${FORCE} $* || { echo "FAILed to clean up"; exit 20; } } setup() { @@ -4785,6 +4786,87 @@ test_121() { #bug #10589 } run_test 121 "read cancel race =========" +test_123a() { # was test 123, statahead(bug 11401) + if [ -z "$(grep "processor.*: 1" /proc/cpuinfo)" ]; then + log "testing on UP system. 
Performance may be not as good as expected." + fi + + remount_client $MOUNT + mkdir -p $DIR/$tdir + error=0 + NUMFREE=`df -i -P $DIR | tail -n 1 | awk '{ print $4 }'` + [ $NUMFREE -gt 100000 ] && NUMFREE=100000 || NUMFREE=$((NUMFREE-1000)) + MULT=10 + for ((i=1, j=0; i<=$NUMFREE; j=$i, i=$((i * MULT)) )); do + createmany -o $DIR/$tdir/$tfile $j $((i - j)) + + lctl get_param -n llite.*.statahead_max | grep '[0-9]' + cancel_lru_locks mdc + cancel_lru_locks osc + stime=`date +%s` + ls -l $DIR/$tdir > /dev/null + etime=`date +%s` + delta_sa=$((etime - stime)) + log "ls $i files with statahead: $delta_sa sec" + lctl get_param -n llite.*.statahead_stats + + max=`lctl get_param -n llite.*.statahead_max | head -n 1` + lctl set_param -n llite.*.statahead_max 0 + lctl get_param llite.*.statahead_max + cancel_lru_locks mdc + cancel_lru_locks osc + stime=`date +%s` + ls -l $DIR/$tdir > /dev/null + etime=`date +%s` + delta=$((etime - stime)) + log "ls $i files without statahead: $delta sec" + + lctl set_param llite.*.statahead_max=$max + if [ $delta_sa -gt $(($delta + 2)) ]; then + log "ls $i files is slower with statahead!" + error=1 + fi + + [ $delta -gt 20 ] && break + [ $delta -gt 8 ] && MULT=$((50 / delta)) + [ "$SLOW" = "no" -a $delta -gt 3 ] && break + done + log "ls done" + + stime=`date +%s` + rm -r $DIR/$tdir + sync + etime=`date +%s` + delta=$((etime - stime)) + log "rm -r $DIR/$tdir/: $delta seconds" + log "rm done" + lctl get_param -n llite.*.statahead_stats + # wait for commitment of removal + sleep 2 + [ $error -ne 0 ] && error "statahead is slow!" + return 0 +} +run_test 123a "verify statahead work" + +test_123b () { # statahead(bug 15027) + mkdir -p $DIR/$tdir + createmany -o $DIR/$tdir/$tfile-%d 1000 + + cancel_lru_locks mdc + cancel_lru_locks osc + +#define OBD_FAIL_MDC_GETATTR_ENQUEUE 0x803 + sysctl -w lustre.fail_loc=0x80000803 + ls -lR $DIR/$tdir > /dev/null + log "ls done" + sysctl -w lustre.fail_loc=0x0 + lctl get_param -n llite.*.statahead_stats + rm -r $DIR/$tdir + sync + +} +run_test 123b "not panic with network error in statahead enqueue (bug 15027)" + test_124a() { [ -z "`lctl get_param -n mdc.*.connect_flags | grep lru_resize`" ] && \ skip "no lru resize on server" && return 0