diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 8543659c4e4012d8ba016cd79c0de73a661d4865..04d6a2186f42cfec58c9607bb08035a0b1b72b86 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -181,6 +181,7 @@ struct ptlrpc_request_set { cfs_waitq_t set_waitq; cfs_waitq_t *set_wakeup_ptr; struct list_head set_requests; + struct list_head set_cblist; /* list of completion callbacks */ set_interpreter_func set_interpret; /* completion callback */ void *set_arg; /* completion context */ void *set_countp; /* pointer to NOB counter in case @@ -191,6 +192,12 @@ struct ptlrpc_request_set { struct list_head set_new_requests; }; +struct ptlrpc_set_cbdata { + struct list_head psc_item; + set_interpreter_func psc_interpret; + void *psc_data; +}; + struct ptlrpc_bulk_desc; /* @@ -689,6 +696,8 @@ void ptlrpc_restart_req(struct ptlrpc_request *req); void ptlrpc_abort_inflight(struct obd_import *imp); struct ptlrpc_request_set *ptlrpc_prep_set(void); +int ptlrpc_set_add_cb(struct ptlrpc_request_set *set, + set_interpreter_func fn, void *data); int ptlrpc_set_next_timeout(struct ptlrpc_request_set *); int ptlrpc_check_set(struct ptlrpc_request_set *set); int ptlrpc_set_wait(struct ptlrpc_request_set *); diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index cfb217eab271236c5413867dbd2f70471ca21e4b..19bf151fd070dae3ecab45407408db1a24225c8e 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -923,15 +923,12 @@ static inline int obd_brw_rqset(int cmd, struct obd_export *exp, { struct ptlrpc_request_set *set = NULL; struct obd_info oinfo = { { { 0 } } }; - atomic_t nob; int rc = 0; ENTRY; set = ptlrpc_prep_set(); if (set == NULL) RETURN(-ENOMEM); - atomic_set(&nob, 0); - set->set_countp = &nob; oinfo.oi_oa = oa; oinfo.oi_md = lsm; @@ -940,8 +937,6 @@ static inline int obd_brw_rqset(int cmd, struct obd_export *exp, rc = ptlrpc_set_wait(set); if (rc) CERROR("error from callback: rc = %d\n", rc); - 
else - rc = atomic_read(&nob); } else { CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "error from obd_brw_async: rc = %d\n", rc); diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index d32a4472bb3807c53906b4f1b827a076c1362fef..5ea3a6c0e87307d2ca66faddc89132f55384722b 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -133,15 +133,14 @@ static void ll_free_user_pages(struct page **pages, int npages, int do_dirty) static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode, struct address_space *mapping, - struct lov_stripe_md *lsm, + struct obd_info *oinfo, + struct ptlrpc_request_set *set, size_t size, loff_t file_offset, struct page **pages, int page_count) { struct brw_page *pga; - struct obdo oa; int i, rc = 0; size_t length; - loff_t file_offset_orig = file_offset; ENTRY; OBD_ALLOC(pga, sizeof(*pga) * page_count); @@ -163,15 +162,11 @@ static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode, POISON_PAGE(pages[i], 0x0d); } - ll_inode_fill_obdo(inode, rw, &oa); - - rc = obd_brw_rqset(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, - ll_i2obdexp(inode), &oa, lsm, page_count, pga, NULL); - if ((rc > 0) && (rw == WRITE)) { - lov_stripe_lock(lsm); - obd_adjust_kms(ll_i2obdexp(inode), lsm, file_offset_orig + rc, 0); - lov_stripe_unlock(lsm); - } + rc = obd_brw_async(rw == WRITE ? 
OBD_BRW_WRITE : OBD_BRW_READ, + ll_i2obdexp(inode), oinfo, page_count, + pga, NULL, set); + if (rc == 0) + rc = size; OBD_FREE(pga, sizeof(*pga) * page_count); RETURN(rc); @@ -191,6 +186,10 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, struct inode *inode = file->f_mapping->host; ssize_t count = iov_length(iov, nr_segs), tot_bytes = 0; struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; + struct ptlrpc_request_set *set; + struct obd_info oinfo = { { { 0 } } }; /* zeroed: only oi_oa and oi_md are assigned below */ + struct obdo oa; unsigned long seg; size_t size = MAX_DIO_SIZE; ENTRY; @@ -220,10 +219,30 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, RETURN(-EINVAL); } + set = ptlrpc_prep_set(); + if (set == NULL) + RETURN(-ENOMEM); + + ll_inode_fill_obdo(inode, rw, &oa); + oinfo.oi_oa = &oa; + oinfo.oi_md = lsm; + + /* Reads need locking between buffered and direct access, and must not race + * with size changes made by concurrent truncates and writes. */ + if (rw == READ) + LOCK_INODE_MUTEX(inode); + for (seg = 0; seg < nr_segs; seg++) { size_t iov_left = iov[seg].iov_len; unsigned long user_addr = (unsigned long)iov[seg].iov_base; + if (rw == READ) { + if (file_offset >= inode->i_size) + break; + if (file_offset + iov_left > inode->i_size) + iov_left = inode->i_size - file_offset; + } + while (iov_left > 0) { struct page **pages; int page_count; @@ -236,7 +255,7 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, if (page_count > 0) { result = ll_direct_IO_26_seg(rw, inode, file->f_mapping, - lli->lli_smd, + &oinfo, set, min(size,iov_left), file_offset, pages, page_count); @@ -261,8 +280,8 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, continue; } if (tot_bytes > 0) - RETURN(tot_bytes); - RETURN(page_count < 0 ? page_count : result); + GOTO(wait_io, tot_bytes); + GOTO(out, tot_bytes = page_count < 0 ? 
page_count : result); } tot_bytes += result; @@ -271,6 +290,24 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, user_addr += result; } } + + if (tot_bytes > 0) { + int rc; + wait_io: + rc = ptlrpc_set_wait(set); + if (rc) + GOTO(out, tot_bytes = rc); + if (rw == WRITE) { + lov_stripe_lock(lsm); + obd_adjust_kms(ll_i2obdexp(inode), lsm, file_offset, 0); + lov_stripe_unlock(lsm); + } + } +out: + if (rw == READ) + UNLOCK_INODE_MUTEX(inode); + + ptlrpc_set_destroy(set); RETURN(tot_bytes); } diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 31569ca2168c4dd5a7048e16a0f4cb10922f91ae..0e6c873e9937221b5c515ff7b6ac03b004d11516 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1583,8 +1583,10 @@ static int lov_brw_async(int cmd, struct obd_export *exp, } LASSERT(rc == 0); LASSERT(set->set_interpret == NULL); - set->set_interpret = (set_interpreter_func)lov_brw_interpret; - set->set_arg = (void *)lovset; + LASSERT(set->set_arg == NULL); + rc = ptlrpc_set_add_cb(set, lov_brw_interpret, lovset); + if (rc) + GOTO(out, rc); RETURN(rc); out: @@ -2501,7 +2503,6 @@ static int lov_extent_calc(struct obd_export *exp, struct lov_stripe_md *lsm, RETURN(0); } - #if 0 struct lov_multi_wait { struct ldlm_lock *lock; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 381631cadffbb34876b79d97c3da31b9694073a9..955caa1b6a36f869e188a7fa5b01c9a7e4df44b5 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -1354,7 +1354,6 @@ static int brw_interpret(struct ptlrpc_request *request, void *data, int rc) { struct osc_brw_async_args *aa = data; int i; - int nob = rc; ENTRY; rc = osc_brw_fini_request(request, rc); @@ -1364,8 +1363,6 @@ static int brw_interpret(struct ptlrpc_request *request, void *data, int rc) if (rc == 0) RETURN(0); } - if ((rc >= 0) && request->rq_set && request->rq_set->set_countp) - atomic_add(nob, (atomic_t *)request->rq_set->set_countp); client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock); if 
(lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE) aa->aa_cli->cl_w_in_flight--; diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index c182672cf12ab97791c53c8308e485f666351abe..2d30542a097affef226d775b4c67d65065dfa922 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -573,6 +573,7 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void) set->set_remaining = 0; spin_lock_init(&set->set_new_req_lock); CFS_INIT_LIST_HEAD(&set->set_new_requests); + CFS_INIT_LIST_HEAD(&set->set_cblist); RETURN(set); } @@ -632,6 +633,23 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) EXIT; } +int ptlrpc_set_add_cb(struct ptlrpc_request_set *set, + set_interpreter_func fn, void *data) +{ + struct ptlrpc_set_cbdata *cbdata; + + OBD_SLAB_ALLOC(cbdata, ptlrpc_cbdata_slab, + CFS_ALLOC_STD, sizeof(*cbdata)); + if (cbdata == NULL) + RETURN(-ENOMEM); + + cbdata->psc_interpret = fn; + cbdata->psc_data = data; + list_add_tail(&cbdata->psc_item, &set->set_cblist); + + RETURN(0); +} + void ptlrpc_set_add_req(struct ptlrpc_request_set *set, struct ptlrpc_request *req) { @@ -1375,6 +1393,19 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set) int (*interpreter)(struct ptlrpc_request_set *set,void *,int) = set->set_interpret; rc = interpreter (set, set->set_arg, rc); + } else { + struct ptlrpc_set_cbdata *cbdata, *n; + int err; + + list_for_each_entry_safe(cbdata, n, + &set->set_cblist, psc_item) { + list_del_init(&cbdata->psc_item); + err = cbdata->psc_interpret(set, cbdata->psc_data, rc); + if (err && !rc) + rc = err; + OBD_SLAB_FREE(cbdata, ptlrpc_cbdata_slab, + sizeof(*cbdata)); + } } RETURN(rc); diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h index f427fbe0293f42fa93980e29be9e49f8e32d9118..a769aa50605a2aea63898e2f4ae8a16c378a4f22 100644 --- a/lustre/ptlrpc/ptlrpc_internal.h +++ b/lustre/ptlrpc/ptlrpc_internal.h @@ -35,6 +35,7 @@ struct obd_import; struct ldlm_res_id; struct ptlrpc_request_set; extern int 
test_req_buffer_pressure; +extern cfs_mem_cache_t *ptlrpc_cbdata_slab; void ptlrpc_request_handle_notconn(struct ptlrpc_request *); void lustre_assert_wire_constants(void); diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index 66d51b9f8671dd84af0397fb359d6d5e133d48e0..4a46cb7f61cfefc93d2ae53a2af6c8182bc6f8c8 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -37,7 +37,7 @@ #include <lustre_net.h> #include "ptlrpc_internal.h" - +cfs_mem_cache_t *ptlrpc_cbdata_slab; extern spinlock_t ptlrpc_last_xid_lock; extern spinlock_t ptlrpc_rs_debug_lock; extern spinlock_t ptlrpc_all_services_lock; @@ -78,10 +78,20 @@ __init int ptlrpc_init(void) rc = ldlm_init(); if (rc) GOTO(cleanup, rc); + cleanup_phase = 4; + + ptlrpc_cbdata_slab = cfs_mem_cache_create("ptlrpc_cbdatas", + sizeof (struct ptlrpc_set_cbdata), 0, + SLAB_HWCACHE_ALIGN); + if (ptlrpc_cbdata_slab == NULL) + GOTO(cleanup, rc = -ENOMEM); /* rc is still 0 here because ldlm_init() succeeded */ + RETURN(0); cleanup: switch(cleanup_phase) { + case 4: + ldlm_exit(); case 3: ptlrpc_stop_pinger(); case 2: @@ -101,6 +111,7 @@ static void __exit ptlrpc_exit(void) ptlrpc_stop_pinger(); ptlrpc_exit_portals(); ptlrpc_cleanup_connection(); + cfs_mem_cache_destroy(ptlrpc_cbdata_slab); } /* connection.c */ @@ -149,6 +160,7 @@ EXPORT_SYMBOL(ptlrpc_retain_replayable_request); EXPORT_SYMBOL(ptlrpc_next_xid); EXPORT_SYMBOL(ptlrpc_prep_set); +EXPORT_SYMBOL(ptlrpc_set_add_cb); EXPORT_SYMBOL(ptlrpc_set_add_req); EXPORT_SYMBOL(ptlrpc_set_add_new_req); EXPORT_SYMBOL(ptlrpc_set_destroy); diff --git a/lustre/tests/directio.c b/lustre/tests/directio.c index ebcedb2c781063e69a3716234228b480efd27622..1108cba9bdcf897aa3eb07d3329bb6e1146de7e4 100644 --- a/lustre/tests/directio.c +++ b/lustre/tests/directio.c @@ -23,11 +23,12 @@ int main(int argc, char **argv) long len; off64_t seek; struct stat64 st; + char pad = 0xba; int action; int rc; if (argc < 5 || argc > 6) { - 
printf("Usage: %s <read/write/rdwr/readhole> file seek nr_blocks [blocksize]\n", argv[0]); return 1; } @@ -37,7 +38,10 @@ int main(int argc, char **argv) action = O_WRONLY; else if (!strcmp(argv[1], "rdwr")) action = O_RDWR; - else { - printf("Usage: %s <read/write/rdwr> file seek nr_blocks [blocksize]\n", argv[0]); + else if (!strcmp(argv[1], "readhole")) { + action = O_RDONLY; + pad = 0; + } else { + printf("Usage: %s <read/write/rdwr/readhole> file seek nr_blocks [blocksize]\n", argv[0]); return 1; } @@ -74,7 +78,7 @@ int main(int argc, char **argv) printf("No memory %s\n", strerror(errno)); return 1; } - memset(wbuf, 0xba, len); + memset(wbuf, pad, len); if (action == O_WRONLY || action == O_RDWR) { if (lseek64(fd, seek, SEEK_SET) < 0) { diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 88779aeaf8385293decc07ad0d4f664b00180339..6285336b1f5b9def96e493f84afc1a871d874147 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -4302,9 +4302,19 @@ test_119b() # bug 11737 sync multiop $DIR/$tfile oO_RDONLY:O_DIRECT:r$((2048 * 1024)) || \ error "direct read failed" + rm -f $DIR/$tfile } run_test 119b "Sparse directIO read must return actual read amount" +test_119c() # bug 13099 +{ + BSIZE=1048576 + directio write $DIR/$tfile 3 1 $BSIZE || error "direct write failed" + directio readhole $DIR/$tfile 0 2 $BSIZE || error "reading hole failed" + rm -f $DIR/$tfile +} +run_test 119c "Testing for direct read hitting hole" + LDLM_POOL_CTL_RECALC=1 LDLM_POOL_CTL_SHRINK=2