diff --git a/lustre/include/obd.h b/lustre/include/obd.h
index 9db73b57606f7b8b364a5855c88999e748c873bf..30f57b5d45136c2ecf899de57be6d33de828090e 100644
--- a/lustre/include/obd.h
+++ b/lustre/include/obd.h
@@ -93,8 +93,8 @@ struct lov_oinfo { /* per-stripe data structure */
         /* used by the osc to keep track of what objects to build into rpcs */
         struct loi_oap_pages loi_read_lop;
         struct loi_oap_pages loi_write_lop;
-        /* _cli_ is poorly named, it should be _ready_ */
-        struct list_head loi_cli_item;
+        struct list_head loi_ready_item;
+        struct list_head loi_hp_ready_item;
         struct list_head loi_write_item;
         struct list_head loi_read_item;
 
@@ -112,7 +112,8 @@ static inline void loi_init(struct lov_oinfo *loi)
         CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending);
         CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_urgent);
         CFS_INIT_LIST_HEAD(&loi->loi_write_lop.lop_pending_group);
-        CFS_INIT_LIST_HEAD(&loi->loi_cli_item);
+        CFS_INIT_LIST_HEAD(&loi->loi_ready_item);
+        CFS_INIT_LIST_HEAD(&loi->loi_hp_ready_item);
         CFS_INIT_LIST_HEAD(&loi->loi_write_item);
         CFS_INIT_LIST_HEAD(&loi->loi_read_item);
 }
