diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index 0885a80319549cd5733f4622597e1b7d5ba9421c..c4754ae409c86ebd51373a092e2a059434f9cd38 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -46,6 +46,8 @@ static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb) if (IS_ERR(page)) return lnb->rc = PTR_ERR(page); + LASSERT(page->mapping == mapping); + lnb->page = page; if (inode->i_size < lnb->offset + lnb->len - 1) @@ -242,11 +244,12 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, struct fsfilt_objinfo *fso; struct dentry *dentry; struct inode *inode; - int rc = 0, i, j, tot_bytes = 0; + int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0; unsigned long now = jiffies; ENTRY; - /* We are currently not supporting multi-obj BRW_READ RPCS at all */ + /* We are currently not supporting multi-obj BRW_READ RPCS at all. + * When we do this function's dentry cleanup will need to be fixed */ LASSERT(objcount == 1); OBD_ALLOC(fso, objcount * sizeof(*fso)); @@ -262,13 +265,13 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, dentry = filter_oa2dentry(exp->exp_obd, oa); if (IS_ERR(dentry)) - GOTO(out_objinfo, rc = PTR_ERR(dentry)); + GOTO(cleanup, rc = PTR_ERR(dentry)); if (dentry->d_inode == NULL) { CERROR("trying to BRW to non-existent file "LPU64"\n", o->ioo_id); f_dput(dentry); - GOTO(out_objinfo, rc = -ENOENT); + GOTO(cleanup, rc = -ENOENT); } fso[i].fso_dentry = dentry; @@ -288,11 +291,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, inode = dentry->d_inode; for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) { - if (j == 0) - lnb->dentry = dentry; - else - lnb->dentry = dget(dentry); - + lnb->dentry = dentry; lnb->offset = rnb->offset; lnb->len = rnb->len; lnb->flags = rnb->flags; @@ -302,7 +301,6 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, /* If there's no more data, abort early. * lnb->page == NULL and lnb->rc == 0, so it's * easy to detect later. */ - f_dput(dentry); lnb->dentry = NULL; break; } else { @@ -314,13 +312,16 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, "page err %u@"LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset, j, o->ioo_bufcnt, dentry, rc); - f_dput(dentry); - GOTO(out_pages, rc); + cleanup_phase = 1; + GOTO(cleanup, rc); } tot_bytes += lnb->rc; - if (lnb->rc < lnb->len) - break; /* short read */ + if (lnb->rc < lnb->len) { + /* short read, be sure to wait on it */ + lnb++; + break; + } } } @@ -334,8 +335,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, if (rc) { CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len, lnb->offset, (int)(lnb - res), lnb->dentry, rc); - f_dput(lnb->dentry); - GOTO(out_pages, rc); + cleanup_phase = 1; + GOTO(cleanup, rc); } } @@ -343,24 +344,20 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ); EXIT; -out: - OBD_FREE(fso, objcount * sizeof(*fso)); - /* we saved the journal handle into oti->oti_handle instead */ - current->journal_info = NULL; - pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL); - return rc; -out_pages: - while (lnb-- > res) { - page_cache_release(lnb->page); - f_dput(lnb->dentry); + cleanup: + switch (cleanup_phase) { + case 1: + for (lnb = res; lnb < (res + niocount); lnb++) { + if (lnb->page) + page_cache_release(lnb->page); + } + f_dput(res->dentry); + case 0: + OBD_FREE(fso, objcount * sizeof(*fso)); + pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL); } - goto out; /* dropped the dentry refs already (one per page) */ - -out_objinfo: - for (i = 0; i < objcount && fso[i].fso_dentry; i++) - f_dput(fso[i].fso_dentry); - goto out; + return rc; } /* We need to balance prepare_write() calls with commit_write() calls. @@ -600,9 +597,30 @@ static int filter_write_locked_page(struct niobuf_local *lnb) RETURN(rc); } -int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, - int objcount, struct obd_ioobj *obj, int niocount, - struct niobuf_local *res, struct obd_trans_info *oti) +static int filter_commitrw_read(struct obd_export *exp, int objcount, + struct obd_ioobj *obj, int niocount, + struct niobuf_local *res, + struct obd_trans_info *oti) +{ + struct obd_ioobj *o; + struct niobuf_local *lnb; + int i, j; + ENTRY; + + for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) { + for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) { + if (lnb->page != NULL) + page_cache_release(lnb->page); + } + } + f_dput(res->dentry); + RETURN(0); +} + +static int +filter_commitrw_write(int cmd, struct obd_export *exp, struct obdo *oa, + int objcount, struct obd_ioobj *obj, int niocount, + struct niobuf_local *res, struct obd_trans_info *oti) { struct obd_run_ctxt saved; struct obd_ioobj *o; @@ -739,6 +757,21 @@ int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, RETURN(rc); } +/* XXX needs to trickle its oa down */ +int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, + int objcount, struct obd_ioobj *obj, int niocount, + struct niobuf_local *res, struct obd_trans_info *oti) +{ + if (cmd == OBD_BRW_WRITE) + return filter_commitrw_write(cmd, exp, oa, objcount, obj, + niocount, res, oti); + if (cmd == OBD_BRW_READ) + return filter_commitrw_read(exp, objcount, obj, niocount, + res, oti); + LBUG(); + return -EPROTO; +} + int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md *lsm, obd_count oa_bufs, struct brw_page *pga, struct obd_trans_info *oti)