From 5198938653c53c63e00fe9c4291913c1e2df81ea Mon Sep 17 00:00:00 2001 From: zam <zam> Date: Thu, 8 May 2008 05:46:55 +0000 Subject: [PATCH] b=11270 i=vitaly.vertman i=oleg.drokin Lockless i/o code fixes and improvements: (1) lockless truncate checks for OBD_CONNECT_TRUNCLOCK and ll_file_punch sets OBD_FL_TRUNCLOCK correctly. (2) an lproc control for lockless truncate, lproc statistics for lockless truncate. (3) sanityN tests for lockless code --- lustre/include/linux/lustre_lite.h | 1 + lustre/ldlm/ldlm_extent.c | 289 ++++++++++++++--------------- lustre/llite/file.c | 3 +- lustre/llite/llite_internal.h | 3 + lustre/llite/llite_lib.c | 7 +- lustre/llite/lproc_llite.c | 27 ++- lustre/llite/rw.c | 11 +- lustre/tests/sanityN.sh | 79 ++++++++ lustre/tests/test-framework.sh | 30 +++ 9 files changed, 300 insertions(+), 150 deletions(-) diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index 7a63f2e8de..c531c059fe 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -50,6 +50,7 @@ enum { LPROC_LL_FSYNC, LPROC_LL_SETATTR, LPROC_LL_TRUNC, + LPROC_LL_LOCKLESS_TRUNC, LPROC_LL_FLOCK, #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index cc0e3aa920..2db142f4f1 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -408,170 +408,169 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, compat = 0; } } - RETURN(compat); - } - - /* for waiting queue */ - list_for_each(tmp, queue) { - check_contention = 1; - - lock = list_entry(tmp, struct ldlm_lock, l_res_link); + } else { + /* for waiting queue */ + list_for_each(tmp, queue) { + check_contention = 1; - if (req == lock) - break; + lock = list_entry(tmp, struct ldlm_lock, l_res_link); - if (unlikely(scan)) { - /* We only get here if we are queuing GROUP lock - and met some incompatible one. The main idea of this - code is to insert GROUP lock past compatible GROUP - lock in the waiting queue or if there is not any, - then in front of first non-GROUP lock */ - if (lock->l_req_mode != LCK_GROUP) { - /* Ok, we hit non-GROUP lock, there should be no - more GROUP locks later on, queue in front of - first non-GROUP lock */ - - ldlm_resource_insert_lock_after(lock, req); - list_del_init(&lock->l_res_link); - ldlm_resource_insert_lock_after(req, lock); - compat = 0; - break; - } - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* found it */ - ldlm_resource_insert_lock_after(lock, req); - compat = 0; + if (req == lock) break; - } - continue; - } - /* locks are compatible, overlap doesn't matter */ - if (lockmode_compat(lock->l_req_mode, req_mode)) { - if (req_mode == LCK_PR && - ((lock->l_policy_data.l_extent.start <= - req->l_policy_data.l_extent.start) && - (lock->l_policy_data.l_extent.end >= - req->l_policy_data.l_extent.end))) { - /* If we met a PR lock just like us or wider, - and nobody down the list conflicted with - it, that means we can skip processing of - the rest of the list and safely place - ourselves at the end of the list, or grant - (dependent if we met an conflicting locks - before in the list). - In case of 1st enqueue only we continue - traversing if there is something conflicting - down the list because we need to make sure - that something is marked as AST_SENT as well, - in cse of empy worklist we would exit on - first conflict met. */ - /* There IS a case where such flag is - not set for a lock, yet it blocks - something. Luckily for us this is - only during destroy, so lock is - exclusive. So here we are safe */ - if (!(lock->l_flags & LDLM_FL_AST_SENT)) { - RETURN(compat); + if (unlikely(scan)) { + /* We only get here if we are queuing GROUP lock + and met some incompatible one. The main idea of this + code is to insert GROUP lock past compatible GROUP + lock in the waiting queue or if there is not any, + then in front of first non-GROUP lock */ + if (lock->l_req_mode != LCK_GROUP) { + /* Ok, we hit non-GROUP lock, there should be no + more GROUP locks later on, queue in front of + first non-GROUP lock */ + + ldlm_resource_insert_lock_after(lock, req); + list_del_init(&lock->l_res_link); + ldlm_resource_insert_lock_after(req, lock); + compat = 0; + break; + } + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* found it */ + ldlm_resource_insert_lock_after(lock, req); + compat = 0; + break; } + continue; } - /* non-group locks are compatible, overlap doesn't - matter */ - if (likely(req_mode != LCK_GROUP)) - continue; + /* locks are compatible, overlap doesn't matter */ + if (lockmode_compat(lock->l_req_mode, req_mode)) { + if (req_mode == LCK_PR && + ((lock->l_policy_data.l_extent.start <= + req->l_policy_data.l_extent.start) && + (lock->l_policy_data.l_extent.end >= + req->l_policy_data.l_extent.end))) { + /* If we met a PR lock just like us or wider, + and nobody down the list conflicted with + it, that means we can skip processing of + the rest of the list and safely place + ourselves at the end of the list, or grant + (dependent if we met an conflicting locks + before in the list). + In case of 1st enqueue only we continue + traversing if there is something conflicting + down the list because we need to make sure + that something is marked as AST_SENT as well, + in cse of empy worklist we would exit on + first conflict met. */ + /* There IS a case where such flag is + not set for a lock, yet it blocks + something. Luckily for us this is + only during destroy, so lock is + exclusive. So here we are safe */ + if (!(lock->l_flags & LDLM_FL_AST_SENT)) { + RETURN(compat); + } + } - /* If we are trying to get a GROUP lock and there is - another one of this kind, we need to compare gid */ - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* If existing lock with matched gid is granted, - we grant new one too. */ - if (lock->l_req_mode == lock->l_granted_mode) - RETURN(2); + /* non-group locks are compatible, overlap doesn't + matter */ + if (likely(req_mode != LCK_GROUP)) + continue; - /* Otherwise we are scanning queue of waiting - * locks and it means current request would - * block along with existing lock (that is - * already blocked. - * If we are in nonblocking mode - return - * immediately */ - if (*flags & LDLM_FL_BLOCK_NOWAIT) { - compat = -EWOULDBLOCK; - goto destroylock; + /* If we are trying to get a GROUP lock and there is + another one of this kind, we need to compare gid */ + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* If existing lock with matched gid is granted, + we grant new one too. */ + if (lock->l_req_mode == lock->l_granted_mode) + RETURN(2); + + /* Otherwise we are scanning queue of waiting + * locks and it means current request would + * block along with existing lock (that is + * already blocked. + * If we are in nonblocking mode - return + * immediately */ + if (*flags & LDLM_FL_BLOCK_NOWAIT) { + compat = -EWOULDBLOCK; + goto destroylock; + } + /* If this group lock is compatible with another + * group lock on the waiting list, they must be + * together in the list, so they can be granted + * at the same time. Otherwise the later lock + * can get stuck behind another, incompatible, + * lock. */ + ldlm_resource_insert_lock_after(lock, req); + /* Because 'lock' is not granted, we can stop + * processing this queue and return immediately. + * There is no need to check the rest of the + * list. */ + RETURN(0); } - /* If this group lock is compatible with another - * group lock on the waiting list, they must be - * together in the list, so they can be granted - * at the same time. Otherwise the later lock - * can get stuck behind another, incompatible, - * lock. */ - ldlm_resource_insert_lock_after(lock, req); - /* Because 'lock' is not granted, we can stop - * processing this queue and return immediately. - * There is no need to check the rest of the - * list. */ - RETURN(0); } - } - if (unlikely(req_mode == LCK_GROUP && - (lock->l_req_mode != lock->l_granted_mode))) { - scan = 1; - compat = 0; - if (lock->l_req_mode != LCK_GROUP) { - /* Ok, we hit non-GROUP lock, there should - * be no more GROUP locks later on, queue in - * front of first non-GROUP lock */ - - ldlm_resource_insert_lock_after(lock, req); - list_del_init(&lock->l_res_link); - ldlm_resource_insert_lock_after(req, lock); - break; - } - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* found it */ - ldlm_resource_insert_lock_after(lock, req); - break; + if (unlikely(req_mode == LCK_GROUP && + (lock->l_req_mode != lock->l_granted_mode))) { + scan = 1; + compat = 0; + if (lock->l_req_mode != LCK_GROUP) { + /* Ok, we hit non-GROUP lock, there should + * be no more GROUP locks later on, queue in + * front of first non-GROUP lock */ + + ldlm_resource_insert_lock_after(lock, req); + list_del_init(&lock->l_res_link); + ldlm_resource_insert_lock_after(req, lock); + break; + } + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* found it */ + ldlm_resource_insert_lock_after(lock, req); + break; + } + continue; } - continue; - } - if (unlikely(lock->l_req_mode == LCK_GROUP)) { - /* If compared lock is GROUP, then requested is PR/PW/ - * so this is not compatible; extent range does not - * matter */ - if (*flags & LDLM_FL_BLOCK_NOWAIT) { - compat = -EWOULDBLOCK; - goto destroylock; - } else { - *flags |= LDLM_FL_NO_TIMEOUT; - } - } else if (lock->l_policy_data.l_extent.end < req_start || - lock->l_policy_data.l_extent.start > req_end) { - /* if a non group lock doesn't overlap skip it */ - continue; - } else if (lock->l_req_extent.end < req_start || - lock->l_req_extent.start > req_end) - /* false contention, the requests doesn't really overlap */ + if (unlikely(lock->l_req_mode == LCK_GROUP)) { + /* If compared lock is GROUP, then requested is PR/PW/ + * so this is not compatible; extent range does not + * matter */ + if (*flags & LDLM_FL_BLOCK_NOWAIT) { + compat = -EWOULDBLOCK; + goto destroylock; + } else { + *flags |= LDLM_FL_NO_TIMEOUT; + } + } else if (lock->l_policy_data.l_extent.end < req_start || + lock->l_policy_data.l_extent.start > req_end) { + /* if a non group lock doesn't overlap skip it */ + continue; + } else if (lock->l_req_extent.end < req_start || + lock->l_req_extent.start > req_end) + /* false contention, the requests doesn't really overlap */ check_contention = 0; - if (!work_list) - RETURN(0); + if (!work_list) + RETURN(0); - /* don't count conflicting glimpse locks */ - if (lock->l_req_mode == LCK_PR && - lock->l_policy_data.l_extent.start == 0 && - lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF) - check_contention = 0; + /* don't count conflicting glimpse locks */ + if (lock->l_req_mode == LCK_PR && + lock->l_policy_data.l_extent.start == 0 && + lock->l_policy_data.l_extent.end == OBD_OBJECT_EOF) + check_contention = 0; - *contended_locks += check_contention; + *contended_locks += check_contention; - compat = 0; - if (lock->l_blocking_ast) - ldlm_add_ast_work_item(lock, req, work_list); + compat = 0; + if (lock->l_blocking_ast) + ldlm_add_ast_work_item(lock, req, work_list); + } } if (ldlm_check_contention(req, *contended_locks) && diff --git a/lustre/llite/file.c b/lustre/llite/file.c index ddaa5aba2e..d49116fc25 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1406,7 +1406,8 @@ repeat: ll_inode_size_unlock(inode, 1); retval = ll_glimpse_size(inode, LDLM_FL_BLOCK_GRANTED); if (retval) { - ll_tree_unlock(&tree); + if (tree_locked) + ll_tree_unlock(&tree); goto out; } } else { diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 10fed7c7e0..f98f79e30d 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -231,6 +231,8 @@ enum stats_track_type { /* default value for ll_sb_info->contention_time */ #define SBI_DEFAULT_CONTENTION_SECONDS 60 +/* default value for lockless_truncate_enable */ +#define SBI_DEFAULT_LOCKLESS_TRUNCATE_ENABLE 1 struct ll_sb_info { struct list_head ll_list; @@ -260,6 +262,7 @@ struct ll_sb_info { struct list_head ll_pglist; /* all pages (llap_pglist_item) */ unsigned ll_contention_time; /* seconds */ + unsigned ll_lockless_truncate_enable; /* true/false */ struct ll_ra_info ll_ra_info; unsigned int ll_namelen; diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 54fcd63740..c42df614b7 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -76,6 +76,7 @@ static struct ll_sb_info *ll_init_sbi(void) sbi->ll_ra_info.ra_max_read_ahead_whole_pages = SBI_DEFAULT_READAHEAD_WHOLE_MAX; sbi->ll_contention_time = SBI_DEFAULT_CONTENTION_SECONDS; + sbi->ll_lockless_truncate_enable = SBI_DEFAULT_LOCKLESS_TRUNCATE_ENABLE; INIT_LIST_HEAD(&sbi->ll_conn_chain); INIT_LIST_HEAD(&sbi->ll_orphan_dentry_list); @@ -273,7 +274,8 @@ static int client_common_fill_super(struct super_block *sb, data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_GRANT | OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE | - OBD_CONNECT_SRVLOCK | OBD_CONNECT_CANCELSET | OBD_CONNECT_AT; + OBD_CONNECT_SRVLOCK | OBD_CONNECT_CANCELSET | OBD_CONNECT_AT | + OBD_CONNECT_TRUNCLOCK; if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_CKSUM)) { /* OBD_CONNECT_CKSUM should always be set, even if checksums are @@ -1308,7 +1310,8 @@ static int ll_setattr_do_truncate(struct inode *inode, loff_t new_size) UNLOCK_INODE_MUTEX(inode); UP_WRITE_I_ALLOC_SEM(inode); - if (sbi->ll_lco.lco_flags & OBD_CONNECT_TRUNCLOCK) { + if (sbi->ll_lockless_truncate_enable && + (sbi->ll_lco.lco_flags & OBD_CONNECT_TRUNCLOCK)) { ast_flags = LDLM_FL_BLOCK_GRANTED; rc = obd_match(sbi->ll_osc_exp, lsm, LDLM_EXTENT, &policy, LCK_PW, &ast_flags, inode, &lockh); diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c index 04a7f90610..b5a306bc0d 100644 --- a/lustre/llite/lproc_llite.c +++ b/lustre/llite/lproc_llite.c @@ -463,6 +463,27 @@ static int ll_wr_contention_time(struct file *file, const char *buffer, count; } +static int ll_rd_lockless_truncate(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct super_block *sb = data; + + *eof = 1; + return snprintf(page, count, "%u\n", + ll_s2sbi(sb)->ll_lockless_truncate_enable); +} + +static int ll_wr_lockless_truncate(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct super_block *sb = data; + struct ll_sb_info *sbi = ll_s2sbi(sb); + + return lprocfs_write_helper(buffer, count, + &sbi->ll_lockless_truncate_enable) + ?: count; +} + static int ll_rd_statahead_max(char *page, char **start, off_t off, int count, int *eof, void *data) { @@ -534,7 +555,10 @@ static struct lprocfs_vars lprocfs_llite_obd_vars[] = { { "stats_track_pid", ll_rd_track_pid, ll_wr_track_pid, 0 }, { "stats_track_ppid", ll_rd_track_ppid, ll_wr_track_ppid, 0 }, { "stats_track_gid", ll_rd_track_gid, ll_wr_track_gid, 0 }, - { "contention_seconds", ll_rd_contention_time, ll_wr_contention_time, 0}, + { "contention_seconds", ll_rd_contention_time, + ll_wr_contention_time, 0}, + { "lockless_truncate", ll_rd_lockless_truncate, + ll_wr_lockless_truncate, 0}, { "statahead_max", ll_rd_statahead_max, ll_wr_statahead_max, 0 }, { "statahead_stats", ll_rd_statahead_stats, 0, 0 }, { 0 } @@ -576,6 +600,7 @@ struct llite_file_opcode { /* inode operation */ { LPROC_LL_SETATTR, LPROCFS_TYPE_REGS, "setattr" }, { LPROC_LL_TRUNC, LPROCFS_TYPE_REGS, "truncate" }, + { LPROC_LL_LOCKLESS_TRUNC, LPROCFS_TYPE_REGS, "lockless_truncate" }, { LPROC_LL_FLOCK, LPROCFS_TYPE_REGS, "flock" }, #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) { LPROC_LL_GETATTR, LPROCFS_TYPE_REGS, "getattr" }, diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index c48c39e23e..b29cbf83d8 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -120,7 +120,14 @@ int ll_file_punch(struct inode * inode, loff_t new_size, int srvlock) oinfo.oi_oa = &oa; oa.o_id = lli->lli_smd->lsm_object_id; oa.o_valid = OBD_MD_FLID; - oa.o_flags = srvlock ? OBD_FL_TRUNCLOCK : 0; + if (srvlock) { + /* set OBD_MD_FLFLAGS in o_valid, only if we + * set OBD_FL_TRUNCLOCK, otherwise ost_punch + * and filter_setattr get confused, see the comment + * in ost_punch */ + oa.o_flags = OBD_FL_TRUNCLOCK; + oa.o_valid |= OBD_MD_FLFLAGS; + } obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |OBD_MD_FLFID| OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGENER | @@ -212,6 +219,8 @@ void ll_truncate(struct inode *inode) ll_inode_size_unlock(inode, 0); if (!srvlock) ll_file_punch(inode, new_size, 0); + else + ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LOCKLESS_TRUNC, 1); EXIT; return; diff --git a/lustre/tests/sanityN.sh b/lustre/tests/sanityN.sh index dbcb0c4e7f..00e3147a03 100644 --- a/lustre/tests/sanityN.sh +++ b/lustre/tests/sanityN.sh @@ -589,6 +589,85 @@ test_31() { } run_test 31 "voluntary cancel / blocking ast race==============" +# enable/disable lockless truncate feature, depending on the arg 0/1 +enable_lockless_truncate() { + lctl set_param -n llite.*.lockless_truncate $1 +} + +test_32a() { # bug 11270 + local p="$TMP/sanityN-$TESTNAME.parameters" + save_lustre_params $HOSTNAME llite.*.lockless_truncate > $p + cancel_lru_locks osc + clear_llite_stats + enable_lockless_truncate 1 + dd if=/dev/zero of=$DIR1/$tfile count=10 bs=1M > /dev/null 2>&1 + + log "checking cached lockless truncate" + $TRUNCATE $DIR1/$tfile 8000000 + $CHECKSTAT -s 8000000 $DIR2/$tfile || error "wrong file size" + [ $(calc_llite_stats lockless_truncate) -eq 0 ] || + error "lockless truncate doesn't use cached locks" + + log "checking not cached lockless truncate" + $TRUNCATE $DIR2/$tfile 5000000 + $CHECKSTAT -s 5000000 $DIR1/$tfile || error "wrong file size" + [ $(calc_llite_stats lockless_truncate) -ne 0 ] || + error "not cached trancate isn't lockless" + + log "disabled lockless truncate" + enable_lockless_truncate 0 + clear_llite_stats + $TRUNCATE $DIR2/$tfile 3000000 + $CHECKSTAT -s 3000000 $DIR1/$tfile || error "wrong file size" + [ $(calc_llite_stats lockless_truncate) -eq 0 ] || + error "lockless truncate disabling failed" + rm $DIR1/$tfile + # restore lockless_truncate default values + restore_lustre_params < $p + rm -f $p +} +run_test 32a "lockless truncate" + +test_32b() { # bug 11270 + local node + local p="$TMP/sanityN-$TESTNAME.parameters" + save_lustre_params $HOSTNAME "llite.*.contention_seconds" > $p + for node in $(osts_nodes); do + save_lustre_params $node "ldlm.namespaces.filter-*.max_nolock_bytes" >> $p + save_lustre_params $node "ldlm.namespaces.filter-*.contended_locks" >> $p + save_lustre_params $node "ldlm.namespaces.filter-*.contention_seconds" >> $p + done + clear_llite_stats + # agressive lockless i/o settings + for node in $(osts_nodes); do + do_node $node 'lctl set_param -n ldlm.namespaces.filter-*.max_nolock_bytes 2000000; lctl set_param -n ldlm.namespaces.filter-*.contended_locks 0; lctl set_param -n ldlm.namespaces.filter-*.contention_seconds 60' + done + lctl set_param -n llite.*.contention_seconds 60 + for i in $(seq 5); do + dd if=/dev/zero of=$DIR1/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1 + dd if=/dev/zero of=$DIR2/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1 + done + [ $(calc_llite_stats lockless_write_bytes) -ne 0 ] || error "lockless i/o was not triggered" + # disable lockless i/o (it is disabled by default) + for node in $(osts_nodes); do + do_node $node 'lctl set_param -n ldlm.namespaces.filter-*.max_nolock_bytes 0; lctl set_param -n ldlm.namespaces.filter-*.contended_locks 32; lctl set_param -n ldlm.namespaces.filter-*.contention_seconds 0' + done + # set contention_seconds to 0 at client too, otherwise Lustre still + # remembers lock contention + lctl set_param -n llite.*.contention_seconds 0 + clear_llite_stats + for i in $(seq 5); do + dd if=/dev/zero of=$DIR1/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1 + dd if=/dev/zero of=$DIR2/$tfile bs=4k count=1 conv=notrunc > /dev/null 2>&1 + done + [ $(calc_llite_stats lockless_write_bytes) -eq 0 ] || + error "lockless i/o works when disabled" + rm -f $DIR1/$tfile + restore_lustre_params <$p + rm -f $p +} +run_test 32b "lockless i/o" + log "cleanup: ======================================================" check_and_cleanup_lustre diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh index fb824d3aa9..feb98e39fa 100644 --- a/lustre/tests/test-framework.sh +++ b/lustre/tests/test-framework.sh @@ -1438,3 +1438,33 @@ multiop_bg_pause() { return 0 } + +# reset llite stat counters +clear_llite_stats(){ + lctl set_param -n llite.*.stats 0 +} + +# sum llite stat items +calc_llite_stats() { + local res=$(lctl get_param -n llite.*.stats | + awk 'BEGIN {s = 0} END {print s} /^'"$1"'/ {s += $2}') + echo $res +} + +# save_lustre_params(node, parameter_mask) +# generate a stream of formatted strings (<node> <param name>=<param value>) +save_lustre_params() { + local s + do_node $1 "lctl get_param $2" | while read s; do echo "$1 $s"; done +} + +# restore lustre parameters from input stream, produces by save_lustre_params +restore_lustre_params() { + local node + local name + local val + while IFS=" =" read node name val; do + do_node $node "lctl set_param -n $name $val" + done +} + -- GitLab