diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 2c297445d557165e21275337cf59fa08e55941d7..994f2e562948bcce076a756308dadf929bb7ad91 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -390,7 +390,8 @@ struct ldlm_namespace { unsigned int ns_max_unused; unsigned int ns_max_age; - + unsigned int ns_timeouts; + /* Lower limit to number of pages in lock to keep it in cache */ unsigned int ns_shrink_thumb; cfs_time_t ns_next_dump; /* next debug dump, jiffies */ diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index 291e85c3f25721f82484cada33723b3af247d5fe..69df240e49b596b67faa488ade7900115a5d0588 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -139,6 +139,7 @@ struct obd_export { exp_replay_needed:1, exp_need_sync:1, /* needs sync from connect */ exp_libclient:1; /* liblustre client? */ + struct list_head exp_queued_rpc; /* RPC to be handled */ union { struct mds_export_data eu_mds_data; struct filter_export_data eu_filter_data; diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 1238ae94158ba6bb279d6fdd8b2e7521a35a803a..65bd60a26e3086d8127563f754085eceddd18065 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -144,7 +144,7 @@ #define MGS_MAXREPSIZE (9 * 1024) /* Absolute limits */ -#define OSS_THREADS_MIN 2 +#define OSS_THREADS_MIN 3 /* difficult replies, HPQ, others */ #define OSS_THREADS_MAX 512 #define OST_NBUFS (64 * num_online_cpus()) #define OST_BUFSIZE (8 * 1024) @@ -278,11 +278,28 @@ struct ptlrpc_request_pool { void (*prp_populate)(struct ptlrpc_request_pool *, int); }; +struct ldlm_lock; + +struct ptlrpc_hpreq_ops { + /** + * Check if the lock handle of the given lock is the same as + * taken from the request. + */ + int (*hpreq_lock_match)(struct ptlrpc_request *, struct ldlm_lock *); + /** + * Check if the request is a high priority one. + */ + int (*hpreq_check)(struct ptlrpc_request *); +}; + struct ptlrpc_request { int rq_type; /* one of PTL_RPC_MSG_* */ struct list_head rq_list; struct list_head rq_timed_list; /* server-side early replies */ struct list_head rq_history_list; /* server-side history */ + struct list_head rq_exp_list; /* server-side per-export list */ + struct ptlrpc_hpreq_ops *rq_ops; /* server-side hp handlers */ + __u64 rq_history_seq; /* history sequence # */ int rq_status; spinlock_t rq_lock; @@ -303,7 +320,8 @@ struct ptlrpc_request { rq_no_delay:1, rq_net_err:1, rq_early:1, rq_must_unlink:1, /* server-side flags */ rq_packed_final:1, /* packed final reply */ - rq_sent_final:1; /* stop sending early replies */ + rq_sent_final:1, /* stop sending early replies */ + rq_hp:1; /* high priority RPC */ enum rq_phase rq_phase; /* one of RQ_PHASE_* */ enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */ atomic_t rq_refcount; /* client-side refcount for SENT race, @@ -453,9 +471,9 @@ ptlrpc_rqphase2str(struct ptlrpc_request *req) FLAG(req->rq_timedout, "X") /* eXpired */, FLAG(req->rq_resend, "S"), \ FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"), \ FLAG(req->rq_no_resend, "N"), \ - FLAG(req->rq_waiting, "W") + FLAG(req->rq_waiting, "W"), FLAG(req->rq_hp, "H") -#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s" +#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s" void _debug_req(struct ptlrpc_request *req, __u32 mask, struct libcfs_debug_msg_data *data, const char *fmt, ...) 
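The lustre_net.h hunk above introduces the mechanism the rest of the patch builds on: a server-side request can carry a ptlrpc_hpreq_ops vector (plus an rq_hp bit and an rq_exp_list linkage into the export's new exp_queued_rpc list), so the generic RPC code can ask service-specific code two questions — does this request still work under a given DLM lock (hpreq_lock_match), and should it be served at high priority right now (hpreq_check). Below is a minimal, self-contained sketch of that callback pattern; the demo_* names are hypothetical stand-ins, not the Lustre API.

#include <stdio.h>

/* Hypothetical, much simplified stand-ins for the real structures. */
struct demo_lock { unsigned long cookie; };

struct demo_request;

struct demo_hpreq_ops {
        /* non-zero if the request still works under the given lock */
        int (*hpreq_lock_match)(struct demo_request *, struct demo_lock *);
        /* non-zero if the request should be queued at high priority */
        int (*hpreq_check)(struct demo_request *);
};

struct demo_request {
        unsigned long          lock_cookie; /* lock the I/O runs under    */
        struct demo_hpreq_ops *ops;         /* NULL for ordinary requests */
        int                    hp;          /* set once promoted          */
};

/* Service-specific policy, e.g. for read/write: match by lock cookie. */
static int rw_lock_match(struct demo_request *req, struct demo_lock *lock)
{
        return req->lock_cookie == lock->cookie;
}

static int rw_check(struct demo_request *req)
{
        return 1;       /* this demo treats every such request as urgent */
}

static struct demo_hpreq_ops rw_ops = {
        .hpreq_lock_match = rw_lock_match,
        .hpreq_check      = rw_check,
};

/* Generic layer: promote a request if its service callbacks say so. */
static void maybe_promote(struct demo_request *req)
{
        if (req->ops && req->ops->hpreq_check && req->ops->hpreq_check(req))
                req->hp = 1;
}

int main(void)
{
        struct demo_request req = { .lock_cookie = 42, .ops = &rw_ops };
        struct demo_lock lock = { .cookie = 42 };

        maybe_promote(&req);
        printf("hp=%d lock_match=%d\n", req.hp,
               req.ops->hpreq_lock_match(&req, &lock));
        return 0;
}

ptlrpc_init_svc() gains a matching svc_hpreq_handler_t argument in the next hunk, so only services that opt in — the OST I/O service in this patch — pay for the extra checks; every other caller passes NULL.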
@@ -554,6 +572,9 @@ struct ptlrpc_request_buffer_desc { typedef int (*svc_handler_t)(struct ptlrpc_request *req); typedef void (*svcreq_printfn_t)(void *, struct ptlrpc_request *); +typedef int (*svc_hpreq_handler_t)(struct ptlrpc_request *); + +#define PTLRPC_SVC_HP_RATIO 10 struct ptlrpc_service { struct list_head srv_list; /* chain thru all services */ @@ -568,6 +589,7 @@ struct ptlrpc_service { int srv_threads_running; /* # running threads */ int srv_n_difficult_replies; /* # 'difficult' replies */ int srv_n_active_reqs; /* # reqs being served */ + int srv_n_hpreq; /* # HPreqs being served */ cfs_duration_t srv_rqbd_timeout; /* timeout before re-posting reqs, in tick */ int srv_watchdog_factor; /* soft watchdog timeout mutiplier */ unsigned srv_cpu_affinity:1; /* bind threads to CPUs */ @@ -584,8 +606,11 @@ struct ptlrpc_service { cfs_timer_t srv_at_timer; /* early reply timer */ int srv_n_queued_reqs; /* # reqs in either of the queues below */ + int srv_hpreq_count; /* # hp requests handled */ + int srv_hpreq_ratio; /* # hp per lp reqs to handle */ struct list_head srv_req_in_queue; /* incoming reqs */ struct list_head srv_request_queue; /* reqs waiting for service */ + struct list_head srv_request_hpq; /* high priority queue */ struct list_head srv_request_history; /* request history */ __u64 srv_request_seq; /* next request sequence # */ @@ -610,6 +635,7 @@ struct ptlrpc_service { struct list_head srv_threads; /* service thread list */ svc_handler_t srv_handler; + svc_hpreq_handler_t srv_hpreq_handler; /* hp request handler */ char *srv_name; /* only statically allocated strings here; we don't clean them */ char *srv_thread_name; /* only statically allocated strings here; we don't clean them */ @@ -821,7 +847,7 @@ struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, cfs_proc_dir_entry_t *proc_entry, svcreq_printfn_t, int min_threads, int max_threads, - char *threadname); + char *threadname, svc_hpreq_handler_t); void ptlrpc_stop_all_threads(struct ptlrpc_service *svc); int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc); @@ -830,6 +856,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service); int liblustre_check_services (void *arg); void ptlrpc_daemonize(char *name); int ptlrpc_service_health_check(struct ptlrpc_service *); +void ptlrpc_hpreq_reorder(struct ptlrpc_request *req); struct ptlrpc_svc_data { char *name; diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index e040a05d79b218a151ad1b9eaca76e4a4f8c100b..364f7c7e63904f9532578f104ade0b6013ce39e8 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -261,6 +261,8 @@ extern unsigned int obd_alloc_fail_rate; #define OBD_FAIL_PTLRPC_DUMP_LOG 0x50e #define OBD_FAIL_PTLRPC_LONG_UNLINK 0x50f +#define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT 0x510 +#define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT 0x511 #define OBD_FAIL_OBD_PING_NET 0x600 #define OBD_FAIL_OBD_LOG_CANCEL_NET 0x601 diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 32e48ced6045c3c28e249f6fc4eea9f35577b7e3..c84f3403645e873e8cff3b9264793f403df218e1 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -358,6 +358,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) CFS_INIT_LIST_HEAD(&lock->l_extents_list); spin_lock_init(&lock->l_extents_list_lock); CFS_INIT_LIST_HEAD(&lock->l_cache_locks_list); + lock->l_callback_timeout = 0; RETURN(lock); } @@ -1792,7 +1793,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, 
data->msg_fn, data->msg_line, fmt, args, " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: \?\? rrc=\?\? type: \?\?\? flags: %x remote: " - LPX64" expref: %d pid: %u\n", lock, + LPX64" expref: %d pid: %u timeout: %lu\n", lock, lock->l_handle.h_cookie, atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers, ldlm_lockname[lock->l_granted_mode], @@ -1800,7 +1801,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + lock->l_pid, lock->l_callback_timeout); va_end(args); return; } @@ -1812,7 +1813,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64 "] (req "LPU64"->"LPU64") flags: %x remote: "LPX64 - " expref: %d pid: %u\n", + " expref: %d pid: %u timeout %lu\n", lock->l_resource->lr_namespace->ns_name, lock, lock->l_handle.h_cookie, atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers, @@ -1828,7 +1829,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + lock->l_pid, lock->l_callback_timeout); break; case LDLM_FLOCK: libcfs_debug_vmsg2(cdls, data->msg_subsys, level, data->msg_file, @@ -1836,7 +1837,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d " "["LPU64"->"LPU64"] flags: %x remote: "LPX64 - " expref: %d pid: %u\n", + " expref: %d pid: %u timeout: %lu\n", lock->l_resource->lr_namespace->ns_name, lock, lock->l_handle.h_cookie, atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers, @@ -1852,7 +1853,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + lock->l_pid, lock->l_callback_timeout); break; case LDLM_IBITS: libcfs_debug_vmsg2(cdls, data->msg_subsys, level, data->msg_file, @@ -1860,7 +1861,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s " "flags: %x remote: "LPX64" expref: %d " - "pid %u\n", + "pid: %u timeout: %lu\n", lock->l_resource->lr_namespace->ns_name, lock, lock->l_handle.h_cookie, atomic_read (&lock->l_refc), @@ -1875,14 +1876,14 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + lock->l_pid, lock->l_callback_timeout); break; default: libcfs_debug_vmsg2(cdls, data->msg_subsys, level, data->msg_file, data->msg_fn, data->msg_line, fmt, args, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" rrc: %d type: %s flags: %x " - "remote: "LPX64" expref: %d pid: %u\n", + "remote: "LPX64" expref: %d pid: %u timeout %lu\n", lock->l_resource->lr_namespace->ns_name, lock, lock->l_handle.h_cookie, atomic_read (&lock->l_refc), @@ -1896,7 +1897,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? 
atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + lock->l_pid, lock->l_callback_timeout); break; } va_end(args); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index f9309528796394eec318541a2542bff21257b811..df3d12051edaf2b44d9a83cbcf55db41aa05d94b 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -236,6 +236,31 @@ static int expired_lock_main(void *arg) RETURN(0); } +/** + * Check if there is a request in the export request list + * which prevents the lock canceling. + */ +static int ldlm_lock_busy(struct ldlm_lock *lock) +{ + struct ptlrpc_request *req; + int match = 0; + ENTRY; + + if (lock->l_export == NULL) + return 0; + + spin_lock(&lock->l_export->exp_lock); + list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) { + if (req->rq_ops->hpreq_lock_match) { + match = req->rq_ops->hpreq_lock_match(req, lock); + if (match) + break; + } + } + spin_unlock(&lock->l_export->exp_lock); + RETURN(match); +} + /* This is called from within a timer interrupt and cannot schedule */ static void waiting_locks_callback(unsigned long unused) { @@ -245,11 +270,33 @@ static void waiting_locks_callback(unsigned long unused) while (!list_empty(&waiting_locks_list)) { lock = list_entry(waiting_locks_list.next, struct ldlm_lock, l_pending_chain); - if (cfs_time_after(lock->l_callback_timeout, cfs_time_current()) || (lock->l_req_mode == LCK_GROUP)) break; + /* Check if we need to prolong timeout */ + if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) && + ldlm_lock_busy(lock)) { + int cont = 1; + + if (lock->l_pending_chain.next == &waiting_locks_list) + cont = 0; + + LDLM_LOCK_GET(lock); + spin_unlock_bh(&waiting_locks_spinlock); + LDLM_DEBUG(lock, "prolong the busy lock"); + ldlm_refresh_waiting_lock(lock); + spin_lock_bh(&waiting_locks_spinlock); + + if (!cont) { + LDLM_LOCK_PUT(lock); + break; + } + + LDLM_LOCK_PUT(lock); + continue; + } + lock->l_resource->lr_namespace->ns_timeouts++; LDLM_ERROR(lock, "lock callback timer expired after %lds: " "evicting client at %s ", cfs_time_current_sec()- lock->l_enqueued_time.tv_sec, @@ -317,15 +364,21 @@ static void waiting_locks_callback(unsigned long unused) */ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock) { - int timeout; + cfs_time_t timeout; cfs_time_t timeout_rounded; if (!list_empty(&lock->l_pending_chain)) return 0; - timeout = ldlm_get_enq_timeout(lock); + if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) || + OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT)) + timeout = 2; + else + timeout = ldlm_get_enq_timeout(lock); - lock->l_callback_timeout = cfs_time_shift(timeout); + timeout = cfs_time_shift(timeout); + if (likely(cfs_time_after(timeout, lock->l_callback_timeout))) + lock->l_callback_timeout = timeout; timeout_rounded = round_timeout(lock->l_callback_timeout); @@ -457,7 +510,6 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock) LDLM_DEBUG(lock, "refreshed"); return 1; } - #else /* !__KERNEL__ */ static int ldlm_add_waiting_lock(struct ldlm_lock *lock) @@ -604,6 +656,30 @@ static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req, RETURN(rc); } +/** + * Check if there are requests in the export request list which prevent + * the lock canceling and make these requests high priority ones. 
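The ldlm_lockd.c changes above turn a lock-callback timeout from an automatic eviction into a decision: ldlm_lock_busy() walks the export's new exp_queued_rpc list and asks each queued request's hpreq_lock_match() whether it still works under the expiring lock; if so, waiting_locks_callback() refreshes the lock instead of evicting the client, and __ldlm_add_waiting_lock() only ever moves l_callback_timeout forward. A compact sketch of that decision follows, using hypothetical demo_* names and plain time_t deadlines instead of jiffies.

#include <stdio.h>
#include <time.h>

struct demo_lock { unsigned long cookie; time_t callback_timeout; };

struct demo_request {
        unsigned long lock_cookie;
        /* non-zero if the request still works under the given lock */
        int (*lock_match)(const struct demo_request *,
                          const struct demo_lock *);
};

struct demo_export {
        struct demo_request *queued;  /* queued RPCs from this client */
        int                  nqueued;
};

static int cookie_match(const struct demo_request *req,
                        const struct demo_lock *lock)
{
        return req->lock_cookie == lock->cookie;
}

/* Is any queued RPC from this client still using the lock? */
static int lock_busy(const struct demo_export *exp,
                     const struct demo_lock *lock)
{
        int i;

        for (i = 0; i < exp->nqueued; i++)
                if (exp->queued[i].lock_match &&
                    exp->queued[i].lock_match(&exp->queued[i], lock))
                        return 1;
        return 0;
}

/* Timer body: prolong busy locks, evict only when the client is idle. */
static void expire_or_prolong(struct demo_export *exp, struct demo_lock *lock,
                              time_t now, time_t grace)
{
        if (now < lock->callback_timeout)
                return;                         /* not expired yet */
        if (lock_busy(exp, lock)) {
                /* only ever push the deadline forward */
                if (now + grace > lock->callback_timeout)
                        lock->callback_timeout = now + grace;
                printf("prolonged to %ld\n", (long)lock->callback_timeout);
        } else {
                printf("evicting client\n");
        }
}

int main(void)
{
        struct demo_request rpcs[1] = {
                { .lock_cookie = 7, .lock_match = cookie_match }
        };
        struct demo_export exp = { .queued = rpcs, .nqueued = 1 };
        struct demo_lock lock = { .cookie = 7, .callback_timeout = 0 };

        expire_or_prolong(&exp, &lock, time(NULL), 30);
        return 0;
}

The same match callbacks are what let ldlm_lock_reorder_req(), whose definition follows, bump any queued request that is holding up a blocking AST into the high-priority queue, so the client can actually finish the I/O that keeps the lock busy.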
+ */ +static void ldlm_lock_reorder_req(struct ldlm_lock *lock) +{ + struct ptlrpc_request *req; + ENTRY; + + if (lock->l_export == NULL) { + LDLM_DEBUG(lock, "client lock: no-op"); + RETURN_EXIT; + } + + spin_lock(&lock->l_export->exp_lock); + list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) { + if (!req->rq_hp && req->rq_ops->hpreq_lock_match && + req->rq_ops->hpreq_lock_match(req, lock)) + ptlrpc_hpreq_reorder(req); + } + spin_unlock(&lock->l_export->exp_lock); + EXIT; +} + /* * ->l_blocking_ast() method for server-side locks. This is invoked when newly * enqueued server lock conflicts with given one. @@ -631,6 +707,8 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, LASSERT(lock); LASSERT(data != NULL); + ldlm_lock_reorder_req(lock); + req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse, LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK, 2, size, NULL); @@ -2074,7 +2152,7 @@ static int ldlm_setup(void) ldlm_callback_handler, "ldlm_cbd", ldlm_svc_proc_dir, NULL, ldlm_min_threads, ldlm_max_threads, - "ldlm_cb"); + "ldlm_cb", NULL); if (!ldlm_state->ldlm_cb_service) { CERROR("failed to start service\n"); @@ -2088,7 +2166,7 @@ static int ldlm_setup(void) ldlm_cancel_handler, "ldlm_canceld", ldlm_svc_proc_dir, NULL, ldlm_min_threads, ldlm_max_threads, - "ldlm_cn"); + "ldlm_cn", NULL); if (!ldlm_state->ldlm_cancel_service) { CERROR("failed to start service\n"); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 1865867761ccabefc7df53c9920af3d0a5133b33..60d6cc29dd13c1dee78d9c7d2b61cc618165da77 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -114,6 +114,7 @@ int ldlm_get_enq_timeout(struct ldlm_lock *lock) timeout = timeout + (timeout >> 1); /* 150% */ return max(timeout, ldlm_enqueue_min); } +EXPORT_SYMBOL(ldlm_get_enq_timeout); static int is_granted_or_cancelled(struct ldlm_lock *lock) { diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index baff0ba2bf84d66462b49a1b8fd2e0c7225eb319..bdf04684f1fc86d53d8770e9382f7560959413cc 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -283,6 +283,12 @@ void ldlm_proc_namespace(struct ldlm_namespace *ns) lock_vars[0].write_fptr = lprocfs_wr_uint; lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); } else { + snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_timeouts", + ns->ns_name); + lock_vars[0].data = &ns->ns_timeouts; + lock_vars[0].read_fptr = lprocfs_rd_uint; + lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); + snprintf(lock_name, MAX_STRING_SIZE, "%s/max_nolock_bytes", ns->ns_name); lock_vars[0].data = &ns->ns_max_nolock_size; @@ -366,6 +372,7 @@ ldlm_namespace_new(struct obd_device *obd, char *name, ns->ns_nr_unused = 0; ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE; ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE; + ns->ns_timeouts = 0; spin_lock_init(&ns->ns_unused_lock); ns->ns_orig_connect_flags = 0; ns->ns_connect_flags = 0; diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index d89a320495f6beb270d6e8b1bbebdd9d71558be6..b1df5f5979ac993c4cc44bdf7cf2ad3a23e442ed 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -2689,7 +2689,8 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf) MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_FACTOR, mds_handle, LUSTRE_MDS_NAME, obd->obd_proc_entry, target_print_req, - mds_min_threads, mds_max_threads, "ll_mdt"); + mds_min_threads, mds_max_threads, "ll_mdt", + NULL); if (!mds->mds_service) { CERROR("failed to start service\n"); @@ -2707,7 +2708,7 @@ static int mdt_setup(struct 
obd_device *obd, obd_count len, void *buf) mds_handle, "mds_setattr", obd->obd_proc_entry, target_print_req, mds_min_threads, mds_max_threads, - "ll_mdt_attr"); + "ll_mdt_attr", NULL); if (!mds->mds_setattr_service) { CERROR("failed to start getattr service\n"); GOTO(err_thread, rc = -ENOMEM); @@ -2724,7 +2725,7 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf) mds_handle, "mds_readpage", obd->obd_proc_entry, target_print_req, MDS_THREADS_MIN_READPAGE, mds_max_threads, - "ll_mdt_rdpg"); + "ll_mdt_rdpg", NULL); if (!mds->mds_readpage_service) { CERROR("failed to start readpage service\n"); GOTO(err_thread2, rc = -ENOMEM); diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index e8e190cb346852a3ea925a5d074bfebd7a70b764..e30a0f51572ae37f729deaf6c37709f2bfe8789d 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -220,7 +220,7 @@ static int mgs_setup(struct obd_device *obd, obd_count len, void *buf) mgs_handle, LUSTRE_MGS_NAME, obd->obd_proc_entry, NULL, MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX, - "ll_mgs"); + "ll_mgs", NULL); if (!mgs->mgs_service) { CERROR("failed to start service\n"); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 2102677313b21474db915e54b0f8fdc1179bb6e6..959048fbfc1a436f9b7451ad385405f4277be32d 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -677,6 +677,7 @@ void class_export_destroy(struct obd_export *exp) LASSERT(list_empty(&exp->exp_outstanding_replies)); LASSERT(list_empty(&exp->exp_req_replay_queue)); + LASSERT(list_empty(&exp->exp_queued_rpc)); obd_destroy_export(exp); OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle); @@ -703,6 +704,7 @@ struct obd_export *class_new_export(struct obd_device *obd, export->exp_obd = obd; CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies); CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue); + CFS_INIT_LIST_HEAD(&export->exp_queued_rpc); CFS_INIT_LIST_HEAD(&export->exp_handle.h_link); class_handle_hash(&export->exp_handle, export_handle_addref); export->exp_last_request_time = cfs_time_current_sec(); diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index fc40f5d473560eff817cc1c2533cc3235b929780..13faf6eeb7abd712861878c03c7925c5da717408 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -278,10 +278,9 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, /* check that we do support OBD_CONNECT_TRUNCLOCK. */ CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK); - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) - RETURN(-EFAULT); + /* ost_body is varified and swabbed in ost_hpreq_handler() */ + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); oinfo.oi_oa = &body->oa; oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size; @@ -596,7 +595,9 @@ static void ost_brw_lock_put(int mode, struct ost_prolong_data { struct obd_export *opd_exp; ldlm_policy_data_t opd_policy; + struct obdo *opd_oa; ldlm_mode_t opd_mode; + int opd_lock_match; }; static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data) @@ -626,6 +627,14 @@ static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data) return LDLM_ITER_CONTINUE; } + /* Fill the obdo with the matched lock handle. + * XXX: it is possible in some cases the IO RPC is covered by several + * locks, even for the write case, so it may need to be a lock list. 
*/ + if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) { + opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie; + opd->opd_oa->o_valid |= OBD_MD_FLHANDLE; + } + if (!(lock->l_flags & LDLM_FL_AST_SENT)) { /* ignore locks not being cancelled */ return LDLM_ITER_CONTINUE; @@ -634,17 +643,18 @@ static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data) /* OK. this is a possible lock the user holds doing I/O * let's refresh eviction timer for it */ ldlm_refresh_waiting_lock(lock); + opd->opd_lock_match = 1; return LDLM_ITER_CONTINUE; } -static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj, - struct niobuf_remote *nb, struct obdo *oa, - ldlm_mode_t mode) +static int ost_rw_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj, + struct niobuf_remote *nb, struct obdo *oa, + ldlm_mode_t mode) { struct ldlm_res_id res_id = { .name = { obj->ioo_id } }; int nrbufs = obj->ioo_bufcnt; - struct ost_prolong_data opd; + struct ost_prolong_data opd = { 0 }; ENTRY; @@ -664,16 +674,28 @@ static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj, lock = ldlm_handle2lock(&oa->o_handle); if (lock != NULL) { ost_prolong_locks_iter(lock, &opd); + if (opd.opd_lock_match) { + LDLM_LOCK_PUT(lock); + RETURN(1); + } + + /* Check if the lock covers the whole IO region, + * otherwise iterate through the resource. */ + if (lock->l_policy_data.l_extent.end >= + opd.opd_policy.l_extent.end && + lock->l_policy_data.l_extent.start <= + opd.opd_policy.l_extent.start) { + LDLM_LOCK_PUT(lock); + RETURN(0); + } LDLM_LOCK_PUT(lock); - EXIT; - return; } } + opd.opd_oa = oa; ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id, ost_prolong_locks_iter, &opd); - - EXIT; + RETURN(opd.opd_lock_match); } static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) @@ -708,39 +730,18 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) if (exp->exp_failed) GOTO(out, rc = -ENOTCONN); - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR("Missing/short ost_body\n"); - GOTO(out, rc = -EFAULT); - } + /* ost_body, ioobj & noibuf_remote are verified and swabbed in + * ost_rw_hpreq_check(). 
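In the ost_handler.c hunks above, ost_prolong_locks() becomes ost_rw_prolong_locks() and now reports whether it refreshed a lock that is already being cancelled; the handle in oa->o_handle is tried first, and the whole resource is iterated only when that lock neither matches nor covers the full IO region. A sketch of that "cached handle first, scan as fallback" shape, with hypothetical demo_* names (the array scan stands in for ldlm_resource_iterate()):

#include <stdio.h>
#include <stddef.h>

struct extent { unsigned long long start, end; };

struct demo_lock {
        struct extent granted;          /* extent covered by the lock */
        int           being_cancelled;  /* blocking AST already sent  */
};

/* Stand-in for ost_prolong_locks_iter(): a lock counts as a "match" only
 * when it is being cancelled, i.e. when the real iterator would refresh
 * its eviction timer with ldlm_refresh_waiting_lock(). */
static int prolong_one(const struct demo_lock *lock)
{
        return lock->being_cancelled;
}

static int covers(const struct extent *outer, const struct extent *inner)
{
        return outer->start <= inner->start && outer->end >= inner->end;
}

/* Try the lock the client named first; scan the rest only if needed. */
static int prolong_locks(const struct demo_lock *cached,
                         const struct demo_lock *all, size_t nlocks,
                         const struct extent *io)
{
        size_t i;
        int match = 0;

        if (cached != NULL) {
                match = prolong_one(cached);
                if (match || covers(&cached->granted, io))
                        return match;   /* fast path: no resource scan */
        }
        for (i = 0; i < nlocks; i++)    /* stands in for the resource walk */
                match |= prolong_one(&all[i]);
        return match;
}

int main(void)
{
        struct demo_lock locks[1] = { { { 0, ~0ULL }, 0 } };
        struct extent io = { 4096, 8191 };

        printf("match=%d\n", prolong_locks(&locks[0], locks, 1, &io));
        return 0;
}

The iterator now also records the first matching lock's handle in oa->o_handle (the OBD_MD_FLHANDLE hunk above), so later prolong calls for the same request can take the fast path even when the client did not supply a handle.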
*/ + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); - ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo), - lustre_swab_obd_ioobj); - if (ioo == NULL) { - CERROR("Missing/short ioobj\n"); - GOTO(out, rc = -EFAULT); - } + ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioo)); + LASSERT(ioo != NULL); niocount = ioo->ioo_bufcnt; - if (niocount > PTLRPC_MAX_BRW_PAGES) { - DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)", - niocount); - GOTO(out, rc = -EFAULT); - } - - remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2, - niocount * sizeof(*remote_nb), - lustre_swab_niobuf_remote); - if (remote_nb == NULL) { - CERROR("Missing/short niobuf\n"); - GOTO(out, rc = -EFAULT); - } - if (lustre_req_need_swab(req)) { - /* swab remaining niobufs */ - for (i = 1; i < niocount; i++) - lustre_swab_niobuf_remote (&remote_nb[i]); - } + remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, + niocount * sizeof(*remote_nb)); + LASSERT(remote_nb != NULL); rc = lustre_pack_reply(req, 2, size, NULL); if (rc) @@ -797,7 +798,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) if (rc != 0) GOTO(out_lock, rc); - ost_prolong_locks(exp, ioo, pp_rnb, &body->oa, LCK_PW | LCK_PR); + ost_rw_prolong_locks(exp, ioo, pp_rnb, &body->oa, LCK_PW | LCK_PR); nob = 0; for (i = 0; i < npages; i++) { @@ -966,7 +967,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) __u32 *rcs; __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) }; int objcount, niocount, npages; - int rc, swab, i, j; + int rc, i, j; obd_count client_cksum = 0, server_cksum = 0; cksum_type_t cksum_type = OBD_CKSUM_CRC32; int no_reply = 0; @@ -991,56 +992,22 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) if (exp->exp_failed) GOTO(out, rc = -ENOTCONN); - swab = lustre_req_need_swab(req); - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR("Missing/short ost_body\n"); - GOTO(out, rc = -EFAULT); - } + /* ost_body, ioobj & noibuf_remote are verified and swabbed in + * ost_rw_hpreq_check(). 
*/ + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / sizeof(*ioo); - if (objcount == 0) { - CERROR("Missing/short ioobj\n"); - GOTO(out, rc = -EFAULT); - } - if (objcount > 1) { - CERROR("too many ioobjs (%d)\n", objcount); - GOTO(out, rc = -EFAULT); - } - - lustre_set_req_swabbed(req, REQ_REC_OFF + 1); ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, objcount * sizeof(*ioo)); - LASSERT (ioo != NULL); - for (niocount = i = 0; i < objcount; i++) { - if (swab) - lustre_swab_obd_ioobj(&ioo[i]); - if (ioo[i].ioo_bufcnt == 0) { - CERROR("ioo[%d] has zero bufcnt\n", i); - GOTO(out, rc = -EFAULT); - } + LASSERT(ioo != NULL); + for (niocount = i = 0; i < objcount; i++) niocount += ioo[i].ioo_bufcnt; - } - if (niocount > PTLRPC_MAX_BRW_PAGES) { - DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)", - niocount); - GOTO(out, rc = -EFAULT); - } - - remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2, - niocount * sizeof(*remote_nb), - lustre_swab_niobuf_remote); - if (remote_nb == NULL) { - CERROR("Missing/short niobuf\n"); - GOTO(out, rc = -EFAULT); - } - if (swab) { /* swab the remaining niobufs */ - for (i = 1; i < niocount; i++) - lustre_swab_niobuf_remote (&remote_nb[i]); - } + remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, + niocount * sizeof(*remote_nb)); + LASSERT(remote_nb != NULL); size[REPLY_REC_OFF + 1] = niocount * sizeof(*rcs); rc = lustre_pack_reply(req, 3, size, NULL); @@ -1097,7 +1064,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) GOTO(out_lock, rc = -ETIMEDOUT); } - ost_prolong_locks(exp, ioo, pp_rnb, &body->oa, LCK_PW); + ost_rw_prolong_locks(exp, ioo, pp_rnb, &body->oa, LCK_PW); /* obd_preprw clobbers oa->valid, so save what we need */ if (body->oa.o_valid & OBD_MD_FLCKSUM) { @@ -1535,6 +1502,250 @@ int ost_msg_check_version(struct lustre_msg *msg) return rc; } +static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req, + struct ldlm_lock *lock) +{ + struct niobuf_remote *nb; + struct obd_ioobj *ioo; + struct ost_body *body; + int objcount, niocount; + int mode, opc, i; + __u64 start, end; + ENTRY; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + LASSERT(opc == OST_READ || opc == OST_WRITE); + + /* As the request may be covered by several locks, do not look at + * o_handle, look at the RPC IO region. */ + body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), + lustre_swab_obdo); + objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / + sizeof(*ioo); + ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, + objcount * sizeof(*ioo)); + LASSERT(ioo != NULL); + for (niocount = i = 0; i < objcount; i++) + niocount += ioo[i].ioo_bufcnt; + + nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, + niocount * sizeof(*nb)); + LASSERT(nb != NULL); + + mode = LCK_PW; + if (opc == OST_READ) + mode |= LCK_PR; + + start = nb[0].offset & CFS_PAGE_MASK; + end = (nb[ioo->ioo_bufcnt - 1].offset + + nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK; + + if (!(lock->l_granted_mode & mode)) + RETURN(0); + + if (lock->l_policy_data.l_extent.end < start || + lock->l_policy_data.l_extent.start > end) + RETURN(0); + + RETURN(1); +} + +/** + * Swab buffers needed to call ost_rw_prolong_locks() and call it. + * Return the value from ost_rw_prolong_locks() which is non-zero if + * there is a cancelled lock which is waiting for this IO request. 
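ost_rw_hpreq_lock_match() above deliberately ignores the handle carried in the request — one bulk RPC may be covered by several locks — and instead rebuilds the RPC's byte range from the first and last niobuf, rounds it out to page boundaries, and reports a match when the lock's granted mode is compatible (PW, plus PR for reads) and the two extents overlap. A self-contained version of that test follows, with hypothetical names (DEMO_PAGE_MASK, MODE_PR/MODE_PW) standing in for CFS_PAGE_MASK and the LCK_* mode bits.

#include <stdio.h>

#define DEMO_PAGE_MASK  (~((unsigned long long)4096 - 1))
#define MODE_PR         0x1
#define MODE_PW         0x2

struct niobuf { unsigned long long offset; unsigned len; };
struct extent { unsigned long long start, end; };

/* Byte range of the RPC, widened to whole pages. */
static struct extent rpc_extent(const struct niobuf *nb, int count)
{
        struct extent e;

        e.start = nb[0].offset & DEMO_PAGE_MASK;
        e.end   = (nb[count - 1].offset + nb[count - 1].len - 1) |
                  ~DEMO_PAGE_MASK;
        return e;
}

/* 1 if a lock with the given mode/extent covers (part of) this RPC. */
static int rw_lock_match(int is_read, const struct niobuf *nb, int count,
                         int lock_mode, const struct extent *lock_ext)
{
        struct extent io = rpc_extent(nb, count);
        int wanted = MODE_PW | (is_read ? MODE_PR : 0);

        if (!(lock_mode & wanted))
                return 0;                       /* incompatible mode   */
        if (lock_ext->end < io.start || lock_ext->start > io.end)
                return 0;                       /* disjoint extents    */
        return 1;
}

int main(void)
{
        struct niobuf nb[2] = { { 0, 4096 }, { 8192, 1024 } };
        struct extent lock = { 4096, 12287 };

        printf("read match: %d\n", rw_lock_match(1, nb, 2, MODE_PR, &lock));
        return 0;
}

Matching by extent rather than by o_handle is exactly what the comment in the hunk calls out: any one of the locks covering the RPC being cancelled is reason enough to treat the RPC as urgent.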
+ */ +static int ost_rw_hpreq_check(struct ptlrpc_request *req) +{ + struct niobuf_remote *nb; + struct obd_ioobj *ioo; + struct ost_body *body; + int objcount, niocount; + int mode, opc, i; + ENTRY; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + LASSERT(opc == OST_READ || opc == OST_WRITE); + + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); + + objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / + sizeof(*ioo); + ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, + objcount * sizeof(*ioo)); + LASSERT(ioo != NULL); + + for (niocount = i = 0; i < objcount; i++) + niocount += ioo[i].ioo_bufcnt; + nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, + niocount * sizeof(*nb)); + LASSERT(nb != NULL); + LASSERT(niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK)); + + mode = LCK_PW; + if (opc == OST_READ) + mode |= LCK_PR; + RETURN(ost_rw_prolong_locks(req->rq_export, ioo, nb, &body->oa, mode)); +} + +static int ost_punch_prolong_locks(struct obd_export *exp, struct obdo *oa) +{ + struct ldlm_res_id res_id = { .name = { oa->o_id } }; + struct ost_prolong_data opd = { 0 }; + __u64 start, end; + ENTRY; + + start = oa->o_size; + end = start + oa->o_blocks; + + opd.opd_mode = LCK_PW; + opd.opd_exp = exp; + opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK; + if (oa->o_blocks == OBD_OBJECT_EOF || end < start) + opd.opd_policy.l_extent.end = OBD_OBJECT_EOF; + else + opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK; + + CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n", + res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start, + opd.opd_policy.l_extent.end); + + opd.opd_oa = oa; + ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id, + ost_prolong_locks_iter, &opd); + RETURN(opd.opd_lock_match); +} + +static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req, + struct ldlm_lock *lock) +{ + struct ost_body *body; + ENTRY; + + body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), + lustre_swab_obdo); + LASSERT(body != NULL); + + if (body->oa.o_valid & OBD_MD_FLHANDLE && + body->oa.o_handle.cookie == lock->l_handle.h_cookie) + RETURN(1); + RETURN(0); +} + +static int ost_punch_hpreq_check(struct ptlrpc_request *req) +{ + struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, + REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); + LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) || + !(body->oa.o_flags & OBD_FL_TRUNCLOCK)); + + RETURN(ost_punch_prolong_locks(req->rq_export, &body->oa)); +} + +struct ptlrpc_hpreq_ops ost_hpreq_rw = { + .hpreq_lock_match = ost_rw_hpreq_lock_match, + .hpreq_check = ost_rw_hpreq_check, +}; + +struct ptlrpc_hpreq_ops ost_hpreq_punch = { + .hpreq_lock_match = ost_punch_hpreq_lock_match, + .hpreq_check = ost_punch_hpreq_check, +}; + +/** Assign high priority operations to the request if needed. 
*/ +static int ost_hpreq_handler(struct ptlrpc_request *req) +{ + ENTRY; + if (req->rq_export) { + int opc = lustre_msg_get_opc(req->rq_reqmsg); + struct ost_body *body; + + if (opc == OST_READ || opc == OST_WRITE) { + struct niobuf_remote *nb; + struct obd_ioobj *ioo; + int objcount, niocount; + int swab, i; + + body = lustre_swab_reqbuf(req, REQ_REC_OFF, + sizeof(*body), + lustre_swab_obdo); + if (!body) { + CERROR("Missing/short ost_body\n"); + RETURN(-EFAULT); + } + objcount = lustre_msg_buflen(req->rq_reqmsg, + REQ_REC_OFF + 1) / + sizeof(*ioo); + if (objcount == 0) { + CERROR("Missing/short ioobj\n"); + RETURN(-EFAULT); + } + if (objcount > 1) { + CERROR("too many ioobjs (%d)\n", objcount); + RETURN(-EFAULT); + } + + swab = !lustre_req_swabbed(req, REQ_REC_OFF + 1) && + lustre_req_need_swab(req); + ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, + objcount * sizeof(*ioo), + lustre_swab_obd_ioobj); + if (!ioo) { + CERROR("Missing/short ioobj\n"); + RETURN(-EFAULT); + } + for (niocount = i = 0; i < objcount; i++) { + if (i > 0 && swab) + lustre_swab_obd_ioobj(&ioo[i]); + if (ioo[i].ioo_bufcnt == 0) { + CERROR("ioo[%d] has zero bufcnt\n", i); + RETURN(-EFAULT); + } + niocount += ioo[i].ioo_bufcnt; + } + if (niocount > PTLRPC_MAX_BRW_PAGES) { + DEBUG_REQ(D_ERROR, req, "bulk has too many " + "pages (%d)", niocount); + RETURN(-EFAULT); + } + + swab = !lustre_req_swabbed(req, REQ_REC_OFF + 2) && + lustre_req_need_swab(req); + nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2, + niocount * sizeof(*nb), + lustre_swab_niobuf_remote); + if (!nb) { + CERROR("Missing/short niobuf\n"); + RETURN(-EFAULT); + } + + if (swab) { + /* swab remaining niobufs */ + for (i = 1; i < niocount; i++) + lustre_swab_niobuf_remote(&nb[i]); + } + + if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK)) + req->rq_ops = &ost_hpreq_rw; + } else if (opc == OST_PUNCH) { + body = lustre_swab_reqbuf(req, REQ_REC_OFF, + sizeof(*body), + lustre_swab_obdo); + if (!body) { + CERROR("Missing/short ost_body\n"); + RETURN(-EFAULT); + } + + if (!(body->oa.o_valid & OBD_MD_FLFLAGS) || + !(body->oa.o_flags & OBD_FL_TRUNCLOCK)) + req->rq_ops = &ost_hpreq_punch; + } + } + RETURN(0); +} + static int ost_handle(struct ptlrpc_request *req) { struct obd_trans_info trans_info = { 0, }; @@ -1877,7 +2088,7 @@ static int ost_setup(struct obd_device *obd, obd_count len, void *buf) /* Insure a 4x range for dynamic threads */ if (oss_min_threads > OSS_THREADS_MAX / 4) oss_min_threads = OSS_THREADS_MAX / 4; - oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4); + oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1); } ost->ost_service = @@ -1886,7 +2097,8 @@ static int ost_setup(struct obd_device *obd, obd_count len, void *buf) OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR, ost_handle, LUSTRE_OSS_NAME, obd->obd_proc_entry, target_print_req, - oss_min_threads, oss_max_threads, "ll_ost"); + oss_min_threads, oss_max_threads, "ll_ost", + NULL); if (ost->ost_service == NULL) { CERROR("failed to start OST service\n"); GOTO(out_lprocfs, rc = -ENOMEM); @@ -1916,7 +2128,7 @@ static int ost_setup(struct obd_device *obd, obd_count len, void *buf) obd->obd_proc_entry, target_print_req, oss_min_create_threads, oss_max_create_threads, - "ll_ost_creat"); + "ll_ost_creat", NULL); if (ost->ost_create_service == NULL) { CERROR("failed to start OST create service\n"); GOTO(out_service, rc = -ENOMEM); @@ -1932,7 +2144,8 @@ static int ost_setup(struct obd_device *obd, obd_count len, void *buf) OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR, ost_handle, 
"ost_io", obd->obd_proc_entry, target_print_req, - oss_min_threads, oss_max_threads, "ll_ost_io"); + oss_min_threads, oss_max_threads, "ll_ost_io", + ost_hpreq_handler); if (ost->ost_io_service == NULL) { CERROR("failed to start OST I/O service\n"); GOTO(out_create, rc = -ENOMEM); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index f425e516aa75e362e844c5e25848aa76f514c671..faa9b0f76e13b433733166559e5c5edd3ff0189d 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -593,6 +593,7 @@ ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode, CFS_INIT_LIST_HEAD(&request->rq_replay_list); CFS_INIT_LIST_HEAD(&request->rq_set_chain); CFS_INIT_LIST_HEAD(&request->rq_history_list); + CFS_INIT_LIST_HEAD(&request->rq_exp_list); cfs_waitq_init(&request->rq_reply_waitq); request->rq_xid = ptlrpc_next_xid(); atomic_set(&request->rq_refcount, 1); @@ -1554,6 +1555,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */ LASSERTF(list_empty(&request->rq_list), "req %p\n", request); LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request); + LASSERTF(list_empty(&request->rq_exp_list), "req %p\n", request); LASSERTF(!request->rq_replay, "req %p\n", request); /* We must take it off the imp_replay_list first. Otherwise, we'll set diff --git a/lustre/ptlrpc/lproc_ptlrpc.c b/lustre/ptlrpc/lproc_ptlrpc.c index 37ef917fcad7a5785d5235329c224f904c65c47a..558c83e7e2bf68572d721e2a325e68127a7fba9a 100644 --- a/lustre/ptlrpc/lproc_ptlrpc.c +++ b/lustre/ptlrpc/lproc_ptlrpc.c @@ -507,6 +507,32 @@ static int ptlrpc_lprocfs_rd_timeouts(char *page, char **start, off_t off, return rc; } +static int ptlrpc_lprocfs_rd_hp_ratio(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct ptlrpc_service *svc = data; + int rc = snprintf(page, count, "%d", svc->srv_hpreq_ratio); + return rc; +} + +static int ptlrpc_lprocfs_wr_hp_ratio(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct ptlrpc_service *svc = data; + int rc, val; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc < 0) + return rc; + if (val < 0) + return -ERANGE; + + spin_lock(&svc->srv_lock); + svc->srv_hpreq_ratio = val; + spin_unlock(&svc->srv_lock); + return count; +} + void ptlrpc_lprocfs_register_service(struct proc_dir_entry *entry, struct ptlrpc_service *svc) { @@ -522,6 +548,10 @@ void ptlrpc_lprocfs_register_service(struct proc_dir_entry *entry, {.name = "timeouts", .read_fptr = ptlrpc_lprocfs_rd_timeouts, .data = svc}, + {.name = "high_priority_ratio", + .read_fptr = ptlrpc_lprocfs_rd_hp_ratio, + .write_fptr = ptlrpc_lprocfs_wr_hp_ratio, + .data = svc}, {NULL} }; static struct file_operations req_history_fops = { diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 8603df3a8b242c62ad596ad2b7aad8e8836575bd..9bb631f721718e07b0c5abc7c704f6204d921505 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -1121,8 +1121,11 @@ void *lustre_swab_buf(struct lustre_msg *msg, int index, int min_size, void *lustre_swab_reqbuf(struct ptlrpc_request *req, int index, int min_size, void *swabber) { - if (!ptlrpc_reqbuf_need_swab(req, index)) - swabber = NULL; + if (lustre_req_swabbed(req, index)) + return lustre_msg_buf(req->rq_reqmsg, index, min_size); + + if (!lustre_req_need_swab(req)) + swabber = NULL; lustre_set_req_swabbed(req, index); return lustre_swab_buf(req->rq_reqmsg, index, min_size, swabber); @@ 
-1131,7 +1134,10 @@ void *lustre_swab_reqbuf(struct ptlrpc_request *req, int index, int min_size, void *lustre_swab_repbuf(struct ptlrpc_request *req, int index, int min_size, void *swabber) { - if (!ptlrpc_repbuf_need_swab(req, index)) + if (lustre_rep_swabbed(req, index)) + return lustre_msg_buf(req->rq_repmsg, index, min_size); + + if (!lustre_rep_need_swab(req)) swabber = NULL; lustre_set_rep_swabbed(req, index); diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index 9e8f5d29d7a18d7d24a6d8cab122aad140ce2ce9..0989c6c8b00eb66b06c40b5dc84295da3f99fb58 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -200,6 +200,7 @@ EXPORT_SYMBOL(ptlrpc_start_thread); EXPORT_SYMBOL(ptlrpc_unregister_service); EXPORT_SYMBOL(ptlrpc_daemonize); EXPORT_SYMBOL(ptlrpc_service_health_check); +EXPORT_SYMBOL(ptlrpc_hpreq_reorder); /* pack_generic.c */ EXPORT_SYMBOL(lustre_msg_check_version); diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 72f80f9acb21cad8b550bfe0b8391320869575e1..310001787c73097ef0d92ef7c865068cab798dd0 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -302,7 +302,8 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, svc_handler_t handler, char *name, cfs_proc_dir_entry_t *proc_entry, svcreq_printfn_t svcreq_printfn, - int min_threads, int max_threads, char *threadname) + int min_threads, int max_threads, char *threadname, + svc_hpreq_handler_t hp_handler) { int rc; struct ptlrpc_service *service; @@ -335,11 +336,16 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, service->srv_threads_min = min_threads; service->srv_threads_max = max_threads; service->srv_thread_name = threadname; + service->srv_hpreq_handler = hp_handler; + service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO; + service->srv_hpreq_count = 0; + service->srv_n_hpreq = 0; rc = LNetSetLazyPortal(service->srv_req_portal); LASSERT (rc == 0); CFS_INIT_LIST_HEAD(&service->srv_request_queue); + CFS_INIT_LIST_HEAD(&service->srv_request_hpq); CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds); CFS_INIT_LIST_HEAD(&service->srv_active_rqbds); CFS_INIT_LIST_HEAD(&service->srv_history_rqbds); @@ -497,6 +503,11 @@ static void ptlrpc_server_finish_request(struct ptlrpc_request *req) { struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service; + if (req->rq_export) { + class_export_put(req->rq_export); + req->rq_export = NULL; + } + if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */ DEBUG_REQ(D_INFO, req, "free req"); @@ -514,7 +525,7 @@ static void ptlrpc_server_finish_request(struct ptlrpc_request *req) static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay) { struct obd_export *oldest_exp; - time_t oldest_time; + time_t oldest_time, new_time; ENTRY; @@ -525,9 +536,13 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay) of the list, we can be really lazy here - we don't have to evict at the exact right moment. Eventually, all silent exports will make it to the top of the list. */ - exp->exp_last_request_time = max(exp->exp_last_request_time, - cfs_time_current_sec() + extra_delay); + /* Do not pay attention on 1sec or smaller renewals. 
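The pack_generic.c hunks above make lustre_swab_reqbuf()/lustre_swab_repbuf() safe to call on a buffer that was already unpacked: if the per-buffer "swabbed" bit is set the buffer is simply returned, otherwise it is byte-swapped only when the peer's byte order differs. This is what lets ost_hpreq_handler() swab the OST buffers once at intake while the I/O handlers later fetch them with plain lustre_msg_buf(). A self-contained sketch of that "swab at most once" guard, with hypothetical demo_* names and a toy one-word buffer:

#include <stdio.h>
#include <stdint.h>

struct demo_msg {
        int      need_swab;      /* peer used the opposite byte order     */
        unsigned swabbed_mask;   /* bit n set once buffer n was converted */
        uint32_t buf[4];         /* toy "buffers": one 32-bit word each   */
};

static uint32_t swab32(uint32_t v)
{
        return (v >> 24) | ((v >> 8) & 0xff00) |
               ((v << 8) & 0xff0000) | (v << 24);
}

/* Return buffer @idx in host order, converting it at most once. */
static uint32_t *get_buf(struct demo_msg *m, int idx)
{
        if (m->swabbed_mask & (1u << idx))
                return &m->buf[idx];            /* already converted */
        m->swabbed_mask |= 1u << idx;
        if (m->need_swab)
                m->buf[idx] = swab32(m->buf[idx]);
        return &m->buf[idx];
}

int main(void)
{
        struct demo_msg m = { .need_swab = 1, .buf = { 0x01020304 } };

        printf("%08x\n", (unsigned)*get_buf(&m, 0));  /* converted once */
        printf("%08x\n", (unsigned)*get_buf(&m, 0));  /* returned as-is */
        return 0;
}

ost_hpreq_handler() relies on this when it validates objcount, the per-object bufcnt and PTLRPC_MAX_BRW_PAGES once per request; the corresponding checks are removed from ost_brw_read() and ost_brw_write().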
*/ + new_time = cfs_time_current_sec() + extra_delay; + if (exp->exp_last_request_time + 1 /*second */ >= new_time) + RETURN_EXIT; + + exp->exp_last_request_time = new_time; CDEBUG(D_INFO, "updating export %s at %ld\n", exp->exp_client_uuid.uuid, exp->exp_last_request_time); @@ -540,8 +555,7 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay) if (list_empty(&exp->exp_obd_chain_timed)) { /* this one is not timed */ spin_unlock(&exp->exp_obd->obd_dev_lock); - EXIT; - return; + RETURN_EXIT; } list_move_tail(&exp->exp_obd_chain_timed, @@ -890,6 +904,167 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc) RETURN(0); } +/** + * Put the request to the export list if the request may become + * a high priority one. + */ +static int ptlrpc_hpreq_init(struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + int rc; + ENTRY; + + if (svc->srv_hpreq_handler) { + rc = svc->srv_hpreq_handler(req); + if (rc) + RETURN(rc); + } + if (req->rq_export && req->rq_ops) { + spin_lock(&req->rq_export->exp_lock); + list_add(&req->rq_exp_list, &req->rq_export->exp_queued_rpc); + spin_unlock(&req->rq_export->exp_lock); + } + + RETURN(0); +} + +/** Remove the request from the export list. */ +static void ptlrpc_hpreq_fini(struct ptlrpc_request *req) +{ + ENTRY; + if (req->rq_export && req->rq_ops) { + spin_lock(&req->rq_export->exp_lock); + list_del_init(&req->rq_exp_list); + spin_unlock(&req->rq_export->exp_lock); + } + EXIT; +} + +/** + * Make the request a high priority one. + * + * All the high priority requests are queued in a separate FIFO + * ptlrpc_service::srv_request_hpq list which is parallel to + * ptlrpc_service::srv_request_queue list but has a higher priority + * for handling. + * + * \see ptlrpc_server_handle_request(). + */ +static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + ENTRY; + LASSERT(svc != NULL); + spin_lock(&req->rq_lock); + if (req->rq_hp == 0) { + int opc = lustre_msg_get_opc(req->rq_reqmsg); + + /* Add to the high priority queue. */ + list_move_tail(&req->rq_list, &svc->srv_request_hpq); + req->rq_hp = 1; + if (opc != OBD_PING) + DEBUG_REQ(D_NET, req, "high priority req"); + } + spin_unlock(&req->rq_lock); + EXIT; +} + +void ptlrpc_hpreq_reorder(struct ptlrpc_request *req) +{ + struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service; + ENTRY; + + spin_lock(&svc->srv_lock); + /* It may happen that the request is already taken for the processing + * but still in the export list, do not re-add it into the HP list. */ + if (req->rq_phase == RQ_PHASE_NEW) + ptlrpc_hpreq_reorder_nolock(svc, req); + spin_unlock(&svc->srv_lock); + EXIT; +} + +/** Check if the request if a high priority one. */ +static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req) +{ + int opc, rc = 0; + ENTRY; + + /* Check by request opc. */ + opc = lustre_msg_get_opc(req->rq_reqmsg); + if (opc == OBD_PING) + RETURN(1); + + /* Perform request specific check. */ + if (req->rq_ops && req->rq_ops->hpreq_check) + rc = req->rq_ops->hpreq_check(req); + RETURN(rc); +} + +/** Check if a request is a high priority one. */ +static int ptlrpc_server_request_add(struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + int rc; + ENTRY; + + rc = ptlrpc_server_hpreq_check(req); + if (rc < 0) + RETURN(rc); + + spin_lock(&svc->srv_lock); + /* Before inserting the request into the queue, check if it is not + * inserted yet, or even already handled -- it may happen due to + * a racing ldlm_server_blocking_ast(). 
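The queueing policy itself lives in ptlrpc_server_request_get() just below: a service thread prefers the high-priority FIFO, but once srv_hpreq_count reaches srv_hpreq_ratio (PTLRPC_SVC_HP_RATIO, 10 by default, tunable through the new high_priority_ratio proc file) it takes one request from the normal queue and resets the counter, so ordinary RPCs cannot be starved; ptlrpc_server_allow_normal() additionally keeps a couple of threads free for high-priority work. A small sketch of that selection rule, with hypothetical names:

#include <stdio.h>

struct queue { int len; };                 /* stand-in for a request list   */

struct service {
        struct queue normal, hp;           /* the two FIFOs                 */
        int hp_ratio;                      /* HP requests served per normal */
        int hp_count;                      /* HP served since last normal   */
};

/* Pick which queue the next request comes from: 'H', 'N' or 0 (idle). */
static char next_queue(struct service *svc, int allow_normal)
{
        if (allow_normal && svc->normal.len > 0 &&
            (svc->hp.len == 0 || svc->hp_count >= svc->hp_ratio)) {
                svc->normal.len--;
                svc->hp_count = 0;
                return 'N';
        }
        if (svc->hp.len > 0) {
                svc->hp.len--;
                svc->hp_count++;
                return 'H';
        }
        return 0;
}

int main(void)
{
        struct service svc = { { 5 }, { 30 }, 10, 0 };
        int i;

        /* With both queues busy: ten 'H' picks, then one 'N', and so on. */
        for (i = 0; i < 24; i++)
                putchar(next_queue(&svc, 1));
        putchar('\n');
        return 0;
}

Note that ptlrpc_server_hpreq_check() always treats OBD_PING as high priority, so ping traffic is answered promptly even when the normal queue is long.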
*/ + if (req->rq_phase == RQ_PHASE_NEW && list_empty(&req->rq_list)) { + if (rc) + ptlrpc_hpreq_reorder_nolock(svc, req); + else + list_add_tail(&req->rq_list, &svc->srv_request_queue); + } + spin_unlock(&svc->srv_lock); + + RETURN(0); +} + +/* Only allow normal priority requests on a service that has a high-priority + * queue if forced (i.e. cleanup), if there are other high priority requests + * already being processed (i.e. those threads can service more high-priority + * requests), or if there are enough idle threads that a later thread can do + * a high priority request. */ +static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force) +{ + return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 || + svc->srv_n_active_reqs < svc->srv_threads_running - 2; +} + +static struct ptlrpc_request * +ptlrpc_server_request_get(struct ptlrpc_service *svc, int force) +{ + struct ptlrpc_request *req = NULL; + ENTRY; + + if (ptlrpc_server_allow_normal(svc, force) && + !list_empty(&svc->srv_request_queue) && + (list_empty(&svc->srv_request_hpq) || + svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) { + req = list_entry(svc->srv_request_queue.next, + struct ptlrpc_request, rq_list); + svc->srv_hpreq_count = 0; + } else if (!list_empty(&svc->srv_request_hpq)) { + req = list_entry(svc->srv_request_hpq.next, + struct ptlrpc_request, rq_list); + svc->srv_hpreq_count++; + } + RETURN(req); +} + +static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force) +{ + return ((ptlrpc_server_allow_normal(svc, force) && + !list_empty(&svc->srv_request_queue)) || + !list_empty(&svc->srv_request_hpq)); +} + /* Handle freshly incoming reqs, add to timed early reply list, pass on to regular request queue */ static int @@ -951,10 +1126,9 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc) lustre_msg_get_handle(req->rq_reqmsg)); if (req->rq_export) { rc = ptlrpc_check_req(req); - class_export_put(req->rq_export); - req->rq_export = NULL; if (rc) goto err_req; + ptlrpc_update_export_timer(req->rq_export, 0); } /* req_in handling should/must be fast */ @@ -974,12 +1148,15 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc) } ptlrpc_at_add_timed(req); + rc = ptlrpc_hpreq_init(svc, req); + if (rc) + GOTO(err_req, rc); /* Move it over to the request processing queue */ - spin_lock(&svc->srv_lock); - list_add_tail(&req->rq_list, &svc->srv_request_queue); + rc = ptlrpc_server_request_add(svc, req); + if (rc) + GOTO(err_req, rc); cfs_waitq_signal(&svc->srv_waitq); - spin_unlock(&svc->srv_lock); RETURN(1); err_req: @@ -1001,13 +1178,14 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, struct timeval work_start; struct timeval work_end; long timediff; - int rc; + int opc, rc; + int fail_opc = 0; ENTRY; LASSERT(svc); spin_lock(&svc->srv_lock); - if (list_empty (&svc->srv_request_queue) || + if (!ptlrpc_server_request_pending(svc, 0) || ( #ifndef __KERNEL__ /* !@%$# liblustre only has 1 thread */ @@ -1022,14 +1200,46 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, RETURN(0); } - request = list_entry (svc->srv_request_queue.next, - struct ptlrpc_request, rq_list); - list_del_init (&request->rq_list); + request = ptlrpc_server_request_get(svc, 0); + if (request == NULL) { + spin_unlock(&svc->srv_lock); + RETURN(0); + } + + opc = lustre_msg_get_opc(request->rq_reqmsg); + if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT)) + fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT; + else if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT)) + fail_opc = 
OBD_FAIL_PTLRPC_HPREQ_TIMEOUT; + + if (unlikely(fail_opc)) { + if (request->rq_export && request->rq_ops) { + spin_unlock(&svc->srv_lock); + OBD_FAIL_TIMEOUT(fail_opc, 4); + spin_lock(&svc->srv_lock); + request = ptlrpc_server_request_get(svc, 0); + if (request == NULL) { + spin_unlock(&svc->srv_lock); + RETURN(0); + } + LASSERT(ptlrpc_server_request_pending(svc, 0)); + } + } + + list_del_init(&request->rq_list); svc->srv_n_queued_reqs--; svc->srv_n_active_reqs++; + if (request->rq_hp) + svc->srv_n_hpreq++; + + /* The phase is changed under the lock here because we need to know + * the request is under processing (see ptlrpc_hpreq_reorder()). */ + ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET); spin_unlock(&svc->srv_lock); + ptlrpc_hpreq_fini(request); + if(OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG)) libcfs_debug_dumplog(); @@ -1049,9 +1259,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid); request->rq_svc_thread = thread; - request->rq_export = class_conn2export( - lustre_msg_get_handle(request->rq_reqmsg)); - if (request->rq_export) { if (ptlrpc_check_req(request)) goto put_conn; @@ -1071,8 +1278,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, goto put_rpc_export; } - ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET); - CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc " "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(), (request->rq_export ? @@ -1104,9 +1309,6 @@ put_rpc_export: class_export_rpc_put(export); put_conn: - if (request->rq_export != NULL) - class_export_put(request->rq_export); - if (cfs_time_current_sec() > request->rq_deadline) { DEBUG_REQ(D_WARNING, request, "Request x"LPU64" took longer " "than estimated (%ld%+lds); client may timeout.", @@ -1143,6 +1345,10 @@ put_conn: work_end.tv_sec - request->rq_arrival_time.tv_sec); } + spin_lock(&svc->srv_lock); + if (request->rq_hp) + svc->srv_n_hpreq--; + spin_unlock(&svc->srv_lock); ptlrpc_server_finish_request(request); RETURN(1); @@ -1426,7 +1632,7 @@ static int ptlrpc_main(void *arg) svc->srv_rqbd_timeout == 0) || !list_empty(&svc->srv_req_in_queue) || !list_empty(&svc->srv_reply_queue) || - (!list_empty(&svc->srv_request_queue) && + (ptlrpc_server_request_pending(svc, 0) && (svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) || svc->srv_at_check, @@ -1461,7 +1667,7 @@ static int ptlrpc_main(void *arg) ptlrpc_at_check_timed(svc); /* don't handle requests in the last thread */ - if (!list_empty (&svc->srv_request_queue) && + if (ptlrpc_server_request_pending(svc, 0) && (svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) ptlrpc_server_handle_request(svc, thread); @@ -1706,15 +1912,14 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) service->srv_n_active_reqs++; ptlrpc_server_finish_request(req); } - while (!list_empty(&service->srv_request_queue)) { - struct ptlrpc_request *req = - list_entry(service->srv_request_queue.next, - struct ptlrpc_request, - rq_list); + while (ptlrpc_server_request_pending(service, 1)) { + struct ptlrpc_request *req; + req = ptlrpc_server_request_get(service, 1); list_del(&req->rq_list); service->srv_n_queued_reqs--; service->srv_n_active_reqs++; + ptlrpc_hpreq_fini(req); ptlrpc_server_finish_request(req); } LASSERT(service->srv_n_queued_reqs == 0); @@ -1778,14 +1983,18 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc) do_gettimeofday(&right_now); spin_lock(&svc->srv_lock); - if (list_empty(&svc->srv_request_queue)) { + if (!ptlrpc_server_request_pending(svc, 1)) { 
spin_unlock(&svc->srv_lock); return 0; } /* How long has the next entry been waiting? */ - request = list_entry(svc->srv_request_queue.next, - struct ptlrpc_request, rq_list); + if (list_empty(&svc->srv_request_queue)) + request = list_entry(svc->srv_request_hpq.next, + struct ptlrpc_request, rq_list); + else + request = list_entry(svc->srv_request_queue.next, + struct ptlrpc_request, rq_list); timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL); spin_unlock(&svc->srv_lock); diff --git a/lustre/tests/sanityN.sh b/lustre/tests/sanityN.sh index f8ee35487746965803fdafde9d24dd79b581e8ca..727ae08e5f711cb75ef60009f9ff0c649295373f 100644 --- a/lustre/tests/sanityN.sh +++ b/lustre/tests/sanityN.sh @@ -689,6 +689,56 @@ test_32b() { # bug 11270 } run_test 32b "lockless i/o" +test_33() { #16129 + for OPER in notimeout timeout ; do + rm $DIR1/$tfile 2>/dev/null + lock_in=0; + for f in `lctl get_param -n ldlm/namespaces/*/lock_timeouts`; do + lock_in=$(($lock_in + $f)) + done + if [ $OPER == "timeout" ] ; then + for j in `seq $OSTCOUNT`; do + #define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT 0x510 + do_facet ost$j lctl set_param fail_loc=0x510 + done + echo lock should expire + else + for j in `seq $OSTCOUNT`; do + #define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT 0x511 + do_facet ost$j lctl set_param fail_loc=0x511 + done + echo lock should not expire + fi + echo writing on client1 + dd if=/dev/zero of=$DIR1/$tfile count=100 conv=notrunc > /dev/null 2>&1 + sync & + # wait for the flush + sleep 1 + echo reading on client2 + dd of=/dev/null if=$DIR2/$tfile > /dev/null 2>&1 + # wait for a lock timeout + sleep 4 + lock_out=0 + for f in `lctl get_param -n ldlm/namespaces/*/lock_timeouts`; do + lock_out=$(($lock_out + $f)) + done + if [ $OPER == "timeout" ] ; then + if [ $lock_in == $lock_out ]; then + error "no lock timeout happened" + else + echo "success" + fi + else + if [ $lock_in != $lock_out ]; then + error "lock timeout happened" + else + echo "success" + fi + fi + done +} +run_test 33 "no lock timeout under IO" + log "cleanup: ======================================================" check_and_cleanup_lustre