From aa7dc66ab24c3d4f0483a2dff5a5727772711115 Mon Sep 17 00:00:00 2001 From: yury <yury> Date: Wed, 3 Sep 2008 08:15:47 +0000 Subject: [PATCH] b=15260 r=umka,tappro - fix flock support for HEAD (original patch from Oleg Drokin aka green) --- lustre/include/lustre_dlm.h | 1 + lustre/include/obd.h | 3 ++- lustre/include/obd_class.h | 3 ++- lustre/ldlm/ldlm_flock.c | 44 +++++++++++++++++++++--------- lustre/ldlm/ldlm_internal.h | 2 ++ lustre/ldlm/ldlm_lock.c | 29 +++++++++++++------- lustre/liblustre/dir.c | 2 +- lustre/liblustre/super.c | 2 +- lustre/llite/dir.c | 2 +- lustre/llite/file.c | 25 ++++++++++------- lustre/lmv/lmv_obd.c | 12 ++++----- lustre/mdc/mdc_internal.h | 2 +- lustre/mdc/mdc_locks.c | 53 ++++++++++++++++++++++++++++--------- 13 files changed, 123 insertions(+), 57 deletions(-) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 17f8fc4a2a..7c65539e03 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -671,6 +671,7 @@ struct ldlm_enqueue_info { void *ei_cb_cp; /* lock completion callback */ void *ei_cb_gl; /* lock glimpse callback */ void *ei_cbdata; /* Data to be passed into callbacks. 
*/ + short ei_async:1; /* async request */ }; extern struct obd_ops ldlm_obd_ops; diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 88c43750a7..65cd938608 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -1405,7 +1405,8 @@ struct md_ops { struct md_open_data *); int (*m_enqueue)(struct obd_export *, struct ldlm_enqueue_info *, struct lookup_intent *, struct md_op_data *, - struct lustre_handle *, void *, int, int); + struct lustre_handle *, void *, int, + struct ptlrpc_request **, int); int (*m_getattr)(struct obd_export *, const struct lu_fid *, struct obd_capa *, obd_valid, int, struct ptlrpc_request **); diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index c1e464187d..0fae2a8881 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1873,6 +1873,7 @@ static inline int md_enqueue(struct obd_export *exp, struct md_op_data *op_data, struct lustre_handle *lockh, void *lmm, int lmmsize, + struct ptlrpc_request **req, int extra_lock_flags) { int rc; @@ -1880,7 +1881,7 @@ static inline int md_enqueue(struct obd_export *exp, EXP_CHECK_MD_OP(exp, enqueue); EXP_MD_COUNTER_INCREMENT(exp, enqueue); rc = MDP(exp->exp_obd, enqueue)(exp, einfo, it, op_data, lockh, - lmm, lmmsize, extra_lock_flags); + lmm, lmmsize, req, extra_lock_flags); RETURN(rc); } diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index 1b2d3ebbbb..0febedc8cd 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -111,7 +111,10 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags) if (flags == LDLM_FL_WAIT_NOREPROC) { /* client side - set a flag to prevent sending a CANCEL */ lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING; - ldlm_lock_decref_internal(lock, mode); + + /* when reaching here, it is under lock_res_and_lock(). 
Thus, + need call the nolock version of ldlm_lock_decref_internal*/ + ldlm_lock_decref_internal_nolock(lock, mode); } ldlm_lock_destroy_nolock(lock); @@ -164,6 +167,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, int local = ns_is_client(ns); int added = (mode == LCK_NL); int overlaps = 0; + int splitted = 0; ENTRY; CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end "LPU64 @@ -182,6 +186,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, req->l_blocking_ast = ldlm_flock_blocking_ast; } +reprocess: if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) { /* This loop determines where this processes locks start * in the resource lr_granted list. */ @@ -365,15 +370,22 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, /* XXX - if ldlm_lock_new() can sleep we should * release the ns_lock, allocate the new lock, * and restart processing this lock. */ - new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK, + if (!new2) { + unlock_res_and_lock(req); + new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK, lock->l_granted_mode, NULL, NULL, NULL, NULL, 0); - if (!new2) { - ldlm_flock_destroy(req, lock->l_granted_mode, *flags); - *err = -ENOLCK; - RETURN(LDLM_ITER_STOP); + lock_res_and_lock(req); + if (!new2) { + ldlm_flock_destroy(req, lock->l_granted_mode, *flags); + *err = -ENOLCK; + RETURN(LDLM_ITER_STOP); + } + goto reprocess; } + splitted = 1; + new2->l_granted_mode = lock->l_granted_mode; new2->l_policy_data.l_flock.pid = new->l_policy_data.l_flock.pid; @@ -391,8 +403,9 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, &new2->l_export->exp_ldlm_data.led_held_locks); spin_unlock(&new2->l_export->exp_ldlm_data.led_lock); } - if (*flags == LDLM_FL_WAIT_NOREPROC) - ldlm_lock_addref_internal(new2, lock->l_granted_mode); + if (*flags == LDLM_FL_WAIT_NOREPROC) { + ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode); + } /* insert new2 at 
lock */ ldlm_resource_add_lock(res, ownlocks, new2); @@ -400,6 +413,10 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, break; } + /* if new2 is created but never used, destroy it*/ + if (splitted == 0 && new2 != NULL) + ldlm_lock_destroy_nolock(new2); + /* At this point we're granting the lock request. */ req->l_granted_mode = req->l_req_mode; @@ -428,9 +445,10 @@ restart: ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list); - unlock_res(res); - rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST); - lock_res(res); + unlock_res_and_lock(req); + rc = ldlm_run_ast_work(&rpc_list, + LDLM_WORK_BL_AST); + lock_res_and_lock(req); if (rc == -ERESTART) GOTO(restart, -ERESTART); } @@ -550,7 +568,7 @@ granted: OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10); LDLM_DEBUG(lock, "client-side enqueue granted"); ns = lock->l_resource->lr_namespace; - lock_res(lock->l_resource); + lock_res_and_lock(lock); /* before flock's complete ast gets here, the flock * can possibly be freed by another thread @@ -596,7 +614,7 @@ granted: if (flags == 0) cfs_waitq_signal(&lock->l_waitq); } - unlock_res(lock->l_resource); + unlock_res_and_lock(lock); RETURN(0); } EXPORT_SYMBOL(ldlm_flock_completion_ast); diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index db172775a6..24cfca34de 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -118,7 +118,9 @@ ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *, ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **, void *cookie, int *flags); void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode); +void ldlm_lock_addref_internal_nolock(struct ldlm_lock *, __u32 mode); void ldlm_lock_decref_internal(struct ldlm_lock *, __u32 mode); +void ldlm_lock_decref_internal_nolock(struct ldlm_lock *, __u32 mode); void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new, struct list_head *work_list); int 
ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue, diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 63c5987277..5678333ebd 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -589,15 +589,12 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode) unlock_res_and_lock(lock); } -void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) +/* only called in ldlm_flock_destroy and for local locks. + * * for LDLM_FLOCK type locks, l_blocking_ast is null, and + * * ldlm_lock_remove_from_lru() does nothing, it is safe + * * for ldlm_flock_destroy usage by dropping some code */ +void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode) { - struct ldlm_namespace *ns; - ENTRY; - - lock_res_and_lock(lock); - - ns = lock->l_resource->lr_namespace; - LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]); if (mode & (LCK_NL | LCK_CR | LCK_PR)) { LASSERT(lock->l_readers > 0); @@ -608,6 +605,20 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) lock->l_writers--; } + LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */ +} + +void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) +{ + struct ldlm_namespace *ns; + ENTRY; + + lock_res_and_lock(lock); + + ns = lock->l_resource->lr_namespace; + + ldlm_lock_decref_internal_nolock(lock, mode); + if (lock->l_flags & LDLM_FL_LOCAL && !lock->l_readers && !lock->l_writers) { /* If this is a local lock on a server namespace and this was @@ -650,8 +661,6 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) unlock_res_and_lock(lock); } - LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */ - EXIT; } diff --git a/lustre/liblustre/dir.c b/lustre/liblustre/dir.c index 5e4a737d43..b45bc8925a 100644 --- a/lustre/liblustre/dir.c +++ b/lustre/liblustre/dir.c @@ -100,7 +100,7 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page) LUSTRE_OPC_ANY); rc = 
md_enqueue(sbi->ll_md_exp, &einfo, &it, - &op_data, &lockh, NULL, 0, + &op_data, &lockh, NULL, 0, NULL, LDLM_FL_CANCEL_ON_BLOCK); request = (struct ptlrpc_request *)it.d.lustre.it_data; if (request) diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 9a5e5b7960..4c9aaad05b 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -1792,7 +1792,7 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags, LUSTRE_OPC_ANY); rc = md_enqueue(sbi->ll_md_exp, &einfo, &oit, &data, - &lockh, lum, lum_size, LDLM_FL_INTENT_ONLY); + &lockh, lum, lum_size, NULL, LDLM_FL_INTENT_ONLY); if (rc) GOTO(out, rc); diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 6a9561b4e3..d76a31c43c 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -292,7 +292,7 @@ struct page *ll_get_dir_page(struct inode *dir, __u64 hash, int exact, return (void *)op_data; rc = md_enqueue(ll_i2sbi(dir)->ll_md_exp, &einfo, &it, - op_data, &lockh, NULL, 0, 0); + op_data, &lockh, NULL, 0, NULL, 0); ll_finish_md_op_data(op_data); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 0b7082c54f..89d77e5d6c 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -2337,7 +2337,7 @@ static int join_file(struct inode *head_inode, struct file *head_filp, RETURN(PTR_ERR(op_data)); rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, - op_data, &lockh, NULL, 0, 0); + op_data, &lockh, NULL, 0, NULL, 0); ll_finish_md_op_data(op_data); if (rc < 0) @@ -2810,13 +2810,10 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) { struct inode *inode = file->f_dentry->d_inode; struct ll_sb_info *sbi = ll_i2sbi(inode); - struct ldlm_res_id res_id = - { .name = { fid_seq(ll_inode2fid(inode)), - fid_oid(ll_inode2fid(inode)), - fid_ver(ll_inode2fid(inode)), - LDLM_FLOCK} }; - struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL, - ldlm_flock_completion_ast, NULL, file_lock }; + struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK, + 
.ei_cb_cp =ldlm_flock_completion_ast, + .ei_cbdata = file_lock }; + struct md_op_data *op_data; struct lustre_handle lockh = {0}; ldlm_policy_data_t flock; int flags = 0; @@ -2888,12 +2885,20 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock) LBUG(); } + op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0, + LUSTRE_OPC_ANY, NULL); + if (IS_ERR(op_data)) + RETURN(PTR_ERR(op_data)); + CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, " "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid, flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end); - rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id, - &flock, &flags, NULL, 0, NULL, &lockh, 0); + rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL, + op_data, &lockh, &flock, 0, NULL /* req */, flags); + + ll_finish_md_op_data(op_data); + if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0 || file_lock->fl_type == F_UNLCK)) ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW)); diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index db0116c2d5..de6470f810 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -1506,7 +1506,7 @@ lmv_enqueue_slaves(struct obd_export *exp, struct ldlm_enqueue_info *einfo, continue; rc = md_enqueue(tgt_exp, einfo, it, op_data2, - lockh + i, lmm, lmmsize, 0); + lockh + i, lmm, lmmsize, NULL, 0); CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n", PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status); @@ -1588,7 +1588,7 @@ lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo, rdata->op_bias = MDS_CROSS_REF; rc = md_enqueue(tgt_exp, einfo, it, rdata, lockh, - lmm, lmmsize, extra_lock_flags); + lmm, lmmsize, NULL, extra_lock_flags); OBD_FREE_PTR(rdata); EXIT; out: @@ -1600,7 +1600,7 @@ static int lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, struct lookup_intent *it, struct md_op_data *op_data, struct lustre_handle *lockh, void *lmm, int lmmsize, - int extra_lock_flags) 
+ struct ptlrpc_request **req, int extra_lock_flags) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; @@ -1613,7 +1613,7 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, if (rc) RETURN(rc); - if (op_data->op_mea1 && it->it_op == IT_UNLINK) { + if (op_data->op_mea1 && it && it->it_op == IT_UNLINK) { rc = lmv_enqueue_slaves(exp, einfo, it, op_data, lockh, lmm, lmmsize); RETURN(rc); @@ -1645,9 +1645,9 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, PFID(&op_data->op_fid1)); rc = md_enqueue(tgt_exp, einfo, it, op_data, lockh, - lmm, lmmsize, extra_lock_flags); + lmm, lmmsize, req, extra_lock_flags); - if (rc == 0 && it->it_op == IT_OPEN) + if (rc == 0 && it && it->it_op == IT_OPEN) rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh, lmm, lmmsize, extra_lock_flags); RETURN(rc); diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index d6babb53b7..0676f0d227 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -113,7 +113,7 @@ int mdc_intent_lock(struct obd_export *exp, int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, struct lookup_intent *it, struct md_op_data *op_data, struct lustre_handle *lockh, void *lmm, int lmmsize, - int extra_lock_flags); + struct ptlrpc_request **req, int extra_lock_flags); int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid, struct list_head *cancels, ldlm_mode_t mode, diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index ea03789001..1c47ca0c88 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -615,25 +615,39 @@ static int mdc_finish_enqueue(struct obd_export *exp, int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, struct lookup_intent *it, struct md_op_data *op_data, struct lustre_handle *lockh, void *lmm, int lmmsize, - int extra_lock_flags) + struct ptlrpc_request **reqp, int extra_lock_flags) { struct obd_device *obddev = 
class_exp2obd(exp); - struct ptlrpc_request *req; + struct ptlrpc_request *req = NULL; struct req_capsule *pill; - int flags = extra_lock_flags | LDLM_FL_HAS_INTENT; + int flags = extra_lock_flags; int rc; struct ldlm_res_id res_id; ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; ENTRY; - LASSERTF(einfo->ei_type == LDLM_IBITS, "lock type %d\n", einfo->ei_type); + LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n", + einfo->ei_type); fid_build_reg_res_name(&op_data->op_fid1, &res_id); - if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR)) + if (it) + flags |= LDLM_FL_HAS_INTENT; + if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR)) policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - if (it->it_op & IT_OPEN) { + if (reqp) + req = *reqp; + + if (!it) { + /* The only way right now is FLOCK, in this case we hide flock + policy as lmm, but lmmsize is 0 */ + LASSERT(lmm && lmmsize == 0); + LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n", + einfo->ei_type); + policy = *(ldlm_policy_data_t *)lmm; + res_id.name[3] = LDLM_FLOCK; + } else if (it->it_op & IT_OPEN) { int joinfile = !!((it->it_flags & O_JOIN_FILE) && op_data->op_data); @@ -662,13 +676,28 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, /* It is important to obtain rpc_lock first (if applicable), so that * threads that are serialised with rpc_lock are not polluting our - * rpcs in flight counter */ - mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); - mdc_enter_request(&obddev->u.cli); + * rpcs in flight counter. 
We do not do flock request limiting, though */ + if (it) { + mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + mdc_enter_request(&obddev->u.cli); + } rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, 0, NULL, lockh, 0); - mdc_exit_request(&obddev->u.cli); - mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + if (reqp) + *reqp = req; + + if (it) { + mdc_exit_request(&obddev->u.cli); + mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it); + } + if (!it) { + /* For flock requests we immediately return without further + delay and let the caller deal with the rest, since the rest of + this function's metadata processing makes no sense for flock + requests anyway */ + RETURN(rc); + } + if (rc < 0) { CERROR("ldlm_cli_enqueue: %d\n", rc); mdc_clear_replay_flag(req, rc); @@ -896,7 +925,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, } } rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, - lmm, lmmsize, extra_lock_flags); + lmm, lmmsize, NULL, extra_lock_flags); if (rc < 0) RETURN(rc); it->d.lustre.it_lock_handle = lockh.cookie; -- GitLab