@@ -233,6 +234,7 @@ enum async_flags {
                                     the page is accounted for in the
                                     obd_io_group given to
                                     obd_queue_group_io */
+        ASYNC_HP = 0x10,
 };
 
 struct obd_async_page_ops {
@@ -410,6 +412,7 @@ struct client_obd {
          */
         client_obd_lock_t cl_loi_list_lock;
         struct list_head cl_loi_ready_list;
+        struct list_head cl_loi_hp_ready_list;
         struct list_head cl_loi_write_list;
         struct list_head cl_loi_read_list;
         int cl_r_in_flight;
diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c
index dd8147bbc5bb938281b208e9133d00d8b685c0a9..5bc07a2812ff33def0c39d49f4da4d89fa5480fa 100644
--- a/lustre/ldlm/ldlm_lib.c
+++ b/lustre/ldlm/ldlm_lib.c
@@ -266,6 +266,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
                 cli->cl_dirty_max = num_physpages << (CFS_PAGE_SHIFT - 3);
         CFS_INIT_LIST_HEAD(&cli->cl_cache_waiters);
         CFS_INIT_LIST_HEAD(&cli->cl_loi_ready_list);
+        CFS_INIT_LIST_HEAD(&cli->cl_loi_hp_ready_list);
         CFS_INIT_LIST_HEAD(&cli->cl_loi_write_list);
         CFS_INIT_LIST_HEAD(&cli->cl_loi_read_list);
         client_obd_list_lock_init(&cli->cl_loi_list_lock);
diff --git a/lustre/osc/cache.c b/lustre/osc/cache.c
index 40c33b1178d037030253b200ca35666e01c87922..371b78e12d88fd67cdae303f6eb116cc78c9bbc2 100644
--- a/lustre/osc/cache.c
+++ b/lustre/osc/cache.c
@@ -348,6 +348,9 @@ static int cache_remove_extents_from_lock(struct lustre_cache *cache,
                            cache_extent_removal_event */
                         ext_data = extent->oap_page;
                         cache->lc_pin_extent_cb(extent->oap_page);
+
+                        if (lock->l_flags & LDLM_FL_BL_AST)
+                                extent->oap_async_flags |= ASYNC_HP;
                         spin_unlock(&extent->oap_lock);
                         spin_unlock(&lock->l_extents_list_lock);
                         cache_extent_removal_event(cache, ext_data,
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 070c95e2c2ae9a76937a56e027fb8bddf3b62190..186a0fe9ff81e5083135dda4bfd25b99049b1d7a 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -1771,6 +1771,25 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
         RETURN(0);
 }
 
+static int lop_makes_hprpc(struct loi_oap_pages *lop)
+{
+        struct osc_async_page *oap;
+        ENTRY;
+
+        if (list_empty(&lop->lop_urgent))
+                RETURN(0);
+
+        oap = list_entry(lop->lop_urgent.next,
+                         struct osc_async_page, oap_urgent_item);
+
+        if (oap->oap_async_flags & ASYNC_HP) {
+                CDEBUG(D_CACHE, "hp request forcing RPC\n");
+                RETURN(1);
+        }
+
+        RETURN(0);
+}
+
 static void on_list(struct list_head *item, struct list_head *list,
                     int should_be_on)
 {
@@ -1784,9 +1803,17 @@ static void on_list(struct list_head *item, struct list_head *list,
  * can find pages to build into rpcs quickly */
 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
 {
-        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
-                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
-                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+        if (lop_makes_hprpc(&loi->loi_write_lop) ||
+            lop_makes_hprpc(&loi->loi_read_lop)) {
+                /* HP rpc */
+                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
+                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
+        } else {
+                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
+                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
+                        lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
+                        lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+        }
 
         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                 loi->loi_write_lop.lop_num_pending);
@@ -1882,8 +1909,10 @@ static void osc_oap_to_pending(struct osc_async_page *oap)
         else
                 lop = &oap->oap_loi->loi_read_lop;
 
-        if (oap->oap_async_flags & ASYNC_URGENT)
+        if (oap->oap_async_flags & ASYNC_HP)
                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+        else if (oap->oap_async_flags & ASYNC_URGENT)
+                list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
 }
@@ -2095,6 +2124,15 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
         int srvlock = 0;
         ENTRY;
 
+        /* If there are HP OAPs we need to handle at least 1 of them,
+         * move it to the beginning of the pending list for that. */
+        if (!list_empty(&lop->lop_urgent)) {
+                oap = list_entry(lop->lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                if (oap->oap_async_flags & ASYNC_HP)
+                        list_move(&oap->oap_pending_item, &lop->lop_pending);
+        }
+
         /* first we find the pages we're allowed to work with */
         list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
                 ops = oap->oap_caller_ops;
@@ -2291,7 +2329,8 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
 
 #define LOI_DEBUG(LOI, STR, args...) \
         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
-               !list_empty(&(LOI)->loi_cli_item), \
+               !list_empty(&(LOI)->loi_ready_item) || \
+               !list_empty(&(LOI)->loi_hp_ready_item), \
                (LOI)->loi_write_lop.lop_num_pending, \
                !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
                (LOI)->loi_read_lop.lop_num_pending, \
@@ -2303,11 +2342,15 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
 {
         ENTRY;
-        /* first return all objects which we already know to have
-         * pages ready to be stuffed into rpcs */
+        /* First return objects that have blocked locks so that they
+         * will be flushed quickly and other clients can get the lock,
+         * then objects which have pages ready to be stuffed into RPCs */
+        if (!list_empty(&cli->cl_loi_hp_ready_list))
+                RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
+                                  struct lov_oinfo, loi_hp_ready_item));
         if (!list_empty(&cli->cl_loi_ready_list))
                 RETURN(list_entry(cli->cl_loi_ready_list.next,
-                                  struct lov_oinfo, loi_cli_item));
+                                  struct lov_oinfo, loi_ready_item));
 
         /* then if we have cache waiters, return all objects with queued
          * writes.  This is especially important when many small files
@@ -2331,6 +2374,29 @@ struct lov_oinfo *osc_next_loi(struct client_obd *cli)
         RETURN(NULL);
 }
 
+/* Return non-zero when no further RPC may be started for \a loi.  An object
+ * whose first urgent write or read page is ASYNC_HP is allowed one RPC
+ * beyond cl_max_rpcs_in_flight. */
+static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
+{
+        struct osc_async_page *oap;
+        int hprpc = 0;
+
+        if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
+                oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+        }
+
+        if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
+                oap = list_entry(loi->loi_read_lop.lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+        }
+
+        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
+}
+
 /* called with the loi list lock held */
 static void osc_check_rpcs(struct client_obd *cli)
 {
@@ -2341,7 +2407,7 @@ static void osc_check_rpcs(struct client_obd *cli)
         while ((loi = osc_next_loi(cli)) != NULL) {
                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
 
-                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
+                if (osc_max_rpc_in_flight(cli, loi))
                         break;
 
                 /* attempt some read/write balancing by alternating between
@@ -2373,8 +2439,10 @@ static void osc_check_rpcs(struct client_obd *cli)
 
                 /* attempt some inter-object balancing by issueing rpcs
                  * for each object in turn */
-                if (!list_empty(&loi->loi_cli_item))
-                        list_del_init(&loi->loi_cli_item);
+                if (!list_empty(&loi->loi_hp_ready_item))
+                        list_del_init(&loi->loi_hp_ready_item);
+                if (!list_empty(&loi->loi_ready_item))
+                        list_del_init(&loi->loi_ready_item);
                 if (!list_empty(&loi->loi_write_item))
                         list_del_init(&loi->loi_write_item);
                 if (!list_empty(&loi->loi_read_item))
@@ -2679,11 +2747,14 @@ static int osc_set_async_flags(struct obd_export *exp,
         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                 oap->oap_async_flags |= ASYNC_READY;
 
-        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
-                if (list_empty(&oap->oap_rpc_item)) {
+        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
+            list_empty(&oap->oap_rpc_item)) {
+                if (oap->oap_async_flags & ASYNC_HP)
                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
-                        loi_list_maint(cli, loi);
-                }
+                else
+                        list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+                oap->oap_async_flags |= ASYNC_URGENT;
+                loi_list_maint(cli, loi);
         }
 
         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
@@ -2818,8 +2889,9 @@ static int osc_teardown_async_page(struct obd_export *exp,
 
         if (!list_empty(&oap->oap_urgent_item)) {
                 list_del_init(&oap->oap_urgent_item);
-                oap->oap_async_flags &= ~ASYNC_URGENT;
+                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
         }
+
         if (!list_empty(&oap->oap_pending_item)) {
                 list_del_init(&oap->oap_pending_item);
                 lop_update_pending(cli, lop, oap->oap_cmd, -1);