diff --git a/lustre/ChangeLog b/lustre/ChangeLog index a332517161a7f63139292ae259fc1926191e0dcd..04dee50b6256063571d0e09f8ef852025c57dac7 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -1084,6 +1084,13 @@ Details : Change LASSERTs to client eviction (i.e. abort client's recovery) because LASSERT on both the data supplied by a client, and the data on disk is dangerous and incorrect. +Severity : enhancement +Bugzilla : 10718 +Description: Slow truncate/writes to huge files at high offsets. +Details : Directly associate cached pages to lock that protects those pages, + this allows us to quickly find what pages to write and remove + once lock callback is received. + -------------------------------------------------------------------------------- 2007-08-10 Cluster File Systems, Inc. <info@clusterfs.com> diff --git a/lustre/include/Makefile.am b/lustre/include/Makefile.am index 26cad317b7401da510471cfe804192e34828bcaa..b2dc0d9a07ab83e0bb6488c14cbe328fbabf476a 100644 --- a/lustre/include/Makefile.am +++ b/lustre/include/Makefile.am @@ -16,4 +16,5 @@ EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h \ obd_ost.h obd_support.h lustre_ver.h lu_object.h lu_time.h \ md_object.h dt_object.h lustre_param.h lustre_mdt.h \ lustre_fid.h lustre_fld.h lustre_req_layout.h lustre_capa.h \ - lustre_idmap.h lustre_eacl.h interval_tree.h obd_cksum.h + lustre_idmap.h lustre_eacl.h interval_tree.h obd_cksum.h \ + lustre_cache.h diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 5d2941c5dee0534775ad8c5451ac0b3c5cb72921..61a4d132e811fcc6be2fee2daf327ac14822e99d 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -549,6 +549,10 @@ struct ldlm_lock { void *l_lvb_data; /* an LVB received during */ void *l_lvb_swabber; /* an enqueue */ void *l_ast_data; + spinlock_t l_extents_list_lock; + struct list_head l_extents_list; + + struct list_head l_cache_locks_list; /* Server-side-only members */ diff --git a/lustre/include/obd.h 
b/lustre/include/obd.h index 188a0226743cf25d91aece336e56c72ff5d6cd91..e69ac779527987802d9aebc9f82fcaadb343c83a 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -255,6 +255,11 @@ struct obd_device_target { struct lustre_quota_ctxt obt_qctxt; }; +typedef void (*obd_pin_extent_cb)(void *data); +typedef int (*obd_page_removal_cb_t)(void *data, int discard); +typedef int (*obd_lock_cancel_cb)(struct ldlm_lock *,struct ldlm_lock_desc *, + void *, int); + /* llog contexts */ enum llog_ctxt_id { LLOG_CONFIG_ORIG_CTXT = 0, @@ -379,6 +384,7 @@ struct filter_obd { struct mdc_rpc_lock; struct obd_import; +struct lustre_cache; struct client_obd { struct semaphore cl_sem; struct obd_uuid cl_target_uuid; @@ -473,6 +479,10 @@ struct client_obd { struct lu_client_seq *cl_seq; atomic_t cl_resends; /* resend count */ + + /* Cache of triples */ + struct lustre_cache *cl_cache; + obd_lock_cancel_cb cl_ext_lock_cancel_cb; }; #define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid) @@ -647,6 +657,9 @@ struct lov_obd { __u32 lov_offset_idx; /* aliasing for start_idx */ int lov_start_count;/* reseed counter */ int lov_connects; + obd_page_removal_cb_t lov_page_removal_cb; + obd_pin_extent_cb lov_page_pin_cb; + obd_lock_cancel_cb lov_lock_cancel_cb; }; struct lmv_tgt_desc { @@ -1172,7 +1185,8 @@ struct obd_ops { struct lov_oinfo *loi, cfs_page_t *page, obd_off offset, struct obd_async_page_ops *ops, void *data, - void **res); + void **res, int nocache, + struct lustre_handle *lockh); int (*o_queue_async_io)(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, void *cookie, @@ -1267,6 +1281,17 @@ struct obd_ops { int (*o_quotactl)(struct obd_export *, struct obd_quotactl *); int (*o_ping)(struct obd_export *exp); + + int (*o_register_page_removal_cb)(struct obd_export *exp, + obd_page_removal_cb_t cb, + obd_pin_extent_cb pin_cb); + int (*o_unregister_page_removal_cb)(struct obd_export *exp, + obd_page_removal_cb_t cb); + int 
(*o_register_lock_cancel_cb)(struct obd_export *exp, + obd_lock_cancel_cb cb); + int (*o_unregister_lock_cancel_cb)(struct obd_export *exp, + obd_lock_cancel_cb cb); + /* * NOTE: If adding ops, add another LPROCFS_OBD_OP_INIT() line * to lprocfs_alloc_obd_stats() in obdclass/lprocfs_status.c. diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index f6ea1348fa44686df50c888ae9e49e7e0bd99d33..c5d7f7980c0ec9a67adf9f783a9eca8715c0acdd 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1193,7 +1193,8 @@ static inline int obd_prep_async_page(struct obd_export *exp, struct lov_oinfo *loi, cfs_page_t *page, obd_off offset, struct obd_async_page_ops *ops, - void *data, void **res) + void *data, void **res, int nocache, + struct lustre_handle *lockh) { int ret; ENTRY; @@ -1202,7 +1203,8 @@ static inline int obd_prep_async_page(struct obd_export *exp, EXP_COUNTER_INCREMENT(exp, prep_async_page); ret = OBP(exp->exp_obd, prep_async_page)(exp, lsm, loi, page, offset, - ops, data, res); + ops, data, res, nocache, + lockh); RETURN(ret); } @@ -1647,6 +1649,59 @@ static inline int obd_register_observer(struct obd_device *obd, RETURN(0); } +static inline int obd_register_page_removal_cb(struct obd_export *exp, + obd_page_removal_cb_t cb, + obd_pin_extent_cb pin_cb) +{ + int rc; + ENTRY; + + OBD_CHECK_DT_OP(exp->exp_obd, register_page_removal_cb, 0); + OBD_COUNTER_INCREMENT(exp->exp_obd, register_page_removal_cb); + + rc = OBP(exp->exp_obd, register_page_removal_cb)(exp, cb, pin_cb); + RETURN(rc); +} + +static inline int obd_unregister_page_removal_cb(struct obd_export *exp, + obd_page_removal_cb_t cb) +{ + int rc; + ENTRY; + + OBD_CHECK_DT_OP(exp->exp_obd, unregister_page_removal_cb, 0); + OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_page_removal_cb); + + rc = OBP(exp->exp_obd, unregister_page_removal_cb)(exp, cb); + RETURN(rc); +} + +static inline int obd_register_lock_cancel_cb(struct obd_export *exp, + obd_lock_cancel_cb cb) +{ + int 
rc; + ENTRY; + + OBD_CHECK_DT_OP(exp->exp_obd, register_lock_cancel_cb, 0); + OBD_COUNTER_INCREMENT(exp->exp_obd, register_lock_cancel_cb); + + rc = OBP(exp->exp_obd, register_lock_cancel_cb)(exp, cb); + RETURN(rc); +} + +static inline int obd_unregister_lock_cancel_cb(struct obd_export *exp, + obd_lock_cancel_cb cb) +{ + int rc; + ENTRY; + + OBD_CHECK_DT_OP(exp->exp_obd, unregister_lock_cancel_cb, 0); + OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_lock_cancel_cb); + + rc = OBP(exp->exp_obd, unregister_lock_cancel_cb)(exp, cb); + RETURN(rc); +} + /* metadata helpers */ static inline int md_getstatus(struct obd_export *exp, struct lu_fid *fid, struct obd_capa **pc) diff --git a/lustre/include/obd_ost.h b/lustre/include/obd_ost.h index 12beb63b5bacb91e175d1453028d6bbe3a1aeb4a..1cbff28e1c9a2058fb62362076c70d338e2c064d 100644 --- a/lustre/include/obd_ost.h +++ b/lustre/include/obd_ost.h @@ -34,4 +34,8 @@ struct osc_enqueue_args { struct ldlm_enqueue_info*oa_ei; }; +int osc_extent_blocking_cb(struct ldlm_lock *lock, + struct ldlm_lock_desc *new, void *data, + int flag); + #endif diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 286197f8af0874bf2e140b67e428cd49e52fb0bf..11ac6caa51ccdc2166a59ff846e8ae9df96b41f0 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -341,6 +341,10 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) CFS_INIT_LIST_HEAD(&lock->l_handle.h_link); class_handle_hash(&lock->l_handle, lock_handle_addref); + CFS_INIT_LIST_HEAD(&lock->l_extents_list); + spin_lock_init(&lock->l_extents_list_lock); + CFS_INIT_LIST_HEAD(&lock->l_cache_locks_list); + RETURN(lock); } diff --git a/lustre/liblustre/llite_lib.h b/lustre/liblustre/llite_lib.h index d262250b3d4067b7db8fbf964849129b1ecf4ca2..c91f25cfcf7e30a8a17bb72955e397d83df7f64a 100644 --- a/lustre/liblustre/llite_lib.h +++ b/lustre/liblustre/llite_lib.h @@ -223,6 +223,9 @@ int llu_iop_write(struct inode *ino, struct ioctx *ioctxp); int 
llu_iop_iodone(struct ioctx *ioctxp); int llu_local_size(struct inode *inode); int llu_glimpse_size(struct inode *inode); +int llu_extent_lock_cancel_cb(struct ldlm_lock *lock, + struct ldlm_lock_desc *new, void *data, + int flag); int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, struct lov_stripe_md *lsm, int mode, ldlm_policy_data_t *policy, struct lustre_handle *lockh, diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c index 021a5925e08256a301b48ec98ad81a4a2354ccc4..d3e4a9f951d6dfea17ebd9746013074e0eb827e8 100644 --- a/lustre/liblustre/rw.c +++ b/lustre/liblustre/rw.c @@ -112,9 +112,9 @@ static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock RETURN(stripe); } -static int llu_extent_lock_callback(struct ldlm_lock *lock, - struct ldlm_lock_desc *new, void *data, - int flag) +int llu_extent_lock_cancel_cb(struct ldlm_lock *lock, + struct ldlm_lock_desc *new, void *data, + int flag) { struct lustre_handle lockh = { 0 }; int rc; @@ -295,7 +295,7 @@ int llu_glimpse_size(struct inode *inode) einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = LCK_PR; - einfo.ei_cb_bl = llu_extent_lock_callback; + einfo.ei_cb_bl = osc_extent_blocking_cb; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = llu_glimpse_callback; einfo.ei_cbdata = inode; @@ -345,7 +345,7 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode, einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = mode; - einfo.ei_cb_bl = llu_extent_lock_callback; + einfo.ei_cb_bl = osc_extent_blocking_cb; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = llu_glimpse_callback; einfo.ei_cbdata = inode; @@ -537,7 +537,9 @@ static int llu_queue_pio(int cmd, struct llu_io_group *group, rc = obd_prep_async_page(exp, lsm, NULL, page, (obd_off)page->index << CFS_PAGE_SHIFT, &llu_async_page_ops, - llap, &llap->llap_cookie); + llap, &llap->llap_cookie, + 1 /* no cache in liblustre at all */, + NULL); if (rc) { LASSERT(rc < 0); llap->llap_cookie = NULL; @@ -609,7 +611,8 @@ struct 
llu_io_group * get_io_group(struct inode *inode, int maxpages, if (!llap_cookie_size) llap_cookie_size = obd_prep_async_page(llu_i2obdexp(inode), NULL, NULL, NULL, 0, - NULL, NULL, NULL); + NULL, NULL, NULL, 0, + NULL); OBD_ALLOC(group, LLU_IO_GROUP_SIZE(maxpages)); if (!group) diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 81dedf88678e780808c4381e13d40213ba0311a2..42d6b4e156c7db56578e915e8b7faaecf7b55cc9 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -95,6 +95,8 @@ static void llu_fsop_gone(struct filesys *fs) ENTRY; list_del(&sbi->ll_conn_chain); + obd_unregister_lock_cancel_cb(sbi->ll_dt_exp, + llu_extent_lock_cancel_cb); obd_disconnect(sbi->ll_dt_exp); obd_disconnect(sbi->ll_md_exp); @@ -2121,12 +2123,19 @@ llu_fsswop_mount(const char *source, sbi->ll_dt_exp = class_conn2export(&dt_conn); sbi->ll_lco.lco_flags = ocd.ocd_connect_flags; + err = obd_register_lock_cancel_cb(sbi->ll_dt_exp, + llu_extent_lock_cancel_cb); + if (err) { + CERROR("cannot register lock cancel callback: rc = %d\n", err); + GOTO(out_dt, err); + } + llu_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp); err = md_getstatus(sbi->ll_md_exp, &rootfid, NULL); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); - GOTO(out_dt, err); + GOTO(out_lock_cn_cb, err); } CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid)); sbi->ll_root_fid = rootfid; @@ -2136,7 +2145,7 @@ llu_fsswop_mount(const char *source, OBD_MD_FLGETATTR | OBD_MD_FLBLOCKS, 0, &request); if (err) { CERROR("md_getattr failed for root: rc = %d\n", err); - GOTO(out_dt, err); + GOTO(out_lock_cn_cb, err); } err = md_get_lustre_md(sbi->ll_md_exp, request, @@ -2180,6 +2189,9 @@ out_inode: _sysio_i_gone(root); out_request: ptlrpc_req_finished(request); +out_lock_cn_cb: + obd_unregister_lock_cancel_cb(sbi->ll_dt_exp, + llu_extent_lock_cancel_cb); out_dt: obd_disconnect(sbi->ll_dt_exp); out_md: diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 
7d1765aac858d677ef64ba5ab40a30bc9c93bff3..5c384d077a75f482585d302c1e677ffe23928fdf 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -776,165 +776,92 @@ check: RETURN(stripe); } -/* Flush the page cache for an extent as its canceled. When we're on an LOV, - * we get a lock cancellation for each stripe, so we have to map the obd's - * region back onto the stripes in the file that it held. +/* Get extra page reference to ensure it is not going away */ +void ll_pin_extent_cb(void *data) +{ + struct page *page = data; + + page_cache_get(page); + + return; +} + +/* Flush the page from page cache for an extent as its canceled. + * Page to remove is delivered as @data. * - * No one can dirty the extent until we've finished our work and they can + * No one can dirty the extent until we've finished our work and they cannot * enqueue another lock. The DLM protects us from ll_file_read/write here, * but other kernel actors could have pages locked. * + * If @discard is set, there is no need to write the page if it is dirty. + * * Called with the DLM lock held. */ -void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm, - struct ldlm_lock *lock, __u32 stripe) +int ll_page_removal_cb(void *data, int discard) { - ldlm_policy_data_t tmpex; - unsigned long start, end, count, skip, i, j; - struct page *page; - int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA; - struct lustre_handle lockh; - struct address_space *mapping = inode->i_mapping; - + int rc; + struct page *page = data; + struct address_space *mapping; + ENTRY; - tmpex = lock->l_policy_data; - CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n", - inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end, - i_size_read(inode)); - - /* our locks are page granular thanks to osc_enqueue, we invalidate the - * whole page. 
*/ - if ((tmpex.l_extent.start & ~CFS_PAGE_MASK) != 0 || - ((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) != 0) - LDLM_ERROR(lock, "lock not aligned on PAGE_SIZE %lu", - CFS_PAGE_SIZE); - LASSERT((tmpex.l_extent.start & ~CFS_PAGE_MASK) == 0); - LASSERT(((tmpex.l_extent.end + 1) & ~CFS_PAGE_MASK) == 0); - - count = ~0; - skip = 0; - start = tmpex.l_extent.start >> CFS_PAGE_SHIFT; - end = tmpex.l_extent.end >> CFS_PAGE_SHIFT; - if (lsm->lsm_stripe_count > 1) { - count = lsm->lsm_stripe_size >> CFS_PAGE_SHIFT; - skip = (lsm->lsm_stripe_count - 1) * count; - start += start/count * skip + stripe * count; - if (end != ~0) - end += end/count * skip + stripe * count; - } - if (end < tmpex.l_extent.end >> CFS_PAGE_SHIFT) - end = ~0; - - i = i_size_read(inode) ? (__u64)(i_size_read(inode) - 1) >> - CFS_PAGE_SHIFT : 0; - if (i < end) - end = i; - - CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu " - "count: %lu skip: %lu end: %lu%s\n", start, start % count, - count, skip, end, discard ? " (DISCARDING)" : ""); - - /* walk through the vmas on the inode and tear down mmaped pages that - * intersect with the lock. this stops immediately if there are no - * mmap()ed regions of the file. This is not efficient at all and - * should be short lived. We'll associate mmap()ed pages with the lock - * and will be able to find them directly */ - for (i = start; i <= end; i += (j + skip)) { - j = min(count - (i % count), end - i + 1); - LASSERT(j > 0); - LASSERT(mapping); - if (ll_teardown_mmaps(mapping, - (__u64)i << CFS_PAGE_SHIFT, - ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) ) - break; - } - - /* this is the simplistic implementation of page eviction at - * cancelation. It is careful to get races with other page - * lockers handled correctly. fixes from bug 20 will make it - * more efficient by associating locks with pages and with - * batching writeback under the lock explicitly. 
*/ - for (i = start, j = start % count; i <= end; - j++, i++, tmpex.l_extent.start += CFS_PAGE_SIZE) { - if (j == count) { - CDEBUG(D_PAGE, "skip index %lu to %lu\n", i, i + skip); - i += skip; - j = 0; - if (i > end) - break; - } - LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end, - LPU64" >= "LPU64" start %lu i %lu end %lu\n", - tmpex.l_extent.start, lock->l_policy_data.l_extent.end, - start, i, end); - if (!mapping_has_pages(mapping)) { - CDEBUG(D_INODE|D_PAGE, "nothing left\n"); - break; - } + /* We already hold a page reference, presumably taken by ll_pin_extent_cb(); it is dropped by page_cache_release() at the end */ + lock_page(page); - cond_resched(); - - page = find_lock_page(mapping, i); - if (page == NULL) - continue; - LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n", - i, tmpex.l_extent.start); - if (!discard && PageWriteback(page)) - wait_on_page_writeback(page); - - /* page->mapping to check with racing against teardown */ - if (!discard && clear_page_dirty_for_io(page)) { - rc = ll_call_writepage(inode, page); - /* either waiting for io to complete or reacquiring - * the lock that the failed writepage released */ - lock_page(page); - wait_on_page_writeback(page); - if (rc < 0) { - CERROR("writepage inode %lu(%p) of page %p " - "failed: %d\n", inode->i_ino, inode, - page, rc); - if (rc == -ENOSPC) - set_bit(AS_ENOSPC, &mapping->flags); - else - set_bit(AS_EIO, &mapping->flags); - } - } - - tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1; - /* check to see if another DLM lock covers this page b=2765 */ - rc2 = ldlm_lock_match(lock->l_resource->lr_namespace, - LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING | - LDLM_FL_TEST_LOCK, - &lock->l_resource->lr_name, LDLM_EXTENT, - &tmpex, LCK_PR | LCK_PW, &lockh); - - if (rc2 <= 0 && page->mapping != NULL) { - struct ll_async_page *llap = llap_cast_private(page); - /* checking again to account for writeback's - * lock_page() */ - LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n"); - if (llap) - ll_ra_accounting(llap, mapping); - 
ll_truncate_complete_page(page); + /* Already truncated by somebody */ + if (!page->mapping) + GOTO(out, rc = 0); + mapping = page->mapping; + + ll_teardown_mmaps(mapping, + (__u64)page->index << PAGE_CACHE_SHIFT, + ((__u64)page->index<<PAGE_CACHE_SHIFT)| + ~PAGE_CACHE_MASK); + LL_CDEBUG_PAGE(D_PAGE, page, "removing page\n"); + + if (!discard && clear_page_dirty_for_io(page)) { + LASSERT(page->mapping); + rc = ll_call_writepage(page->mapping->host, page); + /* either waiting for io to complete or reacquiring + * the lock that the failed writepage released */ + lock_page(page); + wait_on_page_writeback(page); + if (rc != 0) { + CERROR("writepage inode %lu(%p) of page %p " + "failed: %d\n", mapping->host->i_ino, + mapping->host, page, rc); + if (rc == -ENOSPC) + set_bit(AS_ENOSPC, &mapping->flags); + else + set_bit(AS_EIO, &mapping->flags); } - unlock_page(page); - page_cache_release(page); - } - LASSERTF(tmpex.l_extent.start <= - (lock->l_policy_data.l_extent.end == ~0ULL ? ~0ULL : - lock->l_policy_data.l_extent.end + 1), - "loop too long "LPU64" > "LPU64" start %lu i %lu end %lu\n", - tmpex.l_extent.start, lock->l_policy_data.l_extent.end, - start, i, end); + set_bit(AS_EIO, &mapping->flags); + } + if (page->mapping != NULL) { + struct ll_async_page *llap = llap_cast_private(page); + /* checking again to account for writeback's lock_page() */ + LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n"); + if (llap) + ll_ra_accounting(llap, page->mapping); + ll_truncate_complete_page(page); + } EXIT; +out: + LASSERT(!PageWriteback(page)); + unlock_page(page); + page_cache_release(page); + + return 0; } -static int ll_extent_lock_callback(struct ldlm_lock *lock, - struct ldlm_lock_desc *new, void *data, - int flag) +int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new, + void *data, int flag) { - struct lustre_handle lockh = { 0 }; - int rc; + struct inode *inode; + struct ll_inode_info *lli; + struct lov_stripe_md *lsm; + int stripe; + __u64 kms; + 
ENTRY; if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) { @@ -942,60 +869,37 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock, LBUG(); } - switch (flag) { - case LDLM_CB_BLOCKING: - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc != ELDLM_OK) - CERROR("ldlm_cli_cancel failed: %d\n", rc); - break; - case LDLM_CB_CANCELING: { - struct inode *inode; - struct ll_inode_info *lli; - struct lov_stripe_md *lsm; - int stripe; - __u64 kms; - - /* This lock wasn't granted, don't try to evict pages */ - if (lock->l_req_mode != lock->l_granted_mode) - RETURN(0); - - inode = ll_inode_from_lock(lock); - if (inode == NULL) - RETURN(0); - lli = ll_i2info(inode); - if (lli == NULL) - goto iput; - if (lli->lli_smd == NULL) - goto iput; - lsm = lli->lli_smd; - - stripe = ll_lock_to_stripe_offset(inode, lock); - if (stripe < 0) - goto iput; - - ll_pgcache_remove_extent(inode, lsm, lock, stripe); + inode = ll_inode_from_lock(lock); + if (inode == NULL) + RETURN(0); + lli = ll_i2info(inode); + if (lli == NULL) + GOTO(iput, 0); + if (lli->lli_smd == NULL) + GOTO(iput, 0); + lsm = lli->lli_smd; - lov_stripe_lock(lsm); - lock_res_and_lock(lock); - kms = ldlm_extent_shift_kms(lock, - lsm->lsm_oinfo[stripe]->loi_kms); + stripe = ll_lock_to_stripe_offset(inode, lock); + if (stripe < 0) + GOTO(iput, 0); - if (lsm->lsm_oinfo[stripe]->loi_kms != kms) - LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64, - lsm->lsm_oinfo[stripe]->loi_kms, kms); - lsm->lsm_oinfo[stripe]->loi_kms = kms; - unlock_res_and_lock(lock); - lov_stripe_unlock(lsm); - iput: - iput(inode); - break; - } - default: - LBUG(); - } + lov_stripe_lock(lsm); + lock_res_and_lock(lock); + kms = ldlm_extent_shift_kms(lock, + lsm->lsm_oinfo[stripe]->loi_kms); + + if (lsm->lsm_oinfo[stripe]->loi_kms != kms) + LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64, + lsm->lsm_oinfo[stripe]->loi_kms, kms); + lsm->lsm_oinfo[stripe]->loi_kms = kms; + unlock_res_and_lock(lock); + 
lov_stripe_unlock(lsm); + ll_queue_done_writing(inode, 0); + EXIT; +iput: + iput(inode); - RETURN(0); + return 0; } #if 0 @@ -1170,7 +1074,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm, einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = LCK_PR; - einfo.ei_cb_bl = ll_extent_lock_callback; + einfo.ei_cb_bl = osc_extent_blocking_cb; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = ll_glimpse_callback; einfo.ei_cbdata = NULL; @@ -1233,7 +1137,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags) * acquired only if there were no conflicting locks. */ einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = LCK_PR; - einfo.ei_cb_bl = ll_extent_lock_callback; + einfo.ei_cb_bl = osc_extent_blocking_cb; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = ll_glimpse_callback; einfo.ei_cbdata = inode; @@ -1288,7 +1192,7 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode, einfo.ei_type = LDLM_EXTENT; einfo.ei_mode = mode; - einfo.ei_cb_bl = ll_extent_lock_callback; + einfo.ei_cb_bl = osc_extent_blocking_cb; einfo.ei_cb_cp = ldlm_completion_ast; einfo.ei_cb_gl = ll_glimpse_callback; einfo.ei_cbdata = inode; diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 3b73342f174b7f71d995ebbd8db22094247bb742..e4871e84bb67782892bcb4b53ab1784accc3a178 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -499,6 +499,7 @@ struct ll_async_page { llap_origin:3, llap_ra_used:1, llap_ignore_quota:1, + llap_nocache:1, llap_lockless_io_page:1; void *llap_cookie; struct page *llap_page; @@ -655,6 +656,10 @@ int ll_dir_setstripe(struct inode *inode, struct lov_user_md *lump, int set_default); int ll_dir_getstripe(struct inode *inode, struct lov_mds_md **lmm, int *lmm_size, struct ptlrpc_request **request); +void ll_pin_extent_cb(void *data); +int ll_page_removal_cb(void *data, int discard); +int ll_extent_lock_cancel_cb(struct ldlm_lock *lock, struct ldlm_lock_desc *new, + void *data, int flag); /* 
llite/dcache.c */ extern struct dentry_operations ll_init_d_ops; diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 0c32ba0426f320005d2d1c30f7b6cab47ddb32b9..04eee3efd91e07874b8b1dc91509f2c89baeaa11 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -36,6 +36,7 @@ #include <lustre_param.h> #include <lustre_log.h> #include <obd_cksum.h> +#include <lustre_cache.h> #include "llite_internal.h" cfs_mem_cache_t *ll_file_data_slab; @@ -424,15 +425,33 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt) sbi->ll_lco.lco_flags = data->ocd_connect_flags; spin_unlock(&sbi->ll_lco.lco_lock); - ll_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp); + err = obd_register_page_removal_cb(sbi->ll_dt_exp, + ll_page_removal_cb, + ll_pin_extent_cb); + if (err) { + CERROR("cannot register page removal callback: rc = %d\n", err); + GOTO(out_dt, err); + } + err = obd_register_lock_cancel_cb(sbi->ll_dt_exp, + ll_extent_lock_cancel_cb); + if (err) { + CERROR("cannot register lock cancel callback: rc = %d\n", err); + GOTO(out_page_rm_cb, err); + } + + err = ll_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp); + if (err) { + CERROR("cannot set max EA and cookie sizes: rc = %d\n", err); + GOTO(out_lock_cn_cb, err); + } err = obd_prep_async_page(sbi->ll_dt_exp, NULL, NULL, NULL, - 0, NULL, NULL, NULL); + 0, NULL, NULL, NULL, 0, NULL); if (err < 0) { LCONSOLE_ERROR_MSG(0x151, "There are no OST's in this " "filesystem. 
There must be at least one " "active OST for a client to start.\n"); - GOTO(out_dt_fid, err); + GOTO(out_lock_cn_cb, err); } if (!ll_async_page_slab) { @@ -442,13 +461,13 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt) ll_async_page_slab_size, 0, 0); if (!ll_async_page_slab) - GOTO(out_dt_fid, err = -ENOMEM); + GOTO(out_lock_cn_cb, err = -ENOMEM); } err = md_getstatus(sbi->ll_md_exp, &rootfid, &oc); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); - GOTO(out_dt_fid, err); + GOTO(out_lock_cn_cb, err); } CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid)); sbi->ll_root_fid = rootfid; @@ -469,7 +488,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt) free_capa(oc); if (err) { CERROR("md_getattr failed for root: rc = %d\n", err); - GOTO(out_dt_fid, err); + GOTO(out_lock_cn_cb, err); } memset(&lmd, 0, sizeof(lmd)); err = md_get_lustre_md(sbi->ll_md_exp, request, sbi->ll_dt_exp, @@ -477,7 +496,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt) if (err) { CERROR("failed to understand root inode md: rc = %d\n", err); ptlrpc_req_finished (request); - GOTO(out_dt_fid, err); + GOTO(out_lock_cn_cb, err); } LASSERT(fid_is_sane(&sbi->ll_root_fid)); @@ -523,7 +542,12 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt) out_root: if (root) iput(root); -out_dt_fid: +out_lock_cn_cb: + obd_unregister_lock_cancel_cb(sbi->ll_dt_exp, + ll_extent_lock_cancel_cb); +out_page_rm_cb: + obd_unregister_page_removal_cb(sbi->ll_dt_exp, + ll_page_removal_cb); obd_fid_fini(sbi->ll_dt_exp); out_dt: obd_disconnect(sbi->ll_dt_exp); @@ -715,6 +739,10 @@ void client_common_put_super(struct super_block *sb) list_del(&sbi->ll_conn_chain); + obd_unregister_page_removal_cb(sbi->ll_dt_exp, + ll_page_removal_cb); + obd_unregister_lock_cancel_cb(sbi->ll_dt_exp,ll_extent_lock_cancel_cb); + obd_fid_fini(sbi->ll_dt_exp); obd_disconnect(sbi->ll_dt_exp); sbi->ll_dt_exp = NULL; 
diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 94ed74993ef570b7c16bd1a47b01baed9005ca03..3ac6fc3049aee339debd350ffa2b2054bb304248 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -593,7 +593,9 @@ int llap_shrink_cache(struct ll_sb_info *sbi, int shrink_fraction) return count; } -struct ll_async_page *llap_from_page(struct page *page, unsigned origin) +static struct ll_async_page *llap_from_page_with_lockh(struct page *page, + unsigned origin, + struct lustre_handle *lockh) { struct ll_async_page *llap; struct obd_export *exp; @@ -646,9 +648,14 @@ struct ll_async_page *llap_from_page(struct page *page, unsigned origin) llap->llap_magic = LLAP_MAGIC; llap->llap_cookie = (void *)llap + size_round(sizeof(*llap)); + /* XXX: for bug 11270 - check for lockless origin here! */ + if (origin == LLAP_ORIGIN_LOCKLESS_IO) + llap->llap_nocache = 1; + rc = obd_prep_async_page(exp, ll_i2info(inode)->lli_smd, NULL, page, (obd_off)page->index << CFS_PAGE_SHIFT, - &ll_async_page_ops, llap, &llap->llap_cookie); + &ll_async_page_ops, llap, &llap->llap_cookie, + llap->llap_nocache, lockh); if (rc) { OBD_SLAB_FREE(llap, ll_async_page_slab, ll_async_page_slab_size); @@ -698,6 +705,12 @@ struct ll_async_page *llap_from_page(struct page *page, unsigned origin) RETURN(llap); } +struct ll_async_page *llap_from_page(struct page *page, + unsigned origin) +{ + return llap_from_page_with_lockh(page, origin, NULL); +} + static int queue_or_sync_write(struct obd_export *exp, struct inode *inode, struct ll_async_page *llap, unsigned to, obd_flag async_flags) @@ -799,12 +812,14 @@ out: int ll_commit_write(struct file *file, struct page *page, unsigned from, unsigned to) { + struct ll_file_data *fd = LUSTRE_FPRIVATE(file); struct inode *inode = page->mapping->host; struct ll_inode_info *lli = ll_i2info(inode); struct lov_stripe_md *lsm = lli->lli_smd; struct obd_export *exp; struct ll_async_page *llap; loff_t size; + struct lustre_handle *lockh = NULL; int rc = 0; ENTRY; @@ -815,7 
+830,10 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from, CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n", inode, page, from, to, page->index); - llap = llap_from_page(page, LLAP_ORIGIN_COMMIT_WRITE); + if (fd->fd_flags & LL_FILE_GROUP_LOCKED) + lockh = &fd->fd_cwlockh; + + llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_COMMIT_WRITE, lockh); if (IS_ERR(llap)) RETURN(PTR_ERR(llap)); @@ -1012,6 +1030,7 @@ static void __ll_put_llap(struct page *page) * here. */ void ll_removepage(struct page *page) { + struct ll_async_page *llap = llap_cast_private(page); ENTRY; LASSERT(!in_interrupt()); @@ -1023,36 +1042,13 @@ void ll_removepage(struct page *page) return; } - LASSERT(!llap_cast_private(page)->llap_lockless_io_page); + LASSERT(!llap->llap_lockless_io_page); + LASSERT(!llap->llap_nocache); LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n"); __ll_put_llap(page); EXIT; } -static int ll_page_matches(struct page *page, int fd_flags) -{ - struct lustre_handle match_lockh = {0}; - struct inode *inode = page->mapping->host; - ldlm_policy_data_t page_extent; - int flags, matches; - ENTRY; - - if (unlikely(fd_flags & LL_FILE_GROUP_LOCKED)) - RETURN(1); - - page_extent.l_extent.start = (__u64)page->index << CFS_PAGE_SHIFT; - page_extent.l_extent.end = - page_extent.l_extent.start + CFS_PAGE_SIZE - 1; - flags = LDLM_FL_TEST_LOCK | LDLM_FL_BLOCK_GRANTED; - if (!(fd_flags & LL_FILE_READAHEAD)) - flags |= LDLM_FL_CBPENDING; - matches = obd_match(ll_i2sbi(inode)->ll_dt_exp, - ll_i2info(inode)->lli_smd, LDLM_EXTENT, - &page_extent, LCK_PR | LCK_PW, &flags, inode, - &match_lockh); - RETURN(matches); -} - static int ll_issue_page_read(struct obd_export *exp, struct ll_async_page *llap, struct obd_io_group *oig, int defer) @@ -1769,6 +1765,7 @@ int ll_writepage(struct page *page) if (IS_ERR(llap)) GOTO(out, rc = PTR_ERR(llap)); + LASSERT(!llap->llap_nocache); LASSERT(!PageWriteback(page)); set_page_writeback(page); @@ -1816,6 +1813,7 @@ int 
ll_readpage(struct file *filp, struct page *page) struct obd_export *exp; struct ll_async_page *llap; struct obd_io_group *oig = NULL; + struct lustre_handle *lockh = NULL; int rc; ENTRY; @@ -1847,9 +1845,19 @@ int ll_readpage(struct file *filp, struct page *page) if (exp == NULL) GOTO(out, rc = -EINVAL); - llap = llap_from_page(page, LLAP_ORIGIN_READPAGE); - if (IS_ERR(llap)) + if (fd->fd_flags & LL_FILE_GROUP_LOCKED) + lockh = &fd->fd_cwlockh; + + llap = llap_from_page_with_lockh(page, LLAP_ORIGIN_READPAGE, lockh); + if (IS_ERR(llap)) { + if (PTR_ERR(llap) == -ENOLCK) { + CWARN("ino %lu page %lu (%llu) not covered by " + "a lock (mmap?). check debug logs.\n", + inode->i_ino, page->index, + (long long)page->index << PAGE_CACHE_SHIFT); + } GOTO(out, rc = PTR_ERR(llap)); + } if (ll_i2sbi(inode)->ll_ra_info.ra_max_pages) ras_update(ll_i2sbi(inode), inode, &fd->fd_ras, page->index, @@ -1870,22 +1878,6 @@ int ll_readpage(struct file *filp, struct page *page) GOTO(out_oig, rc = 0); } - if (likely((fd->fd_flags & LL_FILE_IGNORE_LOCK) == 0)) { - rc = ll_page_matches(page, fd->fd_flags); - if (rc < 0) { - LL_CDEBUG_PAGE(D_ERROR, page, - "lock match failed: rc %d\n", rc); - GOTO(out, rc); - } - - if (rc == 0) { - CWARN("ino %lu page %lu (%llu) not covered by " - "a lock (mmap?). 
check debug logs.\n", - inode->i_ino, page->index, - (long long)page->index << CFS_PAGE_SHIFT); - } - } - rc = ll_issue_page_read(exp, llap, oig, 0); if (rc) GOTO(out, rc); diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 6e6844e1969b798b4212d120ad5109fa96b0e394..161800ecf4668f07b7f6ecb7a9b0e5e8e3d849f8 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -48,6 +48,7 @@ #include <obd_ost.h> #include <lprocfs_status.h> #include <lustre_param.h> +#include <lustre_cache.h> #include "lov_internal.h" @@ -87,6 +88,94 @@ void lov_putref(struct obd_device *obd) mutex_up(&lov->lov_lock); } +static int lov_register_page_removal_cb(struct obd_export *exp, + obd_page_removal_cb_t func, + obd_pin_extent_cb pin_cb) +{ + struct lov_obd *lov = &exp->exp_obd->u.lov; + int i, rc = 0; + + if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func) + return -EBUSY; + + if (lov->lov_page_pin_cb && lov->lov_page_pin_cb != pin_cb) + return -EBUSY; + + for (i = 0; i < lov->desc.ld_tgt_count; i++) { + if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp) + continue; + rc |= obd_register_page_removal_cb(lov->lov_tgts[i]->ltd_exp, + func, pin_cb); + } + + lov->lov_page_removal_cb = func; + lov->lov_page_pin_cb = pin_cb; + + return rc; +} + +static int lov_unregister_page_removal_cb(struct obd_export *exp, + obd_page_removal_cb_t func) +{ + struct lov_obd *lov = &exp->exp_obd->u.lov; + int i, rc = 0; + + if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func) + return -EINVAL; + + lov->lov_page_removal_cb = NULL; + lov->lov_page_pin_cb = NULL; + + for (i = 0; i < lov->desc.ld_tgt_count; i++) { + if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp) + continue; + rc |= obd_unregister_page_removal_cb(lov->lov_tgts[i]->ltd_exp, + func); + } + + return rc; +} + +static int lov_register_lock_cancel_cb(struct obd_export *exp, + obd_lock_cancel_cb func) +{ + struct lov_obd *lov = &exp->exp_obd->u.lov; + int i, rc = 0; + + if (lov->lov_lock_cancel_cb && 
lov->lov_lock_cancel_cb != func) + return -EBUSY; + + for (i = 0; i < lov->desc.ld_tgt_count; i++) { + if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp) + continue; + rc |= obd_register_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp, + func); + } + + lov->lov_lock_cancel_cb = func; + + return rc; +} + +static int lov_unregister_lock_cancel_cb(struct obd_export *exp, + obd_lock_cancel_cb func) +{ + struct lov_obd *lov = &exp->exp_obd->u.lov; + int i, rc = 0; + + if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func) + return -EINVAL; + + for (i = 0; i < lov->desc.ld_tgt_count; i++) { + if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp) + continue; + rc |= obd_unregister_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp, + func); + } + lov->lov_lock_cancel_cb = NULL; + return rc; +} + #define MAX_STRING_SIZE 128 static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, struct obd_connect_data *data) @@ -160,10 +249,33 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, RETURN(-ENODEV); } + rc = obd_register_page_removal_cb(lov->lov_tgts[index]->ltd_exp, + lov->lov_page_removal_cb, + lov->lov_page_pin_cb); + if (rc) { + obd_disconnect(lov->lov_tgts[index]->ltd_exp); + lov->lov_tgts[index]->ltd_exp = NULL; + RETURN(rc); + } + + rc = obd_register_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp, + lov->lov_lock_cancel_cb); + if (rc) { + obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp, + lov->lov_page_removal_cb); + obd_disconnect(lov->lov_tgts[index]->ltd_exp); + lov->lov_tgts[index]->ltd_exp = NULL; + RETURN(rc); + } + rc = obd_register_observer(tgt_obd, obd); if (rc) { CERROR("Target %s register_observer error %d\n", obd_uuid2str(&tgt_uuid), rc); + obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp, + lov->lov_lock_cancel_cb); + obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp, + lov->lov_page_removal_cb); obd_disconnect(lov->lov_tgts[index]->ltd_exp); lov->lov_tgts[index]->ltd_exp = NULL; 
RETURN(rc); @@ -268,6 +380,10 @@ static int lov_disconnect_obd(struct obd_device *obd, __u32 index) CDEBUG(D_CONFIG, "%s: disconnecting target %s\n", obd->obd_name, osc_obd->obd_name); + obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp, + lov->lov_lock_cancel_cb); + obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp, + lov->lov_page_removal_cb); if (lov->lov_tgts[index]->ltd_active) { lov->lov_tgts[index]->ltd_active = 0; @@ -1684,10 +1800,12 @@ static struct obd_async_page_ops lov_async_page_ops = { int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, cfs_page_t *page, obd_off offset, struct obd_async_page_ops *ops, - void *data, void **res) + void *data, void **res, int nocache, + struct lustre_handle *lockh) { struct lov_obd *lov = &exp->exp_obd->u.lov; struct lov_async_page *lap; + struct lov_lock_handles *lov_lockh = NULL; int rc = 0; ENTRY; @@ -1704,7 +1822,8 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, } rc = size_round(sizeof(*lap)) + obd_prep_async_page(lov->lov_tgts[i]->ltd_exp, NULL, - NULL, NULL, 0, NULL, NULL, NULL); + NULL, NULL, 0, NULL, NULL, NULL, 0, + NULL); RETURN(rc); } ASSERT_LSM_MAGIC(lsm); @@ -1727,10 +1846,19 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, lap->lap_sub_cookie = (void *)lap + size_round(sizeof(*lap)); + if (lockh) { + lov_lockh = lov_handle2llh(lockh); + if (lov_lockh) { + lockh = lov_lockh->llh_handles + lap->lap_stripe; + } + } + rc = obd_prep_async_page(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp, lsm, loi, page, lap->lap_sub_offset, &lov_async_page_ops, lap, - &lap->lap_sub_cookie); + &lap->lap_sub_cookie, nocache, lockh); + if (lov_lockh) + lov_llh_put(lov_lockh); if (rc) RETURN(rc); CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page, @@ -2752,6 +2880,10 @@ struct obd_ops lov_obd_ops = { .o_llog_init = lov_llog_init, .o_llog_finish = lov_llog_finish, .o_notify = lov_notify, + 
.o_register_page_removal_cb = lov_register_page_removal_cb, + .o_unregister_page_removal_cb = lov_unregister_page_removal_cb, + .o_register_lock_cancel_cb = lov_register_lock_cancel_cb, + .o_unregister_lock_cancel_cb = lov_unregister_lock_cancel_cb, }; static quota_interface_t *quota_interface; diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index f57b55c4c515ea7068b589e850e79da4fd0ac6ac..01e06544025c50eee26de49f4d7d0ceb35e697e0 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -1102,6 +1102,10 @@ void lprocfs_init_ops_stats(int num_private_stats, struct lprocfs_stats *stats) LPROCFS_OBD_OP_INIT(num_private_stats, stats, quotacheck); LPROCFS_OBD_OP_INIT(num_private_stats, stats, quotactl); LPROCFS_OBD_OP_INIT(num_private_stats, stats, ping); + LPROCFS_OBD_OP_INIT(num_private_stats, stats, register_page_removal_cb); + LPROCFS_OBD_OP_INIT(num_private_stats,stats,unregister_page_removal_cb); + LPROCFS_OBD_OP_INIT(num_private_stats, stats, register_lock_cancel_cb); + LPROCFS_OBD_OP_INIT(num_private_stats, stats,unregister_lock_cancel_cb); } int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats) diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index f0c6fb4beead6370f621f8cdd7e0f186e7645727..e24fe8a06aa782e7673ba75893e06a73fb0012a0 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -785,7 +785,7 @@ static int echo_client_async_page(struct obd_export *exp, int rw, rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page, eap->eap_off, &ec_async_page_ops, - eap, &eap->eap_cookie); + eap, &eap->eap_cookie, 1, NULL); if (rc) { spin_lock(&eas.eas_lock); eas.eas_rc = rc; diff --git a/lustre/osc/Makefile.in b/lustre/osc/Makefile.in index ce9107fc6409b56bc58b2cc40f52eb7a1b60ab60..2eb2eea34dac9a74e521c1d3903e3c67662d4763 100644 --- a/lustre/osc/Makefile.in +++ b/lustre/osc/Makefile.in @@ -1,4 +1,4 @@ MODULES := osc -osc-objs := 
osc_request.o lproc_osc.o osc_create.o +osc-objs := osc_request.o lproc_osc.o osc_create.o cache.o @INCLUDE_RULES@ diff --git a/lustre/osc/autoMakefile.am b/lustre/osc/autoMakefile.am index 2b00785c30c5fe4e29f3fdf258bff06dfc6414e3..985e4739d1e302c383731c3a751353492f1c3ddd 100644 --- a/lustre/osc/autoMakefile.am +++ b/lustre/osc/autoMakefile.am @@ -5,7 +5,7 @@ if LIBLUSTRE noinst_LIBRARIES = libosc.a -libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h +libosc_a_SOURCES = osc_request.c osc_create.c osc_internal.h cache.c libosc_a_CPPFLAGS = $(LLCPPFLAGS) libosc_a_CFLAGS = $(LLCFLAGS) endif diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h index 89f341cb4de43cd11c82e2cde1ae1d48b5e053f2..9b6c19b602dc4a7270615dc6803a595604be8053 100644 --- a/lustre/osc/osc_internal.h +++ b/lustre/osc/osc_internal.h @@ -30,6 +30,9 @@ struct osc_async_page { struct obd_async_page_ops *oap_caller_ops; void *oap_caller_data; + struct list_head oap_page_list; + struct ldlm_lock *oap_ldlm_lock; + spinlock_t oap_lock; }; #define oap_page oap_brw_page.pg diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 3a8b7abd44a6cdca1452a5722f73609566654d5a..679e78fae3feac3ad7c51414d48f695aeb3b12cf 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -57,12 +57,14 @@ #include <lustre_log.h> #include <lustre_debug.h> #include <lustre_param.h> +#include <lustre_cache.h> #include "osc_internal.h" static quota_interface_t *quota_interface = NULL; extern quota_interface_t osc_quota_interface; static void osc_release_ppga(struct brw_page **ppga, obd_count count); +int osc_cleanup(struct obd_device *obd); /* Pack OSC object metadata for disk storage (LE byte order). 
*/ static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, @@ -2560,9 +2562,12 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi, int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, cfs_page_t *page, obd_off offset, struct obd_async_page_ops *ops, - void *data, void **res) + void *data, void **res, int nocache, + struct lustre_handle *lockh) { struct osc_async_page *oap; + struct ldlm_res_id oid = {{0}}; + int rc = 0; ENTRY; if (!page) @@ -2582,9 +2587,25 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, CFS_INIT_LIST_HEAD(&oap->oap_pending_item); CFS_INIT_LIST_HEAD(&oap->oap_urgent_item); CFS_INIT_LIST_HEAD(&oap->oap_rpc_item); + CFS_INIT_LIST_HEAD(&oap->oap_page_list); oap->oap_occ.occ_interrupted = osc_occ_interrupted; + spin_lock_init(&oap->oap_lock); + + /* If the page was marked as not cacheable - don't add to any locks */ + if (!nocache) { + oid.name[0] = loi->loi_id; + oid.name[2] = loi->loi_gr; + /* This is the only place where we can call cache_add_extent + without oap_lock, because this page is locked now, and + the lock we are adding it to is referenced, so cannot lose + any pages either. 
*/ + rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh); + if (rc) + RETURN(rc); + } + CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset); RETURN(0); } @@ -2869,6 +2890,7 @@ static int osc_teardown_async_page(struct obd_export *exp, lop_update_pending(cli, lop, oap->oap_cmd, -1); } loi_list_maint(cli, loi); + cache_remove_extent(cli->cl_cache, oap); LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page); out: @@ -2876,6 +2898,49 @@ out: RETURN(rc); } +int osc_extent_blocking_cb(struct ldlm_lock *lock, + struct ldlm_lock_desc *new, void *data, + int flag) +{ + struct lustre_handle lockh = { 0 }; + int rc; + ENTRY; + + if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) { + LDLM_ERROR(lock, "cancelling lock with bad data %p", data); + LBUG(); + } + + switch (flag) { + case LDLM_CB_BLOCKING: + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_cancel failed: %d\n", rc); + break; + case LDLM_CB_CANCELING: { + + ldlm_lock2handle(lock, &lockh); + /* This lock wasn't granted, don't try to do anything */ + if (lock->l_req_mode != lock->l_granted_mode) + RETURN(0); + + cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache, + &lockh); + + if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb) + lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb( + lock, new, data,flag); + break; + } + default: + LBUG(); + } + + RETURN(0); +} +EXPORT_SYMBOL(osc_extent_blocking_cb); + static void osc_set_data_with_check(struct lustre_handle *lockh, void *data, int flags) { @@ -2920,8 +2985,8 @@ static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, return 0; } -static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo, - int intent, int rc) +static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req, + struct obd_info *oinfo, int intent, int rc) { ENTRY; @@ -2945,6 +3010,9 @@ static int 
osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo, oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime); } + if (!rc) + cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh); + /* Call the update callback. */ rc = oinfo->oi_cb_up(oinfo, rc); RETURN(rc); @@ -2971,7 +3039,7 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req, aa->oa_oi->oi_lockh, rc); /* Complete osc stuff. */ - rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc); + rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc); /* Release the lock for async request. */ if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK) @@ -3101,7 +3169,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, RETURN(rc); } - rc = osc_enqueue_fini(req, oinfo, intent, rc); + rc = osc_enqueue_fini(obd, req, oinfo, intent, rc); if (intent) ptlrpc_req_finished(req); @@ -3836,6 +3904,11 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2, OST_MAXREQSIZE, ptlrpc_add_rqs_to_pool); + cli->cl_cache = cache_create(obd); + if (!cli->cl_cache) { + osc_cleanup(obd); + rc = -ENOMEM; + } } RETURN(rc); @@ -3901,12 +3974,50 @@ int osc_cleanup(struct obd_device *obd) /* free memory of osc quota cache */ lquota_cleanup(quota_interface, obd); + cache_destroy(obd->u.cli.cl_cache); rc = client_obd_cleanup(obd); ptlrpcd_decref(); RETURN(rc); } +static int osc_register_page_removal_cb(struct obd_export *exp, + obd_page_removal_cb_t func, + obd_pin_extent_cb pin_cb) +{ + return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func, + pin_cb); +} + +static int osc_unregister_page_removal_cb(struct obd_export *exp, + obd_page_removal_cb_t func) +{ + return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func); +} + +static int osc_register_lock_cancel_cb(struct obd_export *exp, + obd_lock_cancel_cb cb) +{ + LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL); + + 
exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb; + return 0; +} + +static int osc_unregister_lock_cancel_cb(struct obd_export *exp, + obd_lock_cancel_cb cb) +{ + if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) { + CERROR("Unregistering cancel cb %p, while only %p was " + "registered\n", cb, + exp->exp_obd->u.cli.cl_ext_lock_cancel_cb); + RETURN(-EINVAL); + } + + exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL; + return 0; +} + static int osc_process_config(struct obd_device *obd, obd_count len, void *buf) { struct lustre_cfg *lcfg = buf; @@ -3972,6 +4083,10 @@ struct obd_ops osc_obd_ops = { .o_llog_init = osc_llog_init, .o_llog_finish = osc_llog_finish, .o_process_config = osc_process_config, + .o_register_page_removal_cb = osc_register_page_removal_cb, + .o_unregister_page_removal_cb = osc_unregister_page_removal_cb, + .o_register_lock_cancel_cb = osc_register_lock_cancel_cb, + .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb, }; int __init osc_init(void) { diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 91b186738917905ee9190e822b417d477f2d95eb..b7df21d8f9fa4e0501694a81aae33e45cec2b154 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -2706,7 +2706,9 @@ test_62() { cat $f && error "cat succeeded, expect -EIO" lctl set_param fail_loc=0 } -run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)" +# This test is now irrelevant (as of bug 10718 inclusion); we no longer +# match every page all of the time. +#run_test 62 "verify obd_match failure doesn't LBUG (should -EIO)" # bug 2319 - oig_wait() interrupted causes crash because of invalid waitq. 
test_63a() { # was test_63 @@ -3475,6 +3477,20 @@ test_79() { # bug 12743 } run_test 79 "df report consistency check =======================" +test_80() { # bug 10718 + dd if=/dev/zero of=$DIR/$tfile bs=1M count=1 seek=1M + sync; sleep 1; sync + BEFORE=`date +%s` + cancel_lru_locks OSC + AFTER=`date +%s` + DIFF=$((AFTER-BEFORE)) + if [ $DIFF -gt 1 ] ; then + error "elapsed for 1M@1T = $DIFF" + fi + true +} +run_test 80 "Page eviction is equally fast at high offsets too ====" + # on the LLNL clusters, runas will still pick up root's $TMP settings, # which will not be writable for the runas user, and then you get a CVS # error message with a corrupt path string (CVS bug) and panic.