From 2c556138af51384d6ca9da66614657f40c067526 Mon Sep 17 00:00:00 2001 From: scjody <scjody> Date: Thu, 16 Aug 2007 01:22:40 +0000 Subject: [PATCH] Branch b1_6 Revert bug 11401 due to build breakage and test badness. --- lustre/ChangeLog | 5 - lustre/include/lustre_mds.h | 41 -- lustre/ldlm/ldlm_lockd.c | 2 +- lustre/llite/Makefile.in | 2 +- lustre/llite/dcache.c | 7 +- lustre/llite/dir.c | 35 +- lustre/llite/file.c | 7 - lustre/llite/llite_internal.h | 87 +--- lustre/llite/llite_lib.c | 9 - lustre/llite/lproc_llite.c | 56 --- lustre/llite/namei.c | 20 +- lustre/llite/statahead.c | 847 ---------------------------------- lustre/llite/xattr.c | 2 - lustre/mdc/mdc_lib.c | 5 +- lustre/mdc/mdc_locks.c | 733 +++++++++++------------------ lustre/mdc/mdc_request.c | 8 - lustre/tests/sanity.sh | 64 +-- 17 files changed, 329 insertions(+), 1601 deletions(-) delete mode 100644 lustre/llite/statahead.c diff --git a/lustre/ChangeLog b/lustre/ChangeLog index e72609663a..8b08cd2a92 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -187,11 +187,6 @@ Description: reply_lock_interpret crash due to race with it and lock cancel. Details : Do not replay locks that are being cancelled. Do not reference locks by their address during replay, just by their handle. -Severity : enhancement -Bugzilla : 11401 -Description: client-side metadata stat-ahead during readdir(directory readahead) -Details : perform client-side metadata stat-ahead when the client detects - readdir and sequential stat of dir entries therein Severity : normal Bugzilla : 11679 diff --git a/lustre/include/lustre_mds.h b/lustre/include/lustre_mds.h index 0759c5ff27..981e444937 100644 --- a/lustre/include/lustre_mds.h +++ b/lustre/include/lustre_mds.h @@ -113,8 +113,6 @@ int mds_reint_rec(struct mds_update_record *r, int offset, /* mds/mds_lov.c */ /* mdc/mdc_locks.c */ -struct md_enqueue_info; - int it_disposition(struct lookup_intent *it, int flag); void it_set_disposition(struct lookup_intent *it, int flag); void it_clear_disposition(struct lookup_intent *it, int flag); @@ -122,9 +120,6 @@ int it_open_error(int phase, struct lookup_intent *it); void mdc_set_lock_data(__u64 *lockh, void *data); int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, ldlm_iterator_t it, void *data); -int mdc_revalidate_lock(struct obd_export *exp, - struct lookup_intent *it, - struct ll_fid *fid); int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *, void *lmm, int lmmsize, @@ -135,9 +130,6 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, struct lookup_intent *it, struct mdc_op_data *data, struct lustre_handle *lockh, void *lmm, int lmmlen, int extra_lock_flags); -int mdc_intent_getattr_async(struct obd_export *exp, - struct md_enqueue_info *minfo, - struct ldlm_enqueue_info *einfo); /* mdc/mdc_request.c */ int mdc_init_ea_size(struct obd_export *mdc_exp, struct obd_export *lov_exp); @@ -205,18 +197,6 @@ static inline void mdc_pack_fid(struct ll_fid *fid, obd_id ino, __u32 gen, fid->f_type = type; } -static inline int it_to_lock_mode(struct lookup_intent *it) -{ - /* CREAT needs to be tested before open (both could be set) */ - if (it->it_op & IT_CREAT) - return LCK_CW; - else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP)) - return LCK_CR; - - LBUG(); - return -EINVAL; -} - /* ioctls for trying requests */ #define IOC_REQUEST_TYPE 'f' #define IOC_REQUEST_MIN_NR 30 @@ -229,25 +209,4 @@ static inline int it_to_lock_mode(struct lookup_intent *it) #define IOC_REQUEST_CLOSE _IOWR('f', 35, long) #define IOC_REQUEST_MAX_NR 35 -/* metadata stat-ahead */ -typedef int (* md_enqueue_cb_t)(struct obd_export *exp, - struct ptlrpc_request *req, - struct md_enqueue_info *minfo, - int rc); - -struct md_enqueue_info { - struct obd_export *mi_exp; - struct mdc_op_data mi_data; - struct lookup_intent mi_it; - struct lustre_handle mi_lockh; - struct dentry *mi_dentry; - md_enqueue_cb_t mi_cb; - void *mi_cbdata; -}; - -struct mdc_enqueue_args { - struct md_enqueue_info *ma_mi; - struct ldlm_enqueue_info *ma_ei; -}; - #endif diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index f738e3b144..7fd3d0b70d 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -106,8 +106,8 @@ struct ldlm_bl_work_item { static inline int have_expired_locks(void) { int need_to_run; - ENTRY; + ENTRY; spin_lock_bh(&waiting_locks_spinlock); need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks); spin_unlock_bh(&waiting_locks_spinlock); diff --git a/lustre/llite/Makefile.in b/lustre/llite/Makefile.in index ff06efd3cd..dfa273bd92 100644 --- a/lustre/llite/Makefile.in +++ b/lustre/llite/Makefile.in @@ -1,5 +1,5 @@ MODULES := lustre -lustre-objs := dcache.o dir.o file.o llite_close.o llite_lib.o llite_nfs.o rw.o lproc_llite.o namei.o symlink.o llite_mmap.o xattr.o statahead.o +lustre-objs := dcache.o dir.o file.o llite_close.o llite_lib.o llite_nfs.o rw.o lproc_llite.o namei.o symlink.o llite_mmap.o xattr.o ifeq ($(PATCHLEVEL),4) lustre-objs += rw24.o super.o diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 29418ffe0b..12286953c9 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -333,11 +333,11 @@ void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft) int ll_revalidate_it(struct dentry *de, int lookup_flags, struct lookup_intent *it) { + int rc; struct mdc_op_data op_data; struct ptlrpc_request *req = NULL; struct lookup_intent lookup_it = { .it_op = IT_LOOKUP }; struct obd_export *exp; - int first = 0, rc; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name, @@ -426,16 +426,11 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, } } - if (it->it_op == IT_GETATTR) - first = ll_statahead_enter(de->d_parent->d_inode, &de, 0); - do_lock: it->it_create_mode &= ~current->fs->umask; rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, lookup_flags, &req, ll_mdc_blocking_ast, 0); - if (it->it_op == IT_GETATTR && !first) - ll_statahead_exit(de, rc); /* If req is NULL, then mdc_intent_lock only tried to do a lock match; * if all was well, it will return 1 if it found locks, 0 otherwise. */ if (req == NULL && rc >= 0) { diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 60e2762a5a..2666a0411f 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -27,6 +27,7 @@ */ #include <linux/fs.h> +#include <linux/ext2_fs.h> #include <linux/pagemap.h> #include <linux/mm.h> #include <linux/version.h> @@ -48,6 +49,8 @@ #include <lustre_dlm.h> #include "llite_internal.h" +typedef struct ext2_dir_entry_2 ext2_dirent; + #ifdef HAVE_PG_FS_MISC #define PageChecked(page) test_bit(PG_fs_misc, &(page)->flags) #define SetPageChecked(page) set_bit(PG_fs_misc, &(page)->flags) @@ -102,6 +105,18 @@ static inline unsigned ext2_chunk_size(struct inode *inode) return inode->i_sb->s_blocksize; } +static inline void ext2_put_page(struct page *page) +{ + kunmap(page); + page_cache_release(page); +} + +static inline unsigned long dir_pages(struct inode *inode) +{ + return (inode->i_size+CFS_PAGE_SIZE-1) >> CFS_PAGE_SHIFT; +} + + static void ext2_check_page(struct inode *dir, struct page *page) { unsigned chunk_size = ext2_chunk_size(dir); @@ -190,7 +205,7 @@ fail: SetPageError(page); } -struct page *ll_get_dir_page(struct inode *dir, unsigned long n) +static struct page *ll_get_dir_page(struct inode *dir, unsigned long n) { struct ldlm_res_id res_id = { .name = { dir->i_ino, (__u64)dir->i_generation} }; @@ -249,6 +264,24 @@ fail: goto out_unlock; } +/* + * p is at least 6 bytes before the end of page + */ +static inline ext2_dirent *ext2_next_entry(ext2_dirent *p) +{ + return (ext2_dirent *)((char*)p + le16_to_cpu(p->rec_len)); +} + +static inline unsigned +ext2_validate_entry(char *base, unsigned offset, unsigned mask) +{ + ext2_dirent *de = (ext2_dirent*)(base + offset); + ext2_dirent *p = (ext2_dirent*)(base + (offset&mask)); + while ((char*)p < (char*)de) + p = ext2_next_entry(p); + return (char *)p - base; +} + static unsigned char ext2_filetype_table[EXT2_FT_MAX] = { [EXT2_FT_UNKNOWN] DT_UNKNOWN, [EXT2_FT_REG_FILE] DT_REG, diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 9b3bce1c15..6d669e2389 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -235,9 +235,6 @@ int ll_file_release(struct inode *inode, struct file *file) CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino, inode->i_generation, inode); - if (S_ISDIR(inode->i_mode)) - ll_stop_statahead(inode); - /* don't do anything for / */ if (inode->i_sb->s_root == file->f_dentry) RETURN(0); @@ -265,7 +262,6 @@ static int ll_intent_file_open(struct file *file, void *lmm, struct inode *inode = file->f_dentry->d_inode; struct ptlrpc_request *req; int rc; - ENTRY; if (!parent) RETURN(-ENOENT); @@ -389,9 +385,6 @@ int ll_file_open(struct inode *inode, struct file *file) CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino, inode->i_generation, inode, file->f_flags); - if (S_ISDIR(inode->i_mode) && lli->lli_opendir_pid == 0) - lli->lli_opendir_pid = current->pid; - /* don't do anything for / */ if (inode->i_sb->s_root == file->f_dentry) RETURN(0); diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 67617177aa..b5a4d54534 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -5,7 +5,6 @@ #ifndef LLITE_INTERNAL_H #define LLITE_INTERNAL_H -#include <linux/ext2_fs.h> #ifdef CONFIG_FS_POSIX_ACL # include <linux/fs.h> #ifdef HAVE_XATTR_ACL @@ -108,10 +107,6 @@ struct ll_inode_info { #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) struct inode lli_vfs_inode; #endif - - /* metadata stat-ahead */ - pid_t lli_opendir_pid; - struct ll_statahead_info *lli_sai; }; /* @@ -262,19 +257,9 @@ struct ll_sb_info { enum stats_track_type ll_stats_track_type; int ll_stats_track_id; int ll_rw_stats_on; + dev_t ll_sdev_orig; /* save s_dev before assign for * clustred nfs */ - - /* metadata stat-ahead */ - unsigned int ll_sa_count; /* current statahead RPCs */ - unsigned int ll_sa_max; /* max statahead RPCs */ - unsigned int ll_sa_wrong; /* statahead thread stopped for - * low hit ratio */ - unsigned int ll_sa_total; /* statahead thread started - * count */ - unsigned long long ll_sa_blocked; /* ls count waiting for - * statahead */ - unsigned long long ll_sa_cached; /* ls count got in cache */ }; #define LL_DEFAULT_MAX_RW_CHUNK (32 * 1024 * 1024) @@ -377,9 +362,9 @@ static inline struct inode *ll_info2i(struct ll_inode_info *lli) } struct it_cb_data { - struct inode *icbd_parent; + struct inode *icbd_parent; struct dentry **icbd_childp; - obd_id hash; + obd_id hash; }; void ll_i2gids(__u32 *suppgids, struct inode *i1,struct inode *i2); @@ -449,38 +434,6 @@ static void ll_stats_ops_tally(struct ll_sb_info *sbi, int op, int count) {} extern struct file_operations ll_dir_operations; extern struct inode_operations ll_dir_inode_operations; -struct page *ll_get_dir_page(struct inode *dir, unsigned long n); -/* - * p is at least 6 bytes before the end of page - */ -typedef struct ext2_dir_entry_2 ext2_dirent; - -static inline ext2_dirent *ext2_next_entry(ext2_dirent *p) -{ - return (ext2_dirent *)((char*)p + le16_to_cpu(p->rec_len)); -} - -static inline unsigned -ext2_validate_entry(char *base, unsigned offset, unsigned mask) -{ - ext2_dirent *de = (ext2_dirent*)(base + offset); - ext2_dirent *p = (ext2_dirent*)(base + (offset&mask)); - while ((char*)p < (char*)de) - p = ext2_next_entry(p); - return (char *)p - base; -} - -static inline void ext2_put_page(struct page *page) -{ - kunmap(page); - page_cache_release(page); -} - -static inline unsigned long dir_pages(struct inode *inode) -{ - return (inode->i_size + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT; -} - /* llite/namei.c */ int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir); struct inode *ll_iget(struct super_block *sb, ino_t hash, @@ -496,9 +449,6 @@ int ll_prepare_mdc_op_data(struct mdc_op_data *, struct lookup_intent *ll_convert_intent(struct open_intent *oit, int lookup_flags); #endif -int lookup_it_finish(struct ptlrpc_request *request, int offset, - struct lookup_intent *it, void *data); -void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry); /* llite/rw.c */ int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to); @@ -761,35 +711,4 @@ ssize_t ll_getxattr(struct dentry *dentry, const char *name, ssize_t ll_listxattr(struct dentry *dentry, char *buffer, size_t size); int ll_removexattr(struct dentry *dentry, const char *name); -/* statahead.c */ - -#define LL_STATAHEAD_MIN 1 -#define LL_STATAHEAD_DEF 32 -#define LL_STATAHEAD_MAX 10000 - -/* per inode struct, for dir only */ -struct ll_statahead_info { - struct inode *sai_inode; - atomic_t sai_refc; /* when access this struct, hold - * refcount */ - unsigned int sai_max; /* max ahead of lookup */ - unsigned int sai_sent; /* stat requests sent count */ - unsigned int sai_replied; /* stat requests which received - * reply */ - unsigned int sai_cached; /* UPDATE lock cached locally - * already */ - unsigned int sai_hit; /* hit count */ - unsigned int sai_miss; /* miss count */ - unsigned int sai_consecutive_miss; /* consecutive miss */ - unsigned sai_ls_all:1; /* ls -al, do stat-ahead for - * hidden entries */ - struct ptlrpc_thread sai_thread; /* stat-ahead thread */ - struct list_head sai_entries; /* stat-ahead entries */ - unsigned int sai_entries_nr; /* stat-ahead entries count */ -}; - -int ll_statahead_enter(struct inode *dir, struct dentry **dentry, int lookup); -void ll_statahead_exit(struct dentry *dentry, int result); -void ll_stop_statahead(struct inode *inode); - #endif /* LLITE_INTERNAL_H */ diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 4b49457b17..47f4cdf733 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -95,9 +95,6 @@ static struct ll_sb_info *ll_init_sbi(void) spin_lock_init(&sbi->ll_rw_extents_info.pp_extents[i].pp_w_hist.oh_lock); } - /* metadata statahead is enabled by default */ - sbi->ll_sa_max = LL_STATAHEAD_DEF; - RETURN(sbi); } @@ -1128,12 +1125,6 @@ void ll_clear_inode(struct inode *inode) CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino, inode->i_generation, inode); - if (S_ISDIR(inode->i_mode)) { - /* these should have been cleared in ll_file_release */ - LASSERT(lli->lli_sai == NULL); - LASSERT(lli->lli_opendir_pid == 0); - } - ll_inode2fid(&fid, inode); clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &lli->lli_flags); mdc_change_cbdata(sbi->ll_mdc_exp, &fid, null_if_equal, inode); diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c index 8451b19f9b..4efcd45db2 100644 --- a/lustre/llite/lproc_llite.c +++ b/lustre/llite/lproc_llite.c @@ -437,59 +437,6 @@ static int ll_wr_track_gid(struct file *file, const char *buffer, return (ll_wr_track_id(buffer, count, data, STATS_TRACK_GID)); } -static int ll_rd_statahead_count(char *page, char **start, off_t off, - int count, int *eof, void *data) -{ - struct super_block *sb = data; - struct ll_sb_info *sbi = ll_s2sbi(sb); - - return snprintf(page, count, "%u\n", sbi->ll_sa_count); -} - -static int ll_rd_statahead_max(char *page, char **start, off_t off, - int count, int *eof, void *data) -{ - struct super_block *sb = data; - struct ll_sb_info *sbi = ll_s2sbi(sb); - - return snprintf(page, count, "%u\n", sbi->ll_sa_max); -} - -static int ll_wr_statahead_max(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct super_block *sb = data; - struct ll_sb_info *sbi = ll_s2sbi(sb); - int val, rc; - - rc = lprocfs_write_helper(buffer, count, &val); - if (rc) - return rc; - if (val >= 0 && val <= LL_STATAHEAD_MAX) - sbi->ll_sa_max = val; - else - CERROR("Bad statahead_max value %d. Valid values are in the " - "range [0, %d]\n", val, LL_STATAHEAD_MAX); - - return count; -} - -static int ll_rd_statahead_stats(char *page, char **start, off_t off, - int count, int *eof, void *data) -{ - struct super_block *sb = data; - struct ll_sb_info *sbi = ll_s2sbi(sb); - - return snprintf(page, count, - "statahead wrong: %u\n" - "statahead total: %u\n" - "ls blocked: %llu\n" - "ls total: %llu\n", - sbi->ll_sa_wrong, sbi->ll_sa_total, - sbi->ll_sa_blocked, - sbi->ll_sa_blocked + sbi->ll_sa_cached); -} - static struct lprocfs_vars lprocfs_obd_vars[] = { { "uuid", ll_rd_sb_uuid, 0, 0 }, //{ "mntpt_path", ll_rd_path, 0, 0 }, @@ -511,9 +458,6 @@ static struct lprocfs_vars lprocfs_obd_vars[] = { { "stats_track_pid", ll_rd_track_pid, ll_wr_track_pid, 0 }, { "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 }, { "stats_track_gid", ll_rd_track_gid, ll_wr_track_gid, 0 }, - { "statahead_count", ll_rd_statahead_count, 0, 0 }, - { "statahead_max", ll_rd_statahead_max, ll_wr_statahead_max, 0 }, - { "statahead_stats", ll_rd_statahead_stats, 0, 0 }, { 0 } }; diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 52472bcfd8..773f828f8d 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -373,7 +373,7 @@ static void ll_d_add(struct dentry *de, struct inode *inode) * in ll_revalidate_it. After revaliadate inode will be have hashed aliases * and it triggers BUG_ON in d_instantiate_unique (bug #10954). */ -static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) +struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) { struct list_head *tmp; struct dentry *dentry; @@ -442,8 +442,8 @@ static struct dentry *ll_find_alias(struct inode *inode, struct dentry *de) return de; } -int lookup_it_finish(struct ptlrpc_request *request, int offset, - struct lookup_intent *it, void *data) +static int lookup_it_finish(struct ptlrpc_request *request, int offset, + struct lookup_intent *it, void *data) { struct it_cb_data *icbd = data; struct dentry **de = icbd->icbd_childp; @@ -530,17 +530,8 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, RETURN(ERR_PTR(rc)); } - if (it->it_op == IT_GETATTR) { - rc = ll_statahead_enter(parent, &dentry, 1); - if (rc >= 0) { - ll_statahead_exit(dentry, rc); - if (rc == 1) - RETURN(retval = dentry); - } - } - - icbd.icbd_parent = parent; icbd.icbd_childp = &dentry; + icbd.icbd_parent = parent; rc = ll_prepare_mdc_op_data(&op_data, parent, NULL, dentry->d_name.name, dentry->d_name.len, lookup_flags, NULL); @@ -549,10 +540,9 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, it->it_create_mode &= ~current->fs->umask; - up(&parent->i_sem); rc = mdc_intent_lock(ll_i2mdcexp(parent), &op_data, NULL, 0, it, lookup_flags, &req, ll_mdc_blocking_ast, 0); - down(&parent->i_sem); + if (rc < 0) GOTO(out, retval = ERR_PTR(rc)); diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c deleted file mode 100644 index 717661186d..0000000000 --- a/lustre/llite/statahead.c +++ /dev/null @@ -1,847 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2007 Cluster File Systems, Inc. - * - * This file is part of Lustre, http://www.lustre.org. - * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. - * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include <linux/fs.h> -#include <linux/sched.h> -#include <linux/mm.h> -#include <linux/smp_lock.h> -#include <linux/highmem.h> -#include <linux/pagemap.h> - -#define DEBUG_SUBSYSTEM S_LLITE - -#include <obd_support.h> -#include <lustre_lite.h> -#include <lustre_dlm.h> -#include <linux/lustre_version.h> -#include "llite_internal.h" - -struct ll_sai_entry { - struct list_head se_list; - int se_index; - int se_stat; -}; - -enum { - SA_ENTRY_UNSTATED = 0, - SA_ENTRY_STATED -}; - -static struct ll_statahead_info *ll_sai_alloc(void) -{ - struct ll_statahead_info *sai; - - OBD_ALLOC_PTR(sai); - if (!sai) - return NULL; - - sai->sai_max = LL_STATAHEAD_MIN; - init_waitqueue_head(&sai->sai_thread.t_ctl_waitq); - INIT_LIST_HEAD(&sai->sai_entries); - atomic_set(&sai->sai_refc, 1); - return sai; -} - -static inline struct ll_statahead_info *ll_sai_get(struct ll_statahead_info *sai) -{ - LASSERT(sai); - atomic_inc(&sai->sai_refc); - return sai; -} - -static void ll_sai_put(struct ll_statahead_info *sai) -{ - struct inode *inode = sai->sai_inode; - struct ll_inode_info *lli = ll_i2info(inode); - - if (atomic_dec_and_lock(&sai->sai_refc, &lli->lli_lock)) { - struct ll_sai_entry *entry, *next; - - LASSERT(sai->sai_thread.t_flags & SVC_STOPPED); - list_for_each_entry_safe(entry, next, &sai->sai_entries, - se_list) { - list_del(&entry->se_list); - OBD_FREE_PTR(entry); - } - OBD_FREE_PTR(sai); - lli->lli_sai = NULL; - spin_unlock(&lli->lli_lock); - iput(inode); - } -} - -static struct ll_sai_entry *ll_sai_entry_get(struct ll_statahead_info *sai, - int index, int stat) -{ - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); - struct ll_sb_info *sbi = ll_i2sbi(sai->sai_inode); - struct ll_sai_entry *entry; - - OBD_ALLOC_PTR(entry); - if (entry == NULL) - return NULL; - - CDEBUG(D_READA, "sai entry %p index %d, stat %d\n", entry, index, stat); - entry->se_index = index; - entry->se_stat = stat; - - spin_lock(&lli->lli_lock); - list_add_tail(&entry->se_list, &sai->sai_entries); - sai->sai_entries_nr++; - sbi->ll_sa_count = sai->sai_entries_nr; - spin_unlock(&lli->lli_lock); - - LASSERT(sai->sai_entries_nr <= sbi->ll_sa_max); - return entry; -} - -static void ll_sai_entry_set(struct ll_statahead_info *sai, int index, - int stat) -{ - struct ll_sai_entry *entry; - - list_for_each_entry(entry, &sai->sai_entries, se_list) { - if (entry->se_index == index) { - LASSERT(entry->se_stat == SA_ENTRY_UNSTATED); - entry->se_stat = stat; - CDEBUG(D_READA, "set sai entry %p index %d stat %d\n", - entry, index, stat); - return; - } - } - CERROR("can't find sai entry index %d\n", index); - LBUG(); -} - -/* check first entry was stated already */ -static int ll_sai_entry_stated(struct ll_statahead_info *sai) -{ - struct ll_inode_info *lli = ll_i2info(sai->sai_inode); - struct ll_sai_entry *entry; - int rc = 0; - - spin_lock(&lli->lli_lock); - if (!list_empty(&sai->sai_entries)) { - entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, - se_list); - CDEBUG(D_READA, "sai entry %p index %d stat %d\n", - entry, entry->se_index, entry->se_stat); - rc = (entry->se_stat != SA_ENTRY_UNSTATED); - } - spin_unlock(&lli->lli_lock); - - return rc; -} - -/* inside lli_lock */ -static void ll_sai_entry_put(struct ll_statahead_info *sai) -{ - struct ll_sai_entry *entry; - - LASSERT(!list_empty(&sai->sai_entries)); - LASSERT(sai->sai_entries_nr > 0); - - entry = list_entry(sai->sai_entries.next, struct ll_sai_entry, se_list); - list_del(&entry->se_list); - sai->sai_entries_nr--; - - CDEBUG(D_READA, "free sa entry %p index %d stat %d\n", - entry, entry->se_index, entry->se_stat); - OBD_FREE_PTR(entry); -} - -/* finish lookup/revalidate */ -static int ll_statahead_interpret(struct obd_export *exp, - struct ptlrpc_request *req, - struct md_enqueue_info *minfo, - int rc) -{ - struct lookup_intent *it = &minfo->mi_it; - struct dentry *dentry = minfo->mi_dentry; - struct inode *dir = dentry->d_parent->d_inode; - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai; - ENTRY; - - CDEBUG(D_READA, "statahead %.*s rc %d\n", - dentry->d_name.len, dentry->d_name.name, rc); - if (rc) - GOTO(out, rc); - - if (dentry->d_inode == NULL) { - /* lookup */ - struct dentry *save = dentry; - struct it_cb_data icbd = { - .icbd_parent = dir, - .icbd_childp = &dentry - }; - - down(&dir->i_sem); - rc = lookup_it_finish(req, DLM_REPLY_REC_OFF, it, &icbd); - if (!rc) { - LASSERT(dentry->d_inode); - if (dentry != save) - dput(save); - ll_lookup_finish_locks(it, dentry); - } - up(&dir->i_sem); - } else { - /* revalidate */ - struct mds_body *body; - - body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, - sizeof(*body)); - if (memcmp(&minfo->mi_data.fid2, &body->fid1, - sizeof(body->fid1))) { - ll_unhash_aliases(dentry->d_inode); - GOTO(out, rc = -EAGAIN); - } - - rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, dentry); - if (rc) { - ll_unhash_aliases(dentry->d_inode); - GOTO(out, rc); - } - - spin_lock(&dcache_lock); - lock_dentry(dentry); - __d_drop(dentry); - dentry->d_flags &= ~DCACHE_LUSTRE_INVALID; - unlock_dentry(dentry); - __d_rehash(dentry, 0); - spin_unlock(&dcache_lock); - - ll_lookup_finish_locks(it, dentry); - - } - EXIT; -out: - spin_lock(&lli->lli_lock); - sai = lli->lli_sai; - if (sai) { - lli->lli_sai->sai_replied++; - ll_sai_entry_set(lli->lli_sai, (int)minfo->mi_cbdata, - SA_ENTRY_STATED); - wake_up(&lli->lli_sai->sai_thread.t_ctl_waitq); - } - spin_unlock(&lli->lli_lock); - ll_intent_release(it); - OBD_FREE_PTR(minfo); - - dput(dentry); - return rc; -} - -static void sa_args_fini(struct md_enqueue_info *minfo, - struct ldlm_enqueue_info *einfo) -{ - LASSERT(minfo && einfo); - OBD_FREE_PTR(minfo); - OBD_FREE_PTR(einfo); -} - -static int sa_args_prep(struct inode *dir, struct dentry *dentry, - struct md_enqueue_info **pmi, - struct ldlm_enqueue_info **pei) -{ - struct ll_inode_info *lli = ll_i2info(dir); - struct md_enqueue_info *minfo; - struct ldlm_enqueue_info *einfo; - - OBD_ALLOC_PTR(einfo); - if (einfo == NULL) - return -ENOMEM; - - OBD_ALLOC_PTR(minfo); - if (minfo == NULL) { - OBD_FREE_PTR(einfo); - return -ENOMEM; - } - - minfo->mi_exp = ll_i2mdcexp(dir); - intent_init(&minfo->mi_it, IT_GETATTR); - minfo->mi_dentry = dentry; - minfo->mi_cb = ll_statahead_interpret; - minfo->mi_cbdata = (void *)lli->lli_sai->sai_sent; - - einfo->ei_type = LDLM_IBITS; - einfo->ei_mode = it_to_lock_mode(&minfo->mi_it); - einfo->ei_cb_bl = ll_mdc_blocking_ast; - einfo->ei_cb_cp = ldlm_completion_ast; - einfo->ei_cb_gl = NULL; - einfo->ei_cbdata = NULL; - - *pmi = minfo; - *pei = einfo; - - return 0; -} - -/* similar to ll_lookup_it(). */ -static int do_sa_lookup(struct inode *dir, struct dentry *dentry) -{ - struct md_enqueue_info *minfo; - struct ldlm_enqueue_info *einfo; - int rc; - ENTRY; - - rc = sa_args_prep(dir, dentry, &minfo, &einfo); - if (rc) - RETURN(rc); - - rc = ll_prepare_mdc_op_data(&minfo->mi_data, dir, NULL, - dentry->d_name.name, dentry->d_name.len, 0, - NULL); - if (rc == 0) - rc = mdc_intent_getattr_async(minfo->mi_exp, minfo, einfo); - - if (rc) - sa_args_fini(minfo, einfo); - - RETURN(rc); -} - -/* similar to ll_revalidate_it(). - * return 1: dentry valid. - * 0: will send stat-ahead request. - * -errno: prepare stat-ahead request failed. */ -static int do_sa_revalidate(struct dentry *dentry) -{ - struct inode *inode = dentry->d_inode; - struct ll_inode_info *lli = ll_i2info(dentry->d_parent->d_inode); - struct ll_fid fid; - struct lookup_intent it; - struct md_enqueue_info *minfo; - struct ldlm_enqueue_info *einfo; - int rc; - ENTRY; - - if (inode == NULL) - RETURN(1); - - if (d_mountpoint(dentry)) - RETURN(1); - - ll_inode2fid(&fid, inode); - - intent_init(&it, IT_GETATTR); - rc = mdc_revalidate_lock(ll_i2mdcexp(inode), &it, &fid); - if (rc == 1) { - ll_intent_release(&it); - lli->lli_sai->sai_cached++; - wake_up(&lli->lli_sai->sai_thread.t_ctl_waitq); - RETURN(1); - } - - rc = sa_args_prep(dentry->d_parent->d_inode, dentry, &minfo, &einfo); - if (rc) - RETURN(rc); - - rc = ll_prepare_mdc_op_data(&minfo->mi_data, dentry->d_parent->d_inode, - inode, dentry->d_name.name, - dentry->d_name.len, 0, NULL); - if (rc == 0) - rc = mdc_intent_getattr_async(minfo->mi_exp, minfo, einfo); - - if (rc) - sa_args_fini(minfo, einfo); - - RETURN(rc); -} - -/* copied from kernel */ -static inline void name2qstr(struct qstr *this, const char *name, int namelen) -{ - unsigned long hash; - const unsigned char *p = (const unsigned char *)name; - int len; - unsigned int c; - - hash = init_name_hash(); - for (len = 0; len < namelen; len++, p++) { - c = *p; - hash = partial_name_hash(c, hash); - } - this->name = name; - this->len = namelen; - this->hash = end_name_hash(hash); -} - -static int ll_statahead_one(struct dentry *parent, ext2_dirent *de) -{ - struct inode *dir = parent->d_inode; - struct ll_inode_info *lli = ll_i2info(dir); - struct qstr name; - struct dentry *dentry; - struct ll_sai_entry *se; - int rc; - ENTRY; - - name2qstr(&name, de->name, de->name_len); - - se = ll_sai_entry_get(lli->lli_sai, lli->lli_sai->sai_sent, - SA_ENTRY_UNSTATED); - - down(&dir->i_sem); - if (parent->d_flags & DCACHE_LUSTRE_INVALID) { - CDEBUG(D_READA, "parent dentry@%p %.*s is " - "DCACHE_LUSTRE_INVALID, skip statahead\n", - parent, parent->d_name.len, parent->d_name.name); - up(&dir->i_sem); - GOTO(out, rc = -EINVAL); - } - - dentry = d_lookup(parent, &name); - if (!dentry) { - struct dentry *dentry = d_alloc(parent, &name); - - up(&dir->i_sem); - rc = -ENOMEM; - if (dentry) { - rc = do_sa_lookup(dir, dentry); - if (rc) - dput(dentry); - } - GOTO(out, rc); - } - up(&dir->i_sem); - - rc = do_sa_revalidate(dentry); - if (rc) - dput(dentry); - GOTO(out, rc); -out: - if (rc) { - CDEBUG(D_READA, "set sai entry %p index %d stat %d\n", - se, se->se_index, se->se_stat); - se->se_stat = rc; - wake_up(&lli->lli_sai->sai_thread.t_ctl_waitq); - } - lli->lli_sai->sai_sent++; - return rc; -} - -static inline int sa_check_stop(struct ll_statahead_info *sai) -{ - return !!(sai->sai_thread.t_flags & SVC_STOPPING); -} - -static inline int sa_not_full(struct ll_statahead_info *sai) -{ - return sai->sai_sent - sai->sai_miss - sai->sai_hit < sai->sai_max; -} - -struct ll_sa_thread_args { - struct dentry *sta_parent; - pid_t sta_pid; -}; - -static int ll_statahead_thread(void *arg) -{ - struct ll_sa_thread_args *sta = arg; - struct dentry *parent = dget(sta->sta_parent); - struct inode *dir = parent->d_inode; - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_statahead_info *sai = ll_sai_get(lli->lli_sai); - struct ptlrpc_thread *thread = &sai->sai_thread; - struct l_wait_info lwi = { 0 }; - unsigned long index = 0; - __u64 offset = 0; - int skip = 0; - int rc = 0; - char name[16] = ""; - ENTRY; - - sbi->ll_sa_total++; - - snprintf(name, 15, "ll_sa_%u", sta->sta_pid); - cfs_daemonize(name); - thread->t_flags = SVC_RUNNING; - wake_up(&thread->t_ctl_waitq); - CDEBUG(D_READA, "start doing statahead for %s\n", parent->d_name.name); - - if (sai->sai_ls_all) - CDEBUG(D_READA, "do statahead for hidden files\n"); - - while (1) { - unsigned long npages = dir_pages(dir); - - /* hit ratio < 80% */ - if ((sai->sai_hit < 4 * sai->sai_miss && sai->sai_hit > 7) || - (sai->sai_consecutive_miss > 8)) { - sbi->ll_sa_wrong++; - CDEBUG(D_READA, "statahead for dir %.*s hit ratio too " - "low: hit/miss %u/%u, sent/replied %u/%u, " - "cached %u\n", - parent->d_name.len, parent->d_name.name, - sai->sai_hit, sai->sai_miss, sai->sai_sent, - sai->sai_replied, sai->sai_cached); - break; - } - - /* reach the end of dir */ - if (index == npages) { - CDEBUG(D_READA, "reach end, index/npages %lu/%lu\n", - index, npages); - break; - } - - l_wait_event(thread->t_ctl_waitq, - sa_check_stop(sai) || sa_not_full(sai), - &lwi); - - if (sa_check_stop(sai)) - break; - - for (; index < npages; index++, offset = 0) { - char *kaddr, *limit; - ext2_dirent *de; - struct page *page; - - CDEBUG(D_EXT2,"read %lu of dir %lu/%u page %lu" - "/%lu size %llu\n", - CFS_PAGE_SIZE, dir->i_ino, dir->i_generation, - index, npages, dir->i_size); - - page = ll_get_dir_page(dir, index); - npages = dir_pages(dir); - - if (IS_ERR(page)) { - rc = PTR_ERR(page); - CERROR("error reading dir %lu/%u page %lu: " - "rc %d\n", - dir->i_ino, dir->i_generation, index, - rc); - GOTO(out, rc); - } - - kaddr = page_address(page); - de = (ext2_dirent *)(kaddr + offset); - limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1); - for (; (char*)de <= limit && sa_not_full(sai); - de = ext2_next_entry(de)) { - if (!de->inode) - continue; - - /* don't stat-ahead ".", ".." */ - if (skip < 2) { - skip++; - continue; - } - - /* don't stat-ahead for hidden files */ - if (de->name[0] == '.' && !sai->sai_ls_all) - continue; - - /* don't stat-ahead for the first de */ - if (skip < 3) { - skip++; - continue; - } - - rc = ll_statahead_one(parent, de); - if (rc < 0) { - ext2_put_page(page); - GOTO(out, rc); - } - } - offset = (char *)de - kaddr; - ext2_put_page(page); - - if ((char *)de <= limit) - /* !sa_not_full() */ - break; - } - } - EXIT; -out: - thread->t_flags = SVC_STOPPED; - wake_up(&thread->t_ctl_waitq); - lli->lli_opendir_pid = 0; /* avoid statahead again */ - ll_sai_put(sai); - dput(parent); - return 0; -} - -/* called in ll_file_release */ -void ll_stop_statahead(struct inode *inode) -{ - struct ll_inode_info *lli = ll_i2info(inode); - struct ptlrpc_thread *thread; - - /* don't check pid here. upon fork, if parent closedir before child, - * child will not have chance to stop this thread. */ - lli->lli_opendir_pid = 0; - - spin_lock(&lli->lli_lock); - if (lli->lli_sai) { - ll_sai_get(lli->lli_sai); - spin_unlock(&lli->lli_lock); - - thread = &lli->lli_sai->sai_thread; - thread->t_flags = SVC_STOPPING; - wake_up(&thread->t_ctl_waitq); - wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED); - ll_sai_put(lli->lli_sai); - - CDEBUG(D_READA, "stop statahead thread, pid %d\n", - current->pid); - return; - } - spin_unlock(&lli->lli_lock); -} - -enum { - LS_NONE_FIRST_DE = 0, /* not first dirent, or is "." */ - LS_FIRST_DE, /* the first non-hidden dirent */ - LS_FIRST_DOT_DE /* the first hidden dirent, that is ".xxx" */ -}; - -static int is_first_dirent(struct inode *dir, struct dentry *dentry) -{ - struct qstr *d_name = &dentry->d_name; - unsigned long npages = dir_pages(dir); - struct page *page; - ext2_dirent *de; - unsigned long index; - __u64 offset = 0; - char *kaddr, *limit; - int dot_de = 1; /* dirent is dotfile till now */ - int rc = LS_NONE_FIRST_DE; - ENTRY; - - page = ll_get_dir_page(dir, 0); - if (IS_ERR(page)) { - CERROR("error reading dir %lu/%u page 0: rc %ld\n", - dir->i_ino, dir->i_generation, PTR_ERR(page)); - RETURN(LS_NONE_FIRST_DE); - } - - kaddr = page_address(page); - de = (ext2_dirent *)kaddr; - if (!(de->name_len == 1 && strncmp(de->name, ".", 1) == 0)) - CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino); - de = ext2_next_entry(de); /* skip ".", or ingore bad entry */ - if (!(de->name_len == 2 && strncmp(de->name, "..", 2) == 0)) - CWARN("Maybe got bad on-disk dir:%lu\n", dir->i_ino); - de = ext2_next_entry(de); /* skip "..", or ingore bad entry */ - - offset = (char *)de - kaddr; - - for (index = 0; index < npages; offset = 0) { - de = (ext2_dirent *)(kaddr + offset); - limit = kaddr + CFS_PAGE_SIZE - EXT2_DIR_REC_LEN(1); - for (; (char*)de <= limit; de = ext2_next_entry(de)) { - if (!de->inode) - continue; - - if (de->name[0] != '.') - dot_de = 0; - - if (dot_de && d_name->name[0] != '.') { - CDEBUG(D_READA, "%.*s skip hidden file %.*s\n", - d_name->len, d_name->name, - de->name_len, de->name); - continue; - } - - if (d_name->len == de->name_len && - !strncmp(d_name->name, de->name, d_name->len)) - rc = LS_FIRST_DE + dot_de; - else - rc = LS_NONE_FIRST_DE; - GOTO(out, rc); - } - - if (++index >= npages) - break; - - ext2_put_page(page); - - page = ll_get_dir_page(dir, index); - if (IS_ERR(page)) { - CERROR("error reading dir %lu/%u page %lu: rc %ld\n", - dir->i_ino, dir->i_generation, index, - PTR_ERR(page)); - RETURN(LS_NONE_FIRST_DE); - } - kaddr = page_address(page); - } - CERROR("%.*s not found in dir %.*s!\n", d_name->len, d_name->name, - dentry->d_parent->d_name.len, dentry->d_parent->d_name.name); - EXIT; -out: - ext2_put_page(page); - return rc; -} - -/* start stat-ahead thread if this is the first dir entry, otherwise if a thread - * is started already, wait until thread is ahead of me. - * Return value: - * 0 -- miss, - * 1 -- hit, - * -EEXIST -- stat ahead thread started, and this is the first try. - * other negative value -- error. - */ -int ll_statahead_enter(struct inode *dir, struct dentry **dentryp, int lookup) -{ - struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_inode_info *lli = ll_i2info(dir); - struct ll_statahead_info *sai; - struct ll_sa_thread_args sta; - int rc; - ENTRY; - - if (sbi->ll_sa_max == 0) - RETURN(-ENOTSUPP); - - /* not the same process, don't statahead */ - if (lli->lli_opendir_pid != current->pid) - RETURN(-EBADF); - - spin_lock(&lli->lli_lock); - if (lli->lli_sai) { - sai = ll_sai_get(lli->lli_sai); - spin_unlock(&lli->lli_lock); - - if (ll_sai_entry_stated(sai)) { - sbi->ll_sa_cached++; - } else { - struct l_wait_info lwi = { 0 }; - - sbi->ll_sa_blocked++; - up(&dir->i_sem); - /* thread started already, avoid double-stat */ - l_wait_event(sai->sai_thread.t_ctl_waitq, - ll_sai_entry_stated(sai) || - sai->sai_thread.t_flags & SVC_STOPPED, - &lwi); - down(&dir->i_sem); - } - - ll_sai_put(sai); - - if (lookup) { - struct dentry *result; - - result = d_lookup((*dentryp)->d_parent, - &(*dentryp)->d_name); - if (result) { - LASSERT(result != *dentryp); - dput(*dentryp); - *dentryp = result; - } - RETURN(result != NULL); - } - /* do nothing for revalidate */ - RETURN(0); - } - spin_unlock(&lli->lli_lock); - - rc = is_first_dirent(dir, *dentryp); - if (!rc) { - /* optimization: don't statahead for this pid any longer */ - spin_lock(&lli->lli_lock); - if (lli->lli_sai == NULL) - lli->lli_opendir_pid = 0; - spin_unlock(&lli->lli_lock); - RETURN(-EBADF); - } - - spin_lock(&lli->lli_lock); - if (lli->lli_sai == NULL) { - lli->lli_sai = ll_sai_alloc(); - if (lli->lli_sai == NULL) { - spin_unlock(&lli->lli_lock); - RETURN(-ENOMEM); - } - } else { - /* sai is already there */ - spin_unlock(&lli->lli_lock); - RETURN(-EBUSY); - } - spin_unlock(&lli->lli_lock); - - sai = lli->lli_sai; - sai->sai_inode = igrab(dir); - sai->sai_ls_all = (rc == LS_FIRST_DOT_DE); - - sta.sta_parent = (*dentryp)->d_parent; - sta.sta_pid = current->pid; - rc = kernel_thread(ll_statahead_thread, &sta, 0); - if (rc < 0) { - CERROR("can't start ll_sa thread, rc: %d\n", rc); - ll_sai_put(sai); - RETURN(rc); - } - - wait_event(sai->sai_thread.t_ctl_waitq, - sai->sai_thread.t_flags & (SVC_RUNNING | SVC_STOPPED)); - ll_sai_put(sai); - - /* we don't stat-ahead for the first dirent since we are already in - * lookup, and -EEXIST also indicates that this is the first dirent. - */ - RETURN(-EEXIST); -} - -/* update hit/miss count */ -void ll_statahead_exit(struct dentry *dentry, int result) -{ - struct ll_inode_info *lli = ll_i2info(dentry->d_parent->d_inode); - struct ll_sb_info *sbi = ll_i2sbi(dentry->d_parent->d_inode); - - if (lli->lli_opendir_pid != current->pid) - return; - - spin_lock(&lli->lli_lock); - if (lli->lli_sai) { - struct ll_statahead_info *sai = lli->lli_sai; - - ll_sai_entry_put(sai); - if (result == 1) { - sai->sai_hit++; - sai->sai_consecutive_miss = 0; - sai->sai_max = min(2 * sai->sai_max, sbi->ll_sa_max); - CDEBUG(D_READA, "statahead %.*s hit (hit/miss %u/%u)\n", - dentry->d_name.len, dentry->d_name.name, - sai->sai_hit, sai->sai_miss); - } else { - sai->sai_miss++; - sai->sai_consecutive_miss++; - /* upon miss, it's always because some dentry is added - * by statahead thread, and at the mean time `ls` - * processs finds this dentry, but the d_op for this - * dentry is NULL, then revalidate is not done, and - * ll_statahead_exit() not called for this dentry, - * so statahead thread should be behind of `ls` process, - * put one entry to go ahead. - */ - ll_sai_entry_put(sai); - CDEBUG(D_READA, "statahead %.*s miss (hit/miss %u/%u)\n", - dentry->d_name.len, dentry->d_name.name, - sai->sai_hit, sai->sai_miss); - } - wake_up(&sai->sai_thread.t_ctl_waitq); - } - spin_unlock(&lli->lli_lock); -} diff --git a/lustre/llite/xattr.c b/lustre/llite/xattr.c index 5834f02b81..d123ba1445 100644 --- a/lustre/llite/xattr.c +++ b/lustre/llite/xattr.c @@ -239,8 +239,6 @@ int ll_getxattr_common(struct inode *inode, const char *name, posix_acl_release(acl); RETURN(rc); } - if (xattr_type == XATTR_ACL_DEFAULT_T && !S_ISDIR(inode->i_mode)) - RETURN(-ENODATA); #endif do_getxattr: diff --git a/lustre/mdc/mdc_lib.c b/lustre/mdc/mdc_lib.c index 89d1f0d54f..8b7c6afa15 100644 --- a/lustre/mdc/mdc_lib.c +++ b/lustre/mdc/mdc_lib.c @@ -315,8 +315,7 @@ void mdc_getattr_pack(struct ptlrpc_request *req, int offset, int valid, char *tmp; tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1, data->namelen + 1); - memcpy(tmp, data->name, data->namelen); - data->name = tmp; + LOGL0(data->name, data->namelen, tmp); } } @@ -397,8 +396,8 @@ void mdc_exit_request(struct client_obd *cli) spin_lock(&cli->cl_loi_list_lock); cli->cl_r_in_flight--; - list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { + if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { /* No free request slots anymore */ break; diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 3cf911a78a..7962b0ae4a 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -59,6 +59,18 @@ void it_clear_disposition(struct lookup_intent *it, int flag) } EXPORT_SYMBOL(it_clear_disposition); +static int it_to_lock_mode(struct lookup_intent *it) +{ + /* CREAT needs to be tested before open (both could be set) */ + if (it->it_op & IT_CREAT) + return LCK_CW; + else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP)) + return LCK_CR; + + LBUG(); + RETURN(-EINVAL); +} + int it_open_error(int phase, struct lookup_intent *it) { if (it_disposition(it, DISP_OPEN_OPEN)) { @@ -183,28 +195,25 @@ static int round_up(int val) * but this is incredibly unlikely, and questionable whether the client * could do MDS recovery under OOM anyways... */ static void mdc_realloc_openmsg(struct ptlrpc_request *req, - struct mds_body *body) + struct mds_body *body, int size[6]) { - int old_len, new_size, old_size; - struct lustre_msg *old_msg = req->rq_reqmsg; + int new_size, old_size; struct lustre_msg *new_msg; - old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2); /* save old size */ - old_size = lustre_msg_size(lustre_request_magic(req), - req->rq_reqmsg->lm_bufcount, - req->rq_reqmsg->lm_buflens); - - lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, - body->eadatasize); - new_size = lustre_msg_size(lustre_request_magic(req), - req->rq_reqmsg->lm_bufcount, - req->rq_reqmsg->lm_buflens); + old_size = lustre_msg_size(lustre_request_magic(req), 6, size); + + size[DLM_INTENT_REC_OFF + 2] = body->eadatasize; + new_size = lustre_msg_size(lustre_request_magic(req), 6, size); OBD_ALLOC(new_msg, new_size); if (new_msg != NULL) { - DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n", + struct lustre_msg *old_msg = req->rq_reqmsg; + + DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u", body->eadatasize); memcpy(new_msg, old_msg, old_size); + lustre_msg_set_buflen(new_msg, DLM_INTENT_REC_OFF + 2, + body->eadatasize); spin_lock(&req->rq_lock); req->rq_reqmsg = new_msg; @@ -213,85 +222,107 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req, OBD_FREE(old_msg, old_size); } else { - lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len); body->valid &= ~OBD_MD_FLEASIZE; body->eadatasize = 0; } } -static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp, - struct lookup_intent *it, - struct mdc_op_data *data, - void *lmm, int lmmsize) +/* We always reserve enough space in the reply packet for a stripe MD, because + * we don't know in advance the file type. */ +int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, + struct lookup_intent *it, struct mdc_op_data *op_data, + struct lustre_handle *lockh, void *lmm, int lmmsize, + int extra_lock_flags) { struct ptlrpc_request *req; - struct ldlm_intent *lit; struct obd_device *obddev = class_exp2obd(exp); + struct ldlm_res_id res_id = + { .name = {op_data->fid1.id, op_data->fid1.generation} }; + ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; + struct ldlm_request *lockreq; + struct ldlm_intent *lit; + struct ldlm_reply *lockrep; int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request), + [DLM_LOCKREQ_OFF] = sizeof(*lockreq), [DLM_INTENT_IT_OFF] = sizeof(*lit), - [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create), - [DLM_INTENT_REC_OFF+1]= data->namelen + 1, - /* As an optimization, we allocate an RPC request buffer - * for at least a default-sized LOV EA even if we aren't - * sending one. We grow the whole request to the next - * power-of-two size since we get that much from a slab - * allocation anyways. This avoids an allocation below - * in the common case where we need to save a - * default-sized LOV EA for open replay. */ - [DLM_INTENT_REC_OFF+2]= max(lmmsize, - obddev->u.cli.cl_default_mds_easize) }; + 0, 0, 0, 0 }; int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply), + [DLM_LOCKREPLY_OFF] = sizeof(*lockrep), [DLM_REPLY_REC_OFF] = sizeof(struct mds_body), [DLM_REPLY_REC_OFF+1] = obddev->u.cli. - cl_max_mds_easize, - [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE }; - CFS_LIST_HEAD(cancels); - int count = 0; - int mode; - int rc; - - it->it_create_mode |= S_IFREG; - - rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, size); - if (rc & (rc - 1)) - size[DLM_INTENT_REC_OFF + 2] = - min(size[DLM_INTENT_REC_OFF + 2] + round_up(rc) - rc, - obddev->u.cli.cl_max_mds_easize); - - /* If inode is known, cancel conflicting OPEN locks. */ - if (data->fid2.id) { - if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) - mode = LCK_CW; + cl_max_mds_easize, 0 }; + int flags = extra_lock_flags | LDLM_FL_HAS_INTENT; + int repbufcnt = 4, rc; + void *eadata; + ENTRY; + + LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type); +// LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu", +// ldlm_it2str(it->it_op), it_name, it_inode->i_ino); + + if (it->it_op & IT_OPEN) { + CFS_LIST_HEAD(cancels); + int count = 0; + int mode; + + it->it_create_mode |= S_IFREG; + + size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create); + size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1; + /* As an optimization, we allocate an RPC request buffer for + * at least a default-sized LOV EA even if we aren't sending + * one. We grow the whole request to the next power-of-two + * size since we get that much from a slab allocation anyways. + * This avoids an allocation below in the common case where + * we need to save a default-sized LOV EA for open replay. */ + size[DLM_INTENT_REC_OFF + 2] = max(lmmsize, + obddev->u.cli.cl_default_mds_easize); + rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, + size); + if (rc & (rc - 1)) + size[DLM_INTENT_REC_OFF + 2] = + min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc, + obddev->u.cli.cl_max_mds_easize); + + /* If inode is known, cancel conflicting OPEN locks. */ + if (op_data->fid2.id) { + if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC)) + mode = LCK_CW; #ifdef FMODE_EXEC - else if (it->it_flags & FMODE_EXEC) - mode = LCK_PR; + else if (it->it_flags & FMODE_EXEC) + mode = LCK_PR; #endif - else + else + mode = LCK_CR; + count = mdc_resource_get_unused(exp, &op_data->fid2, + &cancels, mode, + MDS_INODELOCK_OPEN); + } + + /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */ + if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE) + mode = LCK_EX; + else mode = LCK_CR; - count = mdc_resource_get_unused(exp, &data->fid2, &cancels, - mode, MDS_INODELOCK_OPEN); - } + count += mdc_resource_get_unused(exp, &op_data->fid1, &cancels, + mode, MDS_INODELOCK_UPDATE); + if (it->it_flags & O_JOIN_FILE) { + /* join is like an unlink of the tail */ + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + size[DLM_INTENT_REC_OFF + 3] = + sizeof(struct mds_rec_join); + req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, + count); + mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, op_data, + (*(__u64 *)op_data->data)); + } else { + req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, + count); + } - /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */ - if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE) - mode = LCK_EX; - else - mode = LCK_CR; - count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode, - MDS_INODELOCK_UPDATE); - if (it->it_flags & O_JOIN_FILE) { - __u64 head_size = (*(__u64 *)data->data); - /* join is like an unlink of the tail */ - size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join); - req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, count); - mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data, head_size); - } else { - req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, count); - } + if (!req) + RETURN(-ENOMEM); - if (req) { spin_lock(&req->rq_lock); req->rq_replay = 1; spin_unlock(&req->rq_lock); @@ -302,110 +333,76 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp, lit->opc = (__u64)it->it_op; /* pack the intended request */ - mdc_open_pack(req, DLM_INTENT_REC_OFF, data, + mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data, it->it_create_mode, 0, it->it_flags, lmm, lmmsize); - ptlrpc_req_set_repsize(req, 5, repsize); - } - return req; -} - -static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp, - struct lookup_intent *it, - struct mdc_op_data *data) -{ - struct ptlrpc_request *req; - struct ldlm_intent *lit; - struct obd_device *obddev = class_exp2obd(exp); - int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request), - [DLM_INTENT_IT_OFF] = sizeof(*lit), - [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink), - [DLM_INTENT_REC_OFF+1]= data->namelen + 1 }; - int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply), - [DLM_REPLY_REC_OFF] = sizeof(struct mds_body), - [DLM_REPLY_REC_OFF+1] = obddev->u.cli. - cl_max_mds_easize, - [DLM_REPLY_REC_OFF+2] = obddev->u.cli. - cl_max_mds_cookiesize }; + repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE; + } else if (it->it_op & IT_UNLINK) { + size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink); + size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1; + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0); + if (!req) + RETURN(-ENOMEM); - req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0); - if (req) { /* pack the intent */ lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF, sizeof(*lit)); lit->opc = (__u64)it->it_op; /* pack the intended request */ - mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data); + mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data); - ptlrpc_req_set_repsize(req, 5, repsize); - } - return req; -} + repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize; + } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { + obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | + OBD_MD_FLACL | OBD_MD_FLMODEASIZE | + OBD_MD_FLDIREA; + size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body); + size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1; -static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp, - struct lookup_intent *it, - struct mdc_op_data *data) -{ - struct ptlrpc_request *req; - struct ldlm_intent *lit; - struct obd_device *obddev = class_exp2obd(exp); - int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request), - [DLM_INTENT_IT_OFF] = sizeof(*lit), - [DLM_INTENT_REC_OFF] = sizeof(struct mds_body), - [DLM_INTENT_REC_OFF+1]= data->namelen + 1 }; - int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply), - [DLM_REPLY_REC_OFF] = sizeof(struct mds_body), - [DLM_REPLY_REC_OFF+1] = obddev->u.cli. - cl_max_mds_easize, - [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE }; - obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL | - OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA; + if (it->it_op & IT_GETATTR) + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + + req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0); + if (!req) + RETURN(-ENOMEM); - req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0); - if (req) { /* pack the intent */ lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF, sizeof(*lit)); lit->opc = (__u64)it->it_op; /* pack the intended request */ - mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags, - data); - ptlrpc_req_set_repsize(req, 5, repsize); + mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, + it->it_flags, op_data); + + repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE; + } else if (it->it_op == IT_READDIR) { + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); + if (!req) + RETURN(-ENOMEM); + + repbufcnt = 2; + } else { + LBUG(); + RETURN(-EINVAL); } - return req; -} -static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp) -{ - struct ptlrpc_request *req; - int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) }; - int repsize[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply) }; - - req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); - if (req) - ptlrpc_req_set_repsize(req, 2, repsize); - return req; -} + /* get ready for the reply */ + ptlrpc_req_set_repsize(req, repbufcnt, repsize); -static int mdc_finish_enqueue(struct obd_export *exp, - struct ptlrpc_request *req, - struct ldlm_enqueue_info *einfo, - struct lookup_intent *it, - struct lustre_handle *lockh, - int rc) -{ - struct ldlm_request *lockreq; - struct ldlm_reply *lockrep; - ENTRY; + /* It is important to obtain rpc_lock first (if applicable), so that + * threads that are serialised with rpc_lock are not polluting our + * rpcs in flight counter */ + mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + mdc_enter_request(&obddev->u.cli); + rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL, + 0, NULL, lockh, 0); + mdc_exit_request(&obddev->u.cli); + mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); /* Similarly, if we're going to replay this request, we don't want to * actually get a lock, just perform the intent. */ @@ -459,17 +456,16 @@ static int mdc_finish_enqueue(struct obd_export *exp, * It's important that we do this first! Otherwise we might exit the * function without doing so, and try to replay a failed create * (bug 3440) */ - if ((it->it_op & IT_OPEN) && - req->rq_replay && - (!it_disposition(it, DISP_OPEN_OPEN) || - it->d.lustre.it_status != 0)) + if (it->it_op & IT_OPEN && req->rq_replay && + (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0)) mdc_clear_replay_flag(req, it->d.lustre.it_status); DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d", it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status); /* We know what to expect, so we do any byte flipping required here */ - if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) { + LASSERT(repbufcnt == 5 || repbufcnt == 2); + if (repbufcnt == 5) { struct mds_body *body; body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body), @@ -488,8 +484,6 @@ static int mdc_finish_enqueue(struct obd_export *exp, mdc_set_open_replay_data(NULL, req); if ((body->valid & OBD_MD_FLEASIZE) != 0) { - void *eadata; - /* The eadata is opaque; just check that it is there. * Eventually, obd_unpackmd() will check the contents */ eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1, @@ -499,17 +493,15 @@ static int mdc_finish_enqueue(struct obd_export *exp, RETURN (-EPROTO); } if (body->valid & OBD_MD_FLMODEASIZE) { - struct obd_device *obddev = class_exp2obd(exp); - if (obddev->u.cli.cl_max_mds_easize < - body->max_mdsize) { + body->max_mdsize) { obddev->u.cli.cl_max_mds_easize = body->max_mdsize; CDEBUG(D_INFO, "maxeasize become %d\n", body->max_mdsize); } if (obddev->u.cli.cl_max_mds_cookiesize < - body->max_cookiesize) { + body->max_cookiesize) { obddev->u.cli.cl_max_mds_cookiesize = body->max_cookiesize; CDEBUG(D_INFO, "cookiesize become %d\n", @@ -522,11 +514,10 @@ static int mdc_finish_enqueue(struct obd_export *exp, * reallocate it here to hold the actual LOV EA. */ if (it->it_op & IT_OPEN) { int offset = DLM_INTENT_REC_OFF + 2; - void *lmm; if (lustre_msg_buflen(req->rq_reqmsg, offset) < body->eadatasize) - mdc_realloc_openmsg(req, body); + mdc_realloc_openmsg(req, body, size); lmm = lustre_msg_buf(req->rq_reqmsg, offset, body->eadatasize); @@ -538,203 +529,8 @@ static int mdc_finish_enqueue(struct obd_export *exp, RETURN(rc); } - -/* We always reserve enough space in the reply packet for a stripe MD, because - * we don't know in advance the file type. */ -int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, - struct lookup_intent *it, struct mdc_op_data *data, - struct lustre_handle *lockh, void *lmm, int lmmsize, - int extra_lock_flags) -{ - struct ptlrpc_request *req; - struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_res_id res_id = - { .name = {data->fid1.id, data->fid1.generation} }; - ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; - int flags = extra_lock_flags | LDLM_FL_HAS_INTENT; - int rc; - ENTRY; - - LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type); - if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR)) - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - - if (it->it_op & IT_OPEN) { - req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize); - if (it->it_flags & O_JOIN_FILE) { - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - } - } else if (it->it_op & IT_UNLINK) { - req = mdc_intent_unlink_pack(exp, it, data); - } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { - req = mdc_intent_lookup_pack(exp, it, data); - } else if (it->it_op == IT_READDIR) { - req = mdc_intent_readdir_pack(exp); - } else { - CERROR("bad it_op %x\n", it->it_op); - RETURN(-EINVAL); - } - - if (!req) - RETURN(-ENOMEM); - - /* It is important to obtain rpc_lock first (if applicable), so that - * threads that are serialised with rpc_lock are not polluting our - * rpcs in flight counter */ - mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); - mdc_enter_request(&obddev->u.cli); - rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL, - 0, NULL, lockh, 0); - mdc_exit_request(&obddev->u.cli); - mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); - - rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); - - RETURN(rc); -} EXPORT_SYMBOL(mdc_enqueue); -int mdc_revalidate_lock(struct obd_export *exp, - struct lookup_intent *it, - struct ll_fid *fid) -{ - /* We could just return 1 immediately, but since we should only - * be called in revalidate_it if we already have a lock, let's - * verify that. */ - struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}}; - struct lustre_handle lockh; - ldlm_policy_data_t policy; - int mode = LCK_CR; - int rc; - - /* As not all attributes are kept under update lock, e.g. - owner/group/acls are under lookup lock, we need both - ibits for GETATTR. */ - policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ? - MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP : - MDS_INODELOCK_LOOKUP; - - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED, - &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh); - if (!rc) { - mode = LCK_CW; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS, - &policy, LCK_CW, &lockh); - } - if (!rc) { - mode = LCK_PR; - rc = ldlm_lock_match(exp->exp_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS, - &policy, LCK_PR, &lockh); - } - if (rc) { - memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh)); - it->d.lustre.it_lock_mode = mode; - } - - return rc; -} -EXPORT_SYMBOL(mdc_revalidate_lock); - -static int mdc_finish_intent_lock(struct obd_export *exp, - struct ptlrpc_request *req, - struct mdc_op_data *data, - struct lookup_intent *it, - struct lustre_handle *lockh) -{ - struct mds_body *mds_body; - struct lustre_handle old_lock; - struct ldlm_lock *lock; - int rc; - ENTRY; - - LASSERT(req != NULL); - LASSERT(req != LP_POISON); - LASSERT(req->rq_repmsg != LP_POISON); - - if (!it_disposition(it, DISP_IT_EXECD)) { - /* The server failed before it even started executing the - * intent, i.e. because it couldn't unpack the request. */ - LASSERT(it->d.lustre.it_status != 0); - RETURN(it->d.lustre.it_status); - } - rc = it_open_error(DISP_IT_EXECD, it); - if (rc) - RETURN(rc); - - mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, - sizeof(*mds_body)); - LASSERT(mds_body != NULL); /* mdc_enqueue checked */ - LASSERT_REPSWABBED(req, DLM_REPLY_REC_OFF); /* mdc_enqueue swabbed */ - - /* If we were revalidating a fid/name pair, mark the intent in - * case we fail and get called again from lookup */ - if (data->fid2.id && (it->it_op != IT_GETATTR)) { - it_set_disposition(it, DISP_ENQ_COMPLETE); - /* Also: did we find the same inode? */ - if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2))) - RETURN(-ESTALE); - } - - rc = it_open_error(DISP_LOOKUP_EXECD, it); - if (rc) - RETURN(rc); - - /* keep requests around for the multiple phases of the call - * this shows the DISP_XX must guarantee we make it into the call - */ - if (!it_disposition(it, DISP_ENQ_CREATE_REF) && - it_disposition(it, DISP_OPEN_CREATE) && - !it_open_error(DISP_OPEN_CREATE, it)) { - it_set_disposition(it, DISP_ENQ_CREATE_REF); - ptlrpc_request_addref(req); /* balanced in ll_create_node */ - } - if (!it_disposition(it, DISP_ENQ_OPEN_REF) && - it_disposition(it, DISP_OPEN_OPEN) && - !it_open_error(DISP_OPEN_OPEN, it)) { - it_set_disposition(it, DISP_ENQ_OPEN_REF); - ptlrpc_request_addref(req); /* balanced in ll_file_open */ - /* BUG 11546 - eviction in the middle of open rpc processing */ - OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout); - } - - if (it->it_op & IT_CREAT) { - /* XXX this belongs in ll_create_it */ - } else if (it->it_op == IT_OPEN) { - LASSERT(!it_disposition(it, DISP_OPEN_CREATE)); - } else { - LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP)); - } - - /* If we already have a matching lock, then cancel the new - * one. We have to set the data here instead of in - * mdc_enqueue, because we need to use the child's inode as - * the l_ast_data to match, and that's not available until - * intent_finish has performed the iget().) */ - lock = ldlm_handle2lock(lockh); - if (lock) { - ldlm_policy_data_t policy = lock->l_policy_data; - - LDLM_DEBUG(lock, "matching against this"); - LDLM_LOCK_PUT(lock); - memcpy(&old_lock, lockh, sizeof(*lockh)); - if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, - LDLM_IBITS, &policy, LCK_NL, &old_lock)) { - ldlm_lock_decref_and_cancel(lockh, - it->d.lustre.it_lock_mode); - memcpy(lockh, &old_lock, sizeof(old_lock)); - memcpy(&it->d.lustre.it_lock_handle, lockh, - sizeof(*lockh)); - } - } - - CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n", - data->namelen, data->name, ldlm_it2str(it->it_op), - it->d.lustre.it_status, it->d.lustre.it_disposition, rc); - RETURN(rc); -} - /* * This long block is all about fixing up the lock and request state * so that it is correct as of the moment _before_ the operation was @@ -768,9 +564,12 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, ldlm_blocking_callback cb_blocking, int extra_lock_flags) { struct lustre_handle lockh; - int rc; + struct ptlrpc_request *request; + int rc = 0; + struct mds_body *mds_body; + struct lustre_handle old_lock; + struct ldlm_lock *lock; ENTRY; - LASSERT(it); CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n", @@ -779,7 +578,43 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, if (op_data->fid2.id && (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) { - rc = mdc_revalidate_lock(exp, it, &op_data->fid2); + /* We could just return 1 immediately, but since we should only + * be called in revalidate_it if we already have a lock, let's + * verify that. */ + struct ldlm_res_id res_id = {.name ={op_data->fid2.id, + op_data->fid2.generation}}; + struct lustre_handle lockh; + ldlm_policy_data_t policy; + int mode = LCK_CR; + + /* As not all attributes are kept under update lock, e.g. + owner/group/acls are under lookup lock, we need both + ibits for GETATTR. */ + policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ? + MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP : + MDS_INODELOCK_LOOKUP; + + rc = ldlm_lock_match(exp->exp_obd->obd_namespace, + LDLM_FL_BLOCK_GRANTED, &res_id, + LDLM_IBITS, &policy, LCK_CR, &lockh); + if (!rc) { + mode = LCK_CW; + rc = ldlm_lock_match(exp->exp_obd->obd_namespace, + LDLM_FL_BLOCK_GRANTED, &res_id, + LDLM_IBITS, &policy,LCK_CW,&lockh); + } + if (!rc) { + mode = LCK_PR; + rc = ldlm_lock_match(exp->exp_obd->obd_namespace, + LDLM_FL_BLOCK_GRANTED, &res_id, + LDLM_IBITS, &policy,LCK_PR,&lockh); + } + if (rc) { + memcpy(&it->d.lustre.it_lock_handle, &lockh, + sizeof(lockh)); + it->d.lustre.it_lock_mode = mode; + } + /* Only return failure if it was not GETATTR by cfid (from inode_revalidate) */ if (rc || op_data->namelen != 0) @@ -811,100 +646,90 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, * lookup, so we clear DISP_ENQ_COMPLETE */ it_clear_disposition(it, DISP_ENQ_COMPLETE); } + request = *reqp = it->d.lustre.it_data; + LASSERT(request != NULL); + LASSERT(request != LP_POISON); + LASSERT(request->rq_repmsg != LP_POISON); - *reqp = it->d.lustre.it_data; - rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh); - - RETURN(rc); -} -EXPORT_SYMBOL(mdc_intent_lock); - -static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req, - void *unused, int rc) -{ - struct mdc_enqueue_args *ma; - struct md_enqueue_info *minfo; - struct ldlm_enqueue_info *einfo; - struct obd_export *exp; - struct lookup_intent *it; - struct lustre_handle *lockh; - struct obd_device *obddev; - int flags = LDLM_FL_HAS_INTENT; - ENTRY; - - ma = (struct mdc_enqueue_args *)&req->rq_async_args; - minfo = ma->ma_mi; - einfo = ma->ma_ei; - - exp = minfo->mi_exp; - it = &minfo->mi_it; - lockh = &minfo->mi_lockh; - - obddev = class_exp2obd(exp); - - mdc_exit_request(&obddev->u.cli); - - rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode, - &flags, NULL, 0, NULL, lockh, rc); - - rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc); + if (!it_disposition(it, DISP_IT_EXECD)) { + /* The server failed before it even started executing the + * intent, i.e. because it couldn't unpack the request. */ + LASSERT(it->d.lustre.it_status != 0); + RETURN(it->d.lustre.it_status); + } + rc = it_open_error(DISP_IT_EXECD, it); if (rc) - GOTO(out, rc); - - memcpy(&it->d.lustre.it_lock_handle, lockh, sizeof(*lockh)); - - rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh); - GOTO(out, rc); -out: - OBD_FREE_PTR(einfo); - minfo->mi_cb(exp, req, minfo, rc); + RETURN(rc); - return 0; -} + mds_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF, + sizeof(*mds_body)); + LASSERT(mds_body != NULL); /* mdc_enqueue checked */ + LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */ -int mdc_intent_getattr_async(struct obd_export *exp, - struct md_enqueue_info *minfo, - struct ldlm_enqueue_info *einfo) -{ - struct mdc_op_data *op_data = &minfo->mi_data; - struct lookup_intent *it = &minfo->mi_it; - struct ptlrpc_request *req; - struct obd_device *obddev = class_exp2obd(exp); - struct ldlm_res_id res_id = { - .name = {op_data->fid1.id, - op_data->fid1.generation} - }; - ldlm_policy_data_t policy = { - .l_inodebits = { MDS_INODELOCK_LOOKUP } - }; - struct mdc_enqueue_args *aa; - int rc; - int flags = LDLM_FL_HAS_INTENT; - ENTRY; + /* If we were revalidating a fid/name pair, mark the intent in + * case we fail and get called again from lookup */ + if (op_data->fid2.id && (it->it_op != IT_GETATTR)) { + it_set_disposition(it, DISP_ENQ_COMPLETE); + /* Also: did we find the same inode? */ + if (memcmp(&op_data->fid2, &mds_body->fid1, + sizeof(op_data->fid2))) + RETURN (-ESTALE); + } - CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n", - op_data->namelen, op_data->name, op_data->fid1.id, - ldlm_it2str(it->it_op), it->it_flags); + rc = it_open_error(DISP_LOOKUP_EXECD, it); + if (rc) + RETURN(rc); - req = mdc_intent_lookup_pack(exp, it, op_data); - if (!req) - RETURN(-ENOMEM); + /* keep requests around for the multiple phases of the call + * this shows the DISP_XX must guarantee we make it into the call + */ + if (!it_disposition(it, DISP_ENQ_CREATE_REF) && + it_disposition(it, DISP_OPEN_CREATE) && + !it_open_error(DISP_OPEN_CREATE, it)) { + it_set_disposition(it, DISP_ENQ_CREATE_REF); + ptlrpc_request_addref(request); /* balanced in ll_create_node */ + } + if (!it_disposition(it, DISP_ENQ_OPEN_REF) && + it_disposition(it, DISP_OPEN_OPEN) && + !it_open_error(DISP_OPEN_OPEN, it)) { + it_set_disposition(it, DISP_ENQ_OPEN_REF); + ptlrpc_request_addref(request); /* balanced in ll_file_open */ + /* BUG 11546 - eviction in the middle of open rpc processing */ + OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout); + } - mdc_enter_request(&obddev->u.cli); - rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL, - 0, NULL, &minfo->mi_lockh, 1); - if (rc < 0) { - mdc_exit_request(&obddev->u.cli); - RETURN(rc); + if (it->it_op & IT_CREAT) { + /* XXX this belongs in ll_create_it */ + } else if (it->it_op == IT_OPEN) { + LASSERT(!it_disposition(it, DISP_OPEN_CREATE)); + } else { + LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP)); } - CLASSERT(sizeof(*aa) < sizeof(req->rq_async_args)); - aa = (struct mdc_enqueue_args *)&req->rq_async_args; - aa->ma_mi = minfo; - aa->ma_ei = einfo; - req->rq_interpret_reply = mdc_intent_getattr_async_interpret; - ptlrpcd_add_req(req); + /* If we already have a matching lock, then cancel the new + * one. We have to set the data here instead of in + * mdc_enqueue, because we need to use the child's inode as + * the l_ast_data to match, and that's not available until + * intent_finish has performed the iget().) */ + lock = ldlm_handle2lock(&lockh); + if (lock) { + ldlm_policy_data_t policy = lock->l_policy_data; + LDLM_DEBUG(lock, "matching against this"); + LDLM_LOCK_PUT(lock); + memcpy(&old_lock, &lockh, sizeof(lockh)); + if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL, + LDLM_IBITS, &policy, LCK_NL, &old_lock)) { + ldlm_lock_decref_and_cancel(&lockh, + it->d.lustre.it_lock_mode); + memcpy(&lockh, &old_lock, sizeof(old_lock)); + memcpy(&it->d.lustre.it_lock_handle, &lockh, + sizeof(lockh)); + } + } + CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n", + op_data->namelen, op_data->name, ldlm_it2str(it->it_op), + it->d.lustre.it_status, it->d.lustre.it_disposition, rc); - RETURN(0); + RETURN(rc); } -EXPORT_SYMBOL(mdc_intent_getattr_async); +EXPORT_SYMBOL(mdc_intent_lock); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index f73c236af4..0846aca06e 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -108,7 +108,6 @@ static int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size, unsigned int acl_size, struct ptlrpc_request *req) { - struct obd_device *obddev = class_exp2obd(exp); struct mds_body *body; void *eadata; int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) }; @@ -129,9 +128,7 @@ int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size, ptlrpc_req_set_repsize(req, bufcount, size); - mdc_enter_request(&obddev->u.cli); rc = ptlrpc_queue_wait(req); - mdc_exit_request(&obddev->u.cli); if (rc != 0) RETURN (rc); @@ -239,7 +236,6 @@ int mdc_xattr_common(struct obd_export *exp, struct ll_fid *fid, const char *input, int input_size, int output_size, int flags, struct ptlrpc_request **request) { - struct obd_device *obddev = class_exp2obd(exp); struct ptlrpc_request *req; int size[4] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body) }; // int size[3] = {sizeof(struct mds_body)}, bufcnt = 1; @@ -291,15 +287,11 @@ int mdc_xattr_common(struct obd_export *exp, struct ll_fid *fid, /* make rpc */ if (opcode == MDS_SETXATTR) mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - else - mdc_enter_request(&obddev->u.cli); rc = ptlrpc_queue_wait(req); if (opcode == MDS_SETXATTR) mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - else - mdc_exit_request(&obddev->u.cli); if (rc != 0) GOTO(err_out, rc); diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index d848522437..c97518998b 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -84,7 +84,6 @@ init_test_env $@ cleanup() { echo -n "cln.." - pgrep ll_sa > /dev/null && { echo "There are ll_sa thread not exit!"; exit 20; } cleanupall ${FORCE} $* || { echo "FAILed to clean up"; exit 20; } } CLEANUP=${CLEANUP:-:} @@ -4115,75 +4114,18 @@ test_122() { #bug #11544 } run_test 122 "fail client bulk callback (shouldn't LBUG) =======" -test_123() # statahead(bug 11401) -{ - if [ -z "$(grep "processor.*: 1" /proc/cpuinfo)" ]; then - log "single core CPU, skipping test" # && return - fi - - mkdir -p $DIR/$tdir - - for ((i=1, j=0; i<=10000; j=$i, i=$((i * 10)) )); do - createmany -o $DIR/$tdir/$tfile $j $((i - j)) - - grep '[0-9]' $LPROC/llite/*/statahead_max - cancel_lru_locks mdc - stime=`date +%s` - ls -l $DIR/$tdir > /dev/null - etime=`date +%s` - delta_sa=$((etime - stime)) - echo "ls $i files with statahead: $delta_sa sec" - - for client in $LPROC/llite/*; do - max=`cat $client/statahead_max` - cat $client/statahead_stats - echo 0 > $client/statahead_max - done - - grep '[0-9]' $LPROC/llite/*/statahead_max - cancel_lru_locks mdc - stime=`date +%s` - ls -l $DIR/$tdir > /dev/null - etime=`date +%s` - delta=$((etime - stime)) - echo "ls $i files without statahead: $delta sec" - - for client in /proc/fs/lustre/llite/*; do - cat $client/statahead_stats - echo $max > $client/statahead_max - done - - if [ $delta_sa -gt $delta ]; then - error "ls $i files is slower with statahead!" - fi - done - echo "ls done" - - stime=`date +%s` - rm -r $DIR/$tdir - sync - etime=`date +%s` - delta=$((etime - stime)) - echo "rm -r $DIR/$tdir/: $delta seconds" - echo "rm done" - cat /proc/fs/lustre/llite/*/statahead_stats - # wait for commitment of removal - sleep 2 -} -run_test 123 "verify statahead work" - TMPDIR=$OLDTMPDIR TMP=$OLDTMP HOME=$OLDHOME log "cleanup: ======================================================" if [ "`mount | grep $MOUNT`" ]; then - rm -rf $DIR/[Rdfs][1-9]* + rm -rf $DIR/[Rdfs][1-9]* fi if [ "$I_MOUNTED" = "yes" ]; then - cleanupall -f || error "cleanup failed" + cleanupall -f || error "cleanup failed" else - sysctl -w lnet.debug="$OLDDEBUG" 2> /dev/null || true + sysctl -w lnet.debug="$OLDDEBUG" 2> /dev/null || true fi -- GitLab