-
Hongchao Zhang authored
1. In mgc_set_info_async(KEY=KEY_INIT_RECOV_BACKUP "init_recov_bk"), the MGC import should be reconnected only if its state is LUSTRE_IMP_DISCON 2. in mgc_target_register, if the target will regenerate the config, we should use some longer delay limit to wait the MGC to connect to MGS for the target (server) will fail to exit if the request expired due to delay limit. 3. In case of parallel mount, the async cleanup of OSS will affects the following mount for the OSS can't be setup again, then there should be some barrier to sync with the OSS cleanup. Lustre-change: https://review.whamcloud.com/10229 Lustre-commit: 1a7ff02c Change-Id: I805b84cf12100ec2cc68f95bb65a9c396e0fbc1b Signed-off-by:
Hongchao Zhang <hongchao.zhang@intel.com> Reviewed-by:
Fan Yong <fan.yong@intel.com> Reviewed-by:
Andreas Dilger <andreas.dilger@intel.com> Signed-off-by:
Minh Diep <minh.diep@intel.com> Reviewed-on: https://review.whamcloud.com/30581 Tested-by: Jenkins Tested-by:
Maloo <hpdd-maloo@intel.com> Reviewed-by:
John L. Hammond <john.hammond@intel.com>
Hongchao Zhang authored1. In mgc_set_info_async(KEY=KEY_INIT_RECOV_BACKUP "init_recov_bk"), the MGC import should be reconnected only if its state is LUSTRE_IMP_DISCON 2. in mgc_target_register, if the target will regenerate the config, we should use some longer delay limit to wait the MGC to connect to MGS for the target (server) will fail to exit if the request expired due to delay limit. 3. In case of parallel mount, the async cleanup of OSS will affects the following mount for the OSS can't be setup again, then there should be some barrier to sync with the OSS cleanup. Lustre-change: https://review.whamcloud.com/10229 Lustre-commit: 1a7ff02c Change-Id: I805b84cf12100ec2cc68f95bb65a9c396e0fbc1b Signed-off-by:
Hongchao Zhang <hongchao.zhang@intel.com> Reviewed-by:
Fan Yong <fan.yong@intel.com> Reviewed-by:
Andreas Dilger <andreas.dilger@intel.com> Signed-off-by:
Minh Diep <minh.diep@intel.com> Reviewed-on: https://review.whamcloud.com/30581 Tested-by: Jenkins Tested-by:
Maloo <hpdd-maloo@intel.com> Reviewed-by:
John L. Hammond <john.hammond@intel.com>
genops.c 70.63 KiB
/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 only,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License version 2 for more details (a copy is included
* in the LICENSE file that accompanied this code).
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
* http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
/*
* Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
* Copyright (c) 2011, 2016, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
* lustre/obdclass/genops.c
*
* These are the only exported functions, they provide some generic
* infrastructure for managing object devices
*/
#define DEBUG_SUBSYSTEM S_CLASS
#include <linux/pid_namespace.h>
#include <linux/kthread.h>
#include <obd_class.h>
#include <lustre_log.h>
#include <lprocfs_status.h>
#include <lustre_disk.h>
#include <lustre_kernelcomm.h>
static DEFINE_SPINLOCK(obd_types_lock);
static LIST_HEAD(obd_types);
DEFINE_RWLOCK(obd_dev_lock);
static struct obd_device *obd_devs[MAX_OBD_DEVICES];
static struct kmem_cache *obd_device_cachep;
struct kmem_cache *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
static struct kmem_cache *import_cachep;
static LIST_HEAD(obd_zombie_imports);
static LIST_HEAD(obd_zombie_exports);
static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
const char *status, int locks, int debug_level);
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
/*
* support functions: we could use inter-module communication, but this
* is more portable to other OS's
*/
static struct obd_device *obd_device_alloc(void)
{
struct obd_device *obd;
OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
if (obd != NULL) {
obd->obd_magic = OBD_DEVICE_MAGIC;
}
return obd;
}
static void obd_device_free(struct obd_device *obd)
{
LASSERT(obd != NULL);
LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
obd, obd->obd_magic, OBD_DEVICE_MAGIC);
if (obd->obd_namespace != NULL) {
CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
obd, obd->obd_namespace, obd->obd_force);
LBUG();
}
lu_ref_fini(&obd->obd_reference);
OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
}
struct obd_type *class_search_type(const char *name)
{
struct list_head *tmp;
struct obd_type *type;
spin_lock(&obd_types_lock);
list_for_each(tmp, &obd_types) {
type = list_entry(tmp, struct obd_type, typ_chain);
if (strcmp(type->typ_name, name) == 0) {
spin_unlock(&obd_types_lock);
return type;
}
}
spin_unlock(&obd_types_lock);
return NULL;
}
EXPORT_SYMBOL(class_search_type);
struct obd_type *class_get_type(const char *name)
{
struct obd_type *type = class_search_type(name);
#ifdef HAVE_MODULE_LOADING_SUPPORT
if (!type) {
const char *modname = name;
if (strcmp(modname, "obdfilter") == 0)
modname = "ofd";
if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
modname = LUSTRE_OSP_NAME;
if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
modname = LUSTRE_MDT_NAME;
if (!request_module("%s", modname)) {
CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
type = class_search_type(name);
} else {
LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
modname);
}
}
#endif
if (type) {
spin_lock(&type->obd_type_lock);
type->typ_refcnt++;
try_module_get(type->typ_dt_ops->o_owner);
spin_unlock(&type->obd_type_lock);
}
return type;
}
void class_put_type(struct obd_type *type)
{
LASSERT(type);
spin_lock(&type->obd_type_lock);
type->typ_refcnt--;
module_put(type->typ_dt_ops->o_owner);
spin_unlock(&type->obd_type_lock);
}
#define CLASS_MAX_NAME 1024
int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
bool enable_proc, struct lprocfs_vars *vars,
const char *name, struct lu_device_type *ldt)
{
struct obd_type *type;
int rc = 0;
ENTRY;
/* sanity check */
LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
if (class_search_type(name)) {
CDEBUG(D_IOCTL, "Type %s already registered\n", name);
RETURN(-EEXIST);
}
rc = -ENOMEM;
OBD_ALLOC(type, sizeof(*type));
if (type == NULL)
RETURN(rc);
OBD_ALLOC_PTR(type->typ_dt_ops);
OBD_ALLOC_PTR(type->typ_md_ops);
OBD_ALLOC(type->typ_name, strlen(name) + 1);
if (type->typ_dt_ops == NULL ||
type->typ_md_ops == NULL ||
type->typ_name == NULL)
GOTO (failed, rc);
*(type->typ_dt_ops) = *dt_ops;
/* md_ops is optional */
if (md_ops)
*(type->typ_md_ops) = *md_ops;
strcpy(type->typ_name, name);
spin_lock_init(&type->obd_type_lock);
#ifdef CONFIG_PROC_FS
if (enable_proc) {
type->typ_procroot = lprocfs_register(type->typ_name,
proc_lustre_root,
vars, type);
if (IS_ERR(type->typ_procroot)) {
rc = PTR_ERR(type->typ_procroot);
type->typ_procroot = NULL;
GOTO(failed, rc);
}
}
#endif
if (ldt != NULL) {
type->typ_lu = ldt;
rc = lu_device_type_init(ldt);
if (rc != 0)
GOTO (failed, rc);
}
spin_lock(&obd_types_lock);
list_add(&type->typ_chain, &obd_types);
spin_unlock(&obd_types_lock);
RETURN (0);
failed:
if (type->typ_name != NULL) {
#ifdef CONFIG_PROC_FS
if (type->typ_procroot != NULL)
remove_proc_subtree(type->typ_name, proc_lustre_root);
#endif
OBD_FREE(type->typ_name, strlen(name) + 1);
}
if (type->typ_md_ops != NULL)
OBD_FREE_PTR(type->typ_md_ops);
if (type->typ_dt_ops != NULL)
OBD_FREE_PTR(type->typ_dt_ops);
OBD_FREE(type, sizeof(*type));
RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
int class_unregister_type(const char *name)
{
struct obd_type *type = class_search_type(name);
ENTRY;
if (!type) {
CERROR("unknown obd type\n");
RETURN(-EINVAL);
}
if (type->typ_refcnt) {
CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
/* This is a bad situation, let's make the best of it */
/* Remove ops, but leave the name for debugging */
OBD_FREE_PTR(type->typ_dt_ops);
OBD_FREE_PTR(type->typ_md_ops);
RETURN(-EBUSY);
}
/* we do not use type->typ_procroot as for compatibility purposes
* other modules can share names (i.e. lod can use lov entry). so
* we can't reference pointer as it can get invalided when another
* module removes the entry */
#ifdef CONFIG_PROC_FS
if (type->typ_procroot != NULL)
remove_proc_subtree(type->typ_name, proc_lustre_root);
if (type->typ_procsym != NULL)
lprocfs_remove(&type->typ_procsym);
#endif
if (type->typ_lu)
lu_device_type_fini(type->typ_lu);
spin_lock(&obd_types_lock);
list_del(&type->typ_chain);
spin_unlock(&obd_types_lock);
OBD_FREE(type->typ_name, strlen(name) + 1);
if (type->typ_dt_ops != NULL)
OBD_FREE_PTR(type->typ_dt_ops);
if (type->typ_md_ops != NULL)
OBD_FREE_PTR(type->typ_md_ops);
OBD_FREE(type, sizeof(*type));
RETURN(0);
} /* class_unregister_type */
EXPORT_SYMBOL(class_unregister_type);
/**
* Create a new obd device.
*
* Allocate the new obd_device and initialize it.
*
* \param[in] type_name obd device type string.
* \param[in] name obd device name.
* \param[in] uuid obd device UUID
*
* \retval newdev pointer to created obd_device
* \retval ERR_PTR(errno) on error
*/
struct obd_device *class_newdev(const char *type_name, const char *name,
const char *uuid)
{
struct obd_device *newdev;
struct obd_type *type = NULL;
ENTRY;
if (strlen(name) >= MAX_OBD_NAME) {
CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
RETURN(ERR_PTR(-EINVAL));
}
type = class_get_type(type_name);
if (type == NULL){
CERROR("OBD: unknown type: %s\n", type_name);
RETURN(ERR_PTR(-ENODEV));
}
newdev = obd_device_alloc();
if (newdev == NULL) {
class_put_type(type);
RETURN(ERR_PTR(-ENOMEM));
}
LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
newdev->obd_type = type;
newdev->obd_minor = -1;
rwlock_init(&newdev->obd_pool_lock);
newdev->obd_pool_limit = 0;
newdev->obd_pool_slv = 0;
INIT_LIST_HEAD(&newdev->obd_exports);
INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
INIT_LIST_HEAD(&newdev->obd_delayed_exports);
INIT_LIST_HEAD(&newdev->obd_exports_timed);
INIT_LIST_HEAD(&newdev->obd_nid_stats);
spin_lock_init(&newdev->obd_nid_lock);
spin_lock_init(&newdev->obd_dev_lock);
mutex_init(&newdev->obd_dev_mutex);
spin_lock_init(&newdev->obd_osfs_lock);
/* newdev->obd_osfs_age must be set to a value in the distant
* past to guarantee a fresh statfs is fetched on mount. */
newdev->obd_osfs_age = cfs_time_shift_64(-1000);
/* XXX belongs in setup not attach */
init_rwsem(&newdev->obd_observer_link_sem);
/* recovery data */
init_timer(&newdev->obd_recovery_timer);
spin_lock_init(&newdev->obd_recovery_task_lock);
init_waitqueue_head(&newdev->obd_next_transno_waitq);
init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
INIT_LIST_HEAD(&newdev->obd_final_req_queue);
INIT_LIST_HEAD(&newdev->obd_evict_list);
INIT_LIST_HEAD(&newdev->obd_lwp_list);
llog_group_init(&newdev->obd_olg);
/* Detach drops this */
atomic_set(&newdev->obd_refcount, 1);
lu_ref_init(&newdev->obd_reference);
lu_ref_add(&newdev->obd_reference, "newdev", newdev);
newdev->obd_conn_inprogress = 0;
strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
newdev->obd_name, newdev);
return newdev;
}
/**
* Free obd device.
*
* \param[in] obd obd_device to be freed
*
* \retval none
*/
void class_free_dev(struct obd_device *obd)
{
struct obd_type *obd_type = obd->obd_type;
LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
"!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
"obd %p != obd_devs[%d] %p\n",
obd, obd->obd_minor, obd_devs[obd->obd_minor]);
LASSERTF(atomic_read(&obd->obd_refcount) == 0,
"obd_refcount should be 0, not %d\n",
atomic_read(&obd->obd_refcount));
LASSERT(obd_type != NULL);
CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
obd->obd_name, obd->obd_type->typ_name);
CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
obd->obd_name, obd->obd_uuid.uuid);
if (obd->obd_stopping) {
int err;
/* If we're not stopping, we were never set up */
err = obd_cleanup(obd);
if (err)
CERROR("Cleanup %s returned %d\n",
obd->obd_name, err);
}
obd_device_free(obd);
class_put_type(obd_type);
}
/**
* Unregister obd device.
*
* Free slot in obd_dev[] used by \a obd.
*
* \param[in] new_obd obd_device to be unregistered
*
* \retval none
*/
void class_unregister_device(struct obd_device *obd)
{
write_lock(&obd_dev_lock);
if (obd->obd_minor >= 0) {
LASSERT(obd_devs[obd->obd_minor] == obd);
obd_devs[obd->obd_minor] = NULL;
obd->obd_minor = -1;
}
write_unlock(&obd_dev_lock);
}
/**
* Register obd device.
*
* Find free slot in obd_devs[], fills it with \a new_obd.
*
* \param[in] new_obd obd_device to be registered
*
* \retval 0 success
* \retval -EEXIST device with this name is registered
* \retval -EOVERFLOW obd_devs[] is full
*/
int class_register_device(struct obd_device *new_obd)
{
int ret = 0;
int i;
int new_obd_minor = 0;
bool minor_assign = false;
bool retried = false;
again:
write_lock(&obd_dev_lock);
for (i = 0; i < class_devno_max(); i++) {
struct obd_device *obd = class_num2obd(i);
if (obd != NULL &&
(strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
if (!retried) {
write_unlock(&obd_dev_lock);
/* the obd_device could be waited to be
* destroyed by the "obd_zombie_impexp_thread".
*/
obd_zombie_barrier();
retried = true;
goto again;
}
CERROR("%s: already exists, won't add\n",
obd->obd_name);
/* in case we found a free slot before duplicate */
minor_assign = false;
ret = -EEXIST;
break;
}
if (!minor_assign && obd == NULL) {
new_obd_minor = i;
minor_assign = true;
}
}
if (minor_assign) {
new_obd->obd_minor = new_obd_minor;
LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
"%p\n", new_obd_minor, obd_devs[new_obd_minor]);
obd_devs[new_obd_minor] = new_obd;
} else {
if (ret == 0) {
ret = -EOVERFLOW;
CERROR("%s: all %u/%u devices used, increase "
"MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
i, class_devno_max(), ret);
}
}
write_unlock(&obd_dev_lock);
RETURN(ret);
}
static int class_name2dev_nolock(const char *name)
{
int i;
if (!name)
return -1;
for (i = 0; i < class_devno_max(); i++) {
struct obd_device *obd = class_num2obd(i);
if (obd && strcmp(name, obd->obd_name) == 0) {
/* Make sure we finished attaching before we give
out any references */
LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
if (obd->obd_attached) {
return i;
}
break;
}
}
return -1;
}
int class_name2dev(const char *name)
{
int i;
if (!name)
return -1;
read_lock(&obd_dev_lock);
i = class_name2dev_nolock(name);
read_unlock(&obd_dev_lock);
return i;
}
EXPORT_SYMBOL(class_name2dev);
struct obd_device *class_name2obd(const char *name)
{
int dev = class_name2dev(name);
if (dev < 0 || dev > class_devno_max())
return NULL;
return class_num2obd(dev);
}
EXPORT_SYMBOL(class_name2obd);
int class_uuid2dev_nolock(struct obd_uuid *uuid)
{
int i;
for (i = 0; i < class_devno_max(); i++) {
struct obd_device *obd = class_num2obd(i);
if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
return i;
}
}
return -1;
}
int class_uuid2dev(struct obd_uuid *uuid)
{
int i;
read_lock(&obd_dev_lock);
i = class_uuid2dev_nolock(uuid);
read_unlock(&obd_dev_lock);
return i;
}
EXPORT_SYMBOL(class_uuid2dev);
struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
{
int dev = class_uuid2dev(uuid);
if (dev < 0)
return NULL;
return class_num2obd(dev);
}
EXPORT_SYMBOL(class_uuid2obd);
/**
* Get obd device from ::obd_devs[]
*
* \param num [in] array index
*
* \retval NULL if ::obd_devs[\a num] does not contains an obd device
* otherwise return the obd device there.
*/
struct obd_device *class_num2obd(int num)
{
struct obd_device *obd = NULL;
if (num < class_devno_max()) {
obd = obd_devs[num];
if (obd == NULL)
return NULL;
LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
"%p obd_magic %08x != %08x\n",
obd, obd->obd_magic, OBD_DEVICE_MAGIC);
LASSERTF(obd->obd_minor == num,
"%p obd_minor %0d != %0d\n",
obd, obd->obd_minor, num);
}
return obd;
}
/**
* Find obd in obd_dev[] by name or uuid.
*
* Increment obd's refcount if found.
*
* \param[in] str obd name or uuid
*
* \retval NULL if not found
* \retval target pointer to found obd_device
*/
struct obd_device *class_dev_by_str(const char *str)
{
struct obd_device *target = NULL;
struct obd_uuid tgtuuid;
int rc;
obd_str2uuid(&tgtuuid, str);
read_lock(&obd_dev_lock);
rc = class_uuid2dev_nolock(&tgtuuid);
if (rc < 0)
rc = class_name2dev_nolock(str);
if (rc >= 0)
target = class_num2obd(rc);
if (target != NULL)
class_incref(target, "find", current);
read_unlock(&obd_dev_lock);
RETURN(target);
}
EXPORT_SYMBOL(class_dev_by_str);
/**
* Get obd devices count. Device in any
* state are counted
* \retval obd device count
*/
int get_devices_count(void)
{
int index, max_index = class_devno_max(), dev_count = 0;
read_lock(&obd_dev_lock);
for (index = 0; index <= max_index; index++) {
struct obd_device *obd = class_num2obd(index);
if (obd != NULL)
dev_count++;
}
read_unlock(&obd_dev_lock);
return dev_count;
}
EXPORT_SYMBOL(get_devices_count);
void class_obd_list(void)
{
char *status;
int i;
read_lock(&obd_dev_lock);
for (i = 0; i < class_devno_max(); i++) {
struct obd_device *obd = class_num2obd(i);
if (obd == NULL)
continue;
if (obd->obd_stopping)
status = "ST";
else if (obd->obd_set_up)
status = "UP";
else if (obd->obd_attached)
status = "AT";
else
status = "--";
LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
i, status, obd->obd_type->typ_name,
obd->obd_name, obd->obd_uuid.uuid,
atomic_read(&obd->obd_refcount));
}
read_unlock(&obd_dev_lock);
return;
}
/* Search for a client OBD connected to tgt_uuid. If grp_uuid is
specified, then only the client with that uuid is returned,
otherwise any client connected to the tgt is returned. */
struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
const char * typ_name,
struct obd_uuid *grp_uuid)
{
int i;
read_lock(&obd_dev_lock);
for (i = 0; i < class_devno_max(); i++) {
struct obd_device *obd = class_num2obd(i);
if (obd == NULL)
continue;
if ((strncmp(obd->obd_type->typ_name, typ_name,
strlen(typ_name)) == 0)) {
if (obd_uuid_equals(tgt_uuid,
&obd->u.cli.cl_target_uuid) &&
((grp_uuid)? obd_uuid_equals(grp_uuid,
&obd->obd_uuid) : 1)) {
read_unlock(&obd_dev_lock);
return obd;
}
}
}
read_unlock(&obd_dev_lock);
return NULL;
}
EXPORT_SYMBOL(class_find_client_obd);
/* Iterate the obd_device list looking devices have grp_uuid. Start
searching at *next, and if a device is found, the next index to look
at is saved in *next. If next is NULL, then the first matching device
will always be returned. */
struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
{
int i;
if (next == NULL)
i = 0;
else if (*next >= 0 && *next < class_devno_max())
i = *next;
else
return NULL;
read_lock(&obd_dev_lock);
for (; i < class_devno_max(); i++) {
struct obd_device *obd = class_num2obd(i);
if (obd == NULL)
continue;
if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
if (next != NULL)
*next = i+1;
read_unlock(&obd_dev_lock);
return obd;
}
}
read_unlock(&obd_dev_lock);
return NULL;
}
EXPORT_SYMBOL(class_devices_in_group);
/**
* to notify sptlrpc log for \a fsname has changed, let every relevant OBD
* adjust sptlrpc settings accordingly.
*/
int class_notify_sptlrpc_conf(const char *fsname, int namelen)
{
struct obd_device *obd;
const char *type;
int i, rc = 0, rc2;
LASSERT(namelen > 0);
read_lock(&obd_dev_lock);
for (i = 0; i < class_devno_max(); i++) {
obd = class_num2obd(i);
if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
continue;
/* only notify mdc, osc, osp, lwp, mdt, ost
* because only these have a -sptlrpc llog */
type = obd->obd_type->typ_name;
if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
strcmp(type, LUSTRE_OSC_NAME) != 0 &&
strcmp(type, LUSTRE_OSP_NAME) != 0 &&
strcmp(type, LUSTRE_LWP_NAME) != 0 &&
strcmp(type, LUSTRE_MDT_NAME) != 0 &&
strcmp(type, LUSTRE_OST_NAME) != 0)
continue;
if (strncmp(obd->obd_name, fsname, namelen))
continue;
class_incref(obd, __FUNCTION__, obd);
read_unlock(&obd_dev_lock);
rc2 = obd_set_info_async(NULL, obd->obd_self_export,
sizeof(KEY_SPTLRPC_CONF),
KEY_SPTLRPC_CONF, 0, NULL, NULL);
rc = rc ? rc : rc2;
class_decref(obd, __FUNCTION__, obd);
read_lock(&obd_dev_lock);
}
read_unlock(&obd_dev_lock);
return rc;
}
EXPORT_SYMBOL(class_notify_sptlrpc_conf);
void obd_cleanup_caches(void)
{
ENTRY;
if (obd_device_cachep) {
kmem_cache_destroy(obd_device_cachep);
obd_device_cachep = NULL;
}
if (obdo_cachep) {
kmem_cache_destroy(obdo_cachep);
obdo_cachep = NULL;
}
if (import_cachep) {
kmem_cache_destroy(import_cachep);
import_cachep = NULL;
}
EXIT;
}
int obd_init_caches(void)
{
int rc;
ENTRY;
LASSERT(obd_device_cachep == NULL);
obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
sizeof(struct obd_device),
0, 0, NULL);
if (!obd_device_cachep)
GOTO(out, rc = -ENOMEM);
LASSERT(obdo_cachep == NULL);
obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
0, 0, NULL);
if (!obdo_cachep)
GOTO(out, rc = -ENOMEM);
LASSERT(import_cachep == NULL);
import_cachep = kmem_cache_create("ll_import_cache",
sizeof(struct obd_import),
0, 0, NULL);
if (!import_cachep)
GOTO(out, rc = -ENOMEM);
RETURN(0);
out:
obd_cleanup_caches();
RETURN(rc);
}
/* map connection to client */
struct obd_export *class_conn2export(struct lustre_handle *conn)
{
struct obd_export *export;
ENTRY;
if (!conn) {
CDEBUG(D_CACHE, "looking for null handle\n");
RETURN(NULL);
}
if (conn->cookie == -1) { /* this means assign a new connection */
CDEBUG(D_CACHE, "want a new connection\n");
RETURN(NULL);
}
CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
export = class_handle2object(conn->cookie, NULL);
RETURN(export);
}
EXPORT_SYMBOL(class_conn2export);
struct obd_device *class_exp2obd(struct obd_export *exp)
{
if (exp)
return exp->exp_obd;
return NULL;
}
EXPORT_SYMBOL(class_exp2obd);
struct obd_device *class_conn2obd(struct lustre_handle *conn)
{
struct obd_export *export;
export = class_conn2export(conn);
if (export) {
struct obd_device *obd = export->exp_obd;
class_export_put(export);
return obd;
}
return NULL;
}
struct obd_import *class_exp2cliimp(struct obd_export *exp)
{
struct obd_device *obd = exp->exp_obd;
if (obd == NULL)
return NULL;
return obd->u.cli.cl_import;
}
EXPORT_SYMBOL(class_exp2cliimp);
struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
{
struct obd_device *obd = class_conn2obd(conn);
if (obd == NULL)
return NULL;
return obd->u.cli.cl_import;
}
/* Export management functions */
static void class_export_destroy(struct obd_export *exp)
{
struct obd_device *obd = exp->exp_obd;
ENTRY;
LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
LASSERT(obd != NULL);
CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
exp->exp_client_uuid.uuid, obd->obd_name);
/* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
if (exp->exp_connection)
ptlrpc_put_connection_superhack(exp->exp_connection);
LASSERT(list_empty(&exp->exp_outstanding_replies));
LASSERT(list_empty(&exp->exp_uncommitted_replies));
LASSERT(list_empty(&exp->exp_req_replay_queue));
LASSERT(list_empty(&exp->exp_hp_rpcs));
obd_destroy_export(exp);
/* self export doesn't hold a reference to an obd, although it
* exists until freeing of the obd */
if (exp != obd->obd_self_export)
class_decref(obd, "export", exp);
OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
EXIT;
}
static void export_handle_addref(void *export)
{
class_export_get(export);
}
static struct portals_handle_ops export_handle_ops = {
.hop_addref = export_handle_addref,
.hop_free = NULL,
};
struct obd_export *class_export_get(struct obd_export *exp)
{
atomic_inc(&exp->exp_refcount);
CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
atomic_read(&exp->exp_refcount));
return exp;
}
EXPORT_SYMBOL(class_export_get);
void class_export_put(struct obd_export *exp)
{
LASSERT(exp != NULL);
LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
atomic_read(&exp->exp_refcount) - 1);
if (atomic_dec_and_test(&exp->exp_refcount)) {
struct obd_device *obd = exp->exp_obd;
CDEBUG(D_IOCTL, "final put %p/%s\n",
exp, exp->exp_client_uuid.uuid);
/* release nid stat refererence */
lprocfs_exp_cleanup(exp);
if (exp == obd->obd_self_export) {
/* self export should be destroyed without
* zombie thread as it doesn't hold a
* reference to obd and doesn't hold any
* resources */
class_export_destroy(exp);
/* self export is destroyed, no class
* references exist and it is safe to free
* obd */
class_free_dev(obd);
} else {
LASSERT(!list_empty(&exp->exp_obd_chain));
obd_zombie_export_add(exp);
}
}
}
EXPORT_SYMBOL(class_export_put);
/* Creates a new export, adds it to the hash table, and returns a
* pointer to it. The refcount is 2: one for the hash reference, and
* one for the pointer returned by this function. */
struct obd_export *__class_new_export(struct obd_device *obd,
struct obd_uuid *cluuid, bool is_self)
{
struct obd_export *export;
struct cfs_hash *hash = NULL;
int rc = 0;
ENTRY;
OBD_ALLOC_PTR(export);
if (!export)
return ERR_PTR(-ENOMEM);
export->exp_conn_cnt = 0;
export->exp_lock_hash = NULL;
export->exp_flock_hash = NULL;
/* 2 = class_handle_hash + last */
atomic_set(&export->exp_refcount, 2);
atomic_set(&export->exp_rpc_count, 0);
atomic_set(&export->exp_cb_count, 0);
atomic_set(&export->exp_locks_count, 0);
#if LUSTRE_TRACKS_LOCK_EXP_REFS
INIT_LIST_HEAD(&export->exp_locks_list);
spin_lock_init(&export->exp_locks_list_guard);
#endif
atomic_set(&export->exp_replay_count, 0);
export->exp_obd = obd;
INIT_LIST_HEAD(&export->exp_outstanding_replies);
spin_lock_init(&export->exp_uncommitted_replies_lock);
INIT_LIST_HEAD(&export->exp_uncommitted_replies);
INIT_LIST_HEAD(&export->exp_req_replay_queue);
INIT_LIST_HEAD(&export->exp_handle.h_link);
INIT_LIST_HEAD(&export->exp_hp_rpcs);
INIT_LIST_HEAD(&export->exp_reg_rpcs);
class_handle_hash(&export->exp_handle, &export_handle_ops);
export->exp_last_request_time = cfs_time_current_sec();
spin_lock_init(&export->exp_lock);
spin_lock_init(&export->exp_rpc_lock);
INIT_HLIST_NODE(&export->exp_uuid_hash);
INIT_HLIST_NODE(&export->exp_nid_hash);
INIT_HLIST_NODE(&export->exp_gen_hash);
spin_lock_init(&export->exp_bl_list_lock);
INIT_LIST_HEAD(&export->exp_bl_list);
INIT_LIST_HEAD(&export->exp_stale_list);
export->exp_sp_peer = LUSTRE_SP_ANY;
export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
export->exp_client_uuid = *cluuid;
obd_init_export(export);
if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
spin_lock(&obd->obd_dev_lock);
/* shouldn't happen, but might race */
if (obd->obd_stopping)
GOTO(exit_unlock, rc = -ENODEV);
hash = cfs_hash_getref(obd->obd_uuid_hash);
if (hash == NULL)
GOTO(exit_unlock, rc = -ENODEV);
spin_unlock(&obd->obd_dev_lock);
rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
if (rc != 0) {
LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
obd->obd_name, cluuid->uuid, rc);
GOTO(exit_err, rc = -EALREADY);
}
}
at_init(&export->exp_bl_lock_at, obd_timeout, 0);
spin_lock(&obd->obd_dev_lock);
if (obd->obd_stopping) {
if (hash)
cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
GOTO(exit_unlock, rc = -ESHUTDOWN);
}
if (!is_self) {
class_incref(obd, "export", export);
list_add_tail(&export->exp_obd_chain_timed,
&obd->obd_exports_timed);
list_add(&export->exp_obd_chain, &obd->obd_exports);
obd->obd_num_exports++;
} else {
INIT_LIST_HEAD(&export->exp_obd_chain_timed);
INIT_LIST_HEAD(&export->exp_obd_chain);
}
spin_unlock(&obd->obd_dev_lock);
if (hash)
cfs_hash_putref(hash);
RETURN(export);
exit_unlock:
spin_unlock(&obd->obd_dev_lock);
exit_err:
if (hash)
cfs_hash_putref(hash);
class_handle_unhash(&export->exp_handle);
LASSERT(hlist_unhashed(&export->exp_uuid_hash));
obd_destroy_export(export);
OBD_FREE_PTR(export);
return ERR_PTR(rc);
}
struct obd_export *class_new_export(struct obd_device *obd,
struct obd_uuid *uuid)
{
return __class_new_export(obd, uuid, false);
}
EXPORT_SYMBOL(class_new_export);
struct obd_export *class_new_export_self(struct obd_device *obd,
struct obd_uuid *uuid)
{
return __class_new_export(obd, uuid, true);
}
void class_unlink_export(struct obd_export *exp)
{
class_handle_unhash(&exp->exp_handle);
if (exp->exp_obd->obd_self_export == exp) {
class_export_put(exp);
return;
}
spin_lock(&exp->exp_obd->obd_dev_lock);
/* delete an uuid-export hashitem from hashtables */
if (!hlist_unhashed(&exp->exp_uuid_hash))
cfs_hash_del(exp->exp_obd->obd_uuid_hash,
&exp->exp_client_uuid,
&exp->exp_uuid_hash);
#ifdef HAVE_SERVER_SUPPORT
if (!hlist_unhashed(&exp->exp_gen_hash)) {
struct tg_export_data *ted = &exp->exp_target_data;
struct cfs_hash *hash;
/* Because obd_gen_hash will not be released until
* class_cleanup(), so hash should never be NULL here */
hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
LASSERT(hash != NULL);
cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
&exp->exp_gen_hash);
cfs_hash_putref(hash);
}
#endif /* HAVE_SERVER_SUPPORT */
list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
list_del_init(&exp->exp_obd_chain_timed);
exp->exp_obd->obd_num_exports--;
spin_unlock(&exp->exp_obd->obd_dev_lock);
atomic_inc(&obd_stale_export_num);
/* A reference is kept by obd_stale_exports list */
obd_stale_export_put(exp);
}
EXPORT_SYMBOL(class_unlink_export);
/* Import management functions */
static void class_import_destroy(struct obd_import *imp)
{
ENTRY;
CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
imp->imp_obd->obd_name);
LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
ptlrpc_put_connection_superhack(imp->imp_connection);
while (!list_empty(&imp->imp_conn_list)) {
struct obd_import_conn *imp_conn;
imp_conn = list_entry(imp->imp_conn_list.next,
struct obd_import_conn, oic_item);
list_del_init(&imp_conn->oic_item);
ptlrpc_put_connection_superhack(imp_conn->oic_conn);
OBD_FREE(imp_conn, sizeof(*imp_conn));
}
LASSERT(imp->imp_sec == NULL);
class_decref(imp->imp_obd, "import", imp);
OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
EXIT;
}
static void import_handle_addref(void *import)
{
class_import_get(import);
}
static struct portals_handle_ops import_handle_ops = {
.hop_addref = import_handle_addref,
.hop_free = NULL,
};
struct obd_import *class_import_get(struct obd_import *import)
{
atomic_inc(&import->imp_refcount);
CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
atomic_read(&import->imp_refcount),
import->imp_obd->obd_name);
return import;
}
EXPORT_SYMBOL(class_import_get);
void class_import_put(struct obd_import *imp)
{
ENTRY;
LASSERT(list_empty(&imp->imp_zombie_chain));
LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
atomic_read(&imp->imp_refcount) - 1,
imp->imp_obd->obd_name);
if (atomic_dec_and_test(&imp->imp_refcount)) {
CDEBUG(D_INFO, "final put import %p\n", imp);
obd_zombie_import_add(imp);
}
/* catch possible import put race */
LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
EXIT;
}
EXPORT_SYMBOL(class_import_put);
static void init_imp_at(struct imp_at *at) {
int i;
at_init(&at->iat_net_latency, 0, 0);
for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
/* max service estimates are tracked on the server side, so
don't use the AT history here, just use the last reported
val. (But keep hist for proc histogram, worst_ever) */
at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
AT_FLG_NOHIST);
}
}
struct obd_import *class_new_import(struct obd_device *obd)
{
struct obd_import *imp;
struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
OBD_ALLOC(imp, sizeof(*imp));
if (imp == NULL)
return NULL;
INIT_LIST_HEAD(&imp->imp_pinger_chain);
INIT_LIST_HEAD(&imp->imp_zombie_chain);
INIT_LIST_HEAD(&imp->imp_replay_list);
INIT_LIST_HEAD(&imp->imp_sending_list);
INIT_LIST_HEAD(&imp->imp_delayed_list);
INIT_LIST_HEAD(&imp->imp_committed_list);
INIT_LIST_HEAD(&imp->imp_unreplied_list);
imp->imp_known_replied_xid = 0;
imp->imp_replay_cursor = &imp->imp_committed_list;
spin_lock_init(&imp->imp_lock);
imp->imp_last_success_conn = 0;
imp->imp_state = LUSTRE_IMP_NEW;
imp->imp_obd = class_incref(obd, "import", imp);
mutex_init(&imp->imp_sec_mutex);
init_waitqueue_head(&imp->imp_recovery_waitq);
if (curr_pid_ns->child_reaper)
imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
else
imp->imp_sec_refpid = 1;
atomic_set(&imp->imp_refcount, 2);
atomic_set(&imp->imp_unregistering, 0);
atomic_set(&imp->imp_inflight, 0);
atomic_set(&imp->imp_replay_inflight, 0);
atomic_set(&imp->imp_inval_count, 0);
INIT_LIST_HEAD(&imp->imp_conn_list);
INIT_LIST_HEAD(&imp->imp_handle.h_link);
class_handle_hash(&imp->imp_handle, &import_handle_ops);
init_imp_at(&imp->imp_at);
/* the default magic is V2, will be used in connect RPC, and
* then adjusted according to the flags in request/reply. */
imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
return imp;
}
EXPORT_SYMBOL(class_new_import);
void class_destroy_import(struct obd_import *import)
{
LASSERT(import != NULL);
LASSERT(import != LP_POISON);
class_handle_unhash(&import->imp_handle);
spin_lock(&import->imp_lock);
import->imp_generation++;
spin_unlock(&import->imp_lock);
class_import_put(import);
}
EXPORT_SYMBOL(class_destroy_import);
#if LUSTRE_TRACKS_LOCK_EXP_REFS
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
spin_lock(&exp->exp_locks_list_guard);
LASSERT(lock->l_exp_refs_nr >= 0);
if (lock->l_exp_refs_target != NULL &&
lock->l_exp_refs_target != exp) {
LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
exp, lock, lock->l_exp_refs_target);
}
if ((lock->l_exp_refs_nr ++) == 0) {
list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
lock->l_exp_refs_target = exp;
}
CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
lock, exp, lock->l_exp_refs_nr);
spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
spin_lock(&exp->exp_locks_list_guard);
LASSERT(lock->l_exp_refs_nr > 0);
if (lock->l_exp_refs_target != exp) {
LCONSOLE_WARN("lock %p, "
"mismatching export pointers: %p, %p\n",
lock, lock->l_exp_refs_target, exp);
}
if (-- lock->l_exp_refs_nr == 0) {
list_del_init(&lock->l_exp_refs_link);
lock->l_exp_refs_target = NULL;
}
CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
lock, exp, lock->l_exp_refs_nr);
spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif
/* A connection defines an export context in which preallocation can
be managed. This releases the export pointer reference, and returns
the export handle, so the export refcount is 1 when this function
returns. */
int class_connect(struct lustre_handle *conn, struct obd_device *obd,
struct obd_uuid *cluuid)
{
struct obd_export *export;
LASSERT(conn != NULL);
LASSERT(obd != NULL);
LASSERT(cluuid != NULL);
ENTRY;
export = class_new_export(obd, cluuid);
if (IS_ERR(export))
RETURN(PTR_ERR(export));
conn->cookie = export->exp_handle.h_cookie;
class_export_put(export);
CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
cluuid->uuid, conn->cookie);
RETURN(0);
}
EXPORT_SYMBOL(class_connect);
/* if export is involved in recovery then clean up related things */
static void class_export_recovery_cleanup(struct obd_export *exp)
{
struct obd_device *obd = exp->exp_obd;
spin_lock(&obd->obd_recovery_task_lock);
if (obd->obd_recovering) {
if (exp->exp_in_recovery) {
spin_lock(&exp->exp_lock);
exp->exp_in_recovery = 0;
spin_unlock(&exp->exp_lock);
LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
atomic_dec(&obd->obd_connected_clients);
}
/* if called during recovery then should update
* obd_stale_clients counter,
* lightweight exports are not counted */
if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
exp->exp_obd->obd_stale_clients++;
}
spin_unlock(&obd->obd_recovery_task_lock);
spin_lock(&exp->exp_lock);
/** Cleanup req replay fields */
if (exp->exp_req_replay_needed) {
exp->exp_req_replay_needed = 0;
LASSERT(atomic_read(&obd->obd_req_replay_clients));
atomic_dec(&obd->obd_req_replay_clients);
}
/** Cleanup lock replay data */
if (exp->exp_lock_replay_needed) {
exp->exp_lock_replay_needed = 0;
LASSERT(atomic_read(&obd->obd_lock_replay_clients));
atomic_dec(&obd->obd_lock_replay_clients);
}
spin_unlock(&exp->exp_lock);
}
/* This function removes 1-3 references from the export:
* 1 - for export pointer passed
* and if disconnect really need
* 2 - removing from hash
* 3 - in client_unlink_export
* The export pointer passed to this function can destroyed */
int class_disconnect(struct obd_export *export)
{
int already_disconnected;
ENTRY;
if (export == NULL) {
CWARN("attempting to free NULL export %p\n", export);
RETURN(-EINVAL);
}
spin_lock(&export->exp_lock);
already_disconnected = export->exp_disconnected;
export->exp_disconnected = 1;
/* We hold references of export for uuid hash
* and nid_hash and export link at least. So
* it is safe to call cfs_hash_del in there. */
if (!hlist_unhashed(&export->exp_nid_hash))
cfs_hash_del(export->exp_obd->obd_nid_hash,
&export->exp_connection->c_peer.nid,
&export->exp_nid_hash);
spin_unlock(&export->exp_lock);
/* class_cleanup(), abort_recovery(), and class_fail_export()
* all end up in here, and if any of them race we shouldn't
* call extra class_export_puts(). */
if (already_disconnected) {
LASSERT(hlist_unhashed(&export->exp_nid_hash));
GOTO(no_disconn, already_disconnected);
}
CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
export->exp_handle.h_cookie);
class_export_recovery_cleanup(export);
class_unlink_export(export);
no_disconn:
class_export_put(export);
RETURN(0);
}
EXPORT_SYMBOL(class_disconnect);
/* Return non-zero for a fully connected export */
int class_connected_export(struct obd_export *exp)
{
int connected = 0;
if (exp) {
spin_lock(&exp->exp_lock);
connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
spin_unlock(&exp->exp_lock);
}
return connected;
}
EXPORT_SYMBOL(class_connected_export);
static void class_disconnect_export_list(struct list_head *list,
enum obd_option flags)
{
int rc;
struct obd_export *exp;
ENTRY;
/* It's possible that an export may disconnect itself, but
* nothing else will be added to this list. */
while (!list_empty(list)) {
exp = list_entry(list->next, struct obd_export,
exp_obd_chain);
/* need for safe call CDEBUG after obd_disconnect */
class_export_get(exp);
spin_lock(&exp->exp_lock);
exp->exp_flags = flags;
spin_unlock(&exp->exp_lock);
if (obd_uuid_equals(&exp->exp_client_uuid,
&exp->exp_obd->obd_uuid)) {
CDEBUG(D_HA,
"exp %p export uuid == obd uuid, don't discon\n",
exp);
/* Need to delete this now so we don't end up pointing
* to work_list later when this export is cleaned up. */
list_del_init(&exp->exp_obd_chain);
class_export_put(exp);
continue;
}
class_export_get(exp);
CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
"last request at %ld\n",
exp->exp_obd->obd_name, obd_export_nid2str(exp),
exp, exp->exp_last_request_time);
/* release one export reference anyway */
rc = obd_disconnect(exp);
CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
obd_export_nid2str(exp), exp, rc);
class_export_put(exp);
}
EXIT;
}
void class_disconnect_exports(struct obd_device *obd)
{
struct list_head work_list;
ENTRY;
/* Move all of the exports from obd_exports to a work list, en masse. */
INIT_LIST_HEAD(&work_list);
spin_lock(&obd->obd_dev_lock);
list_splice_init(&obd->obd_exports, &work_list);
list_splice_init(&obd->obd_delayed_exports, &work_list);
spin_unlock(&obd->obd_dev_lock);
if (!list_empty(&work_list)) {
CDEBUG(D_HA, "OBD device %d (%p) has exports, "
"disconnecting them\n", obd->obd_minor, obd);
class_disconnect_export_list(&work_list,
exp_flags_from_obd(obd));
} else
CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
obd->obd_minor, obd);
EXIT;
}
EXPORT_SYMBOL(class_disconnect_exports);
/* Remove exports that have not completed recovery.
*/
void class_disconnect_stale_exports(struct obd_device *obd,
int (*test_export)(struct obd_export *))
{
struct list_head work_list;
struct obd_export *exp, *n;
int evicted = 0;
ENTRY;
INIT_LIST_HEAD(&work_list);
spin_lock(&obd->obd_dev_lock);
list_for_each_entry_safe(exp, n, &obd->obd_exports,
exp_obd_chain) {
/* don't count self-export as client */
if (obd_uuid_equals(&exp->exp_client_uuid,
&exp->exp_obd->obd_uuid))
continue;
/* don't evict clients which have no slot in last_rcvd
* (e.g. lightweight connection) */
if (exp->exp_target_data.ted_lr_idx == -1)
continue;
spin_lock(&exp->exp_lock);
if (exp->exp_failed || test_export(exp)) {
spin_unlock(&exp->exp_lock);
continue;
}
exp->exp_failed = 1;
spin_unlock(&exp->exp_lock);
list_move(&exp->exp_obd_chain, &work_list);
evicted++;
CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
obd->obd_name, exp->exp_client_uuid.uuid,
exp->exp_connection == NULL ? "<unknown>" :
libcfs_nid2str(exp->exp_connection->c_peer.nid));
print_export_data(exp, "EVICTING", 0, D_HA);
}
spin_unlock(&obd->obd_dev_lock);
if (evicted)
LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
obd->obd_name, evicted);
class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
OBD_OPT_ABORT_RECOV);
EXIT;
}
EXPORT_SYMBOL(class_disconnect_stale_exports);
void class_fail_export(struct obd_export *exp)
{
int rc, already_failed;
spin_lock(&exp->exp_lock);
already_failed = exp->exp_failed;
exp->exp_failed = 1;
spin_unlock(&exp->exp_lock);
if (already_failed) {
CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
exp, exp->exp_client_uuid.uuid);
return;
}
CDEBUG(D_HA, "disconnecting export %p/%s\n",
exp, exp->exp_client_uuid.uuid);
if (obd_dump_on_timeout)
libcfs_debug_dumplog();
/* need for safe call CDEBUG after obd_disconnect */
class_export_get(exp);
/* Most callers into obd_disconnect are removing their own reference
* (request, for example) in addition to the one from the hash table.
* We don't have such a reference here, so make one. */
class_export_get(exp);
rc = obd_disconnect(exp);
if (rc)
CERROR("disconnecting export %p failed: %d\n", exp, rc);
else
CDEBUG(D_HA, "disconnected export %p/%s\n",
exp, exp->exp_client_uuid.uuid);
class_export_put(exp);
}
EXPORT_SYMBOL(class_fail_export);
char *obd_export_nid2str(struct obd_export *exp)
{
if (exp->exp_connection != NULL)
return libcfs_nid2str(exp->exp_connection->c_peer.nid);
return "(no nid)";
}
EXPORT_SYMBOL(obd_export_nid2str);
int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
{
struct cfs_hash *nid_hash;
struct obd_export *doomed_exp = NULL;
int exports_evicted = 0;
lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
spin_lock(&obd->obd_dev_lock);
/* umount has run already, so evict thread should leave
* its task to umount thread now */
if (obd->obd_stopping) {
spin_unlock(&obd->obd_dev_lock);
return exports_evicted;
}
nid_hash = obd->obd_nid_hash;
cfs_hash_getref(nid_hash);
spin_unlock(&obd->obd_dev_lock);
do {
doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
if (doomed_exp == NULL)
break;
LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
"nid %s found, wanted nid %s, requested nid %s\n",
obd_export_nid2str(doomed_exp),
libcfs_nid2str(nid_key), nid);
LASSERTF(doomed_exp != obd->obd_self_export,
"self-export is hashed by NID?\n");
exports_evicted++;
LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
"request\n", obd->obd_name,
obd_uuid2str(&doomed_exp->exp_client_uuid),
obd_export_nid2str(doomed_exp));
class_fail_export(doomed_exp);
class_export_put(doomed_exp);
} while (1);
cfs_hash_putref(nid_hash);
if (!exports_evicted)
CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
obd->obd_name, nid);
return exports_evicted;
}
EXPORT_SYMBOL(obd_export_evict_by_nid);
int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
{
struct cfs_hash *uuid_hash;
struct obd_export *doomed_exp = NULL;
struct obd_uuid doomed_uuid;
int exports_evicted = 0;
spin_lock(&obd->obd_dev_lock);
if (obd->obd_stopping) {
spin_unlock(&obd->obd_dev_lock);
return exports_evicted;
}
uuid_hash = obd->obd_uuid_hash;
cfs_hash_getref(uuid_hash);
spin_unlock(&obd->obd_dev_lock);
obd_str2uuid(&doomed_uuid, uuid);
if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
CERROR("%s: can't evict myself\n", obd->obd_name);
cfs_hash_putref(uuid_hash);
return exports_evicted;
}
doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
if (doomed_exp == NULL) {
CERROR("%s: can't disconnect %s: no exports found\n",
obd->obd_name, uuid);
} else {
CWARN("%s: evicting %s at adminstrative request\n",
obd->obd_name, doomed_exp->exp_client_uuid.uuid);
class_fail_export(doomed_exp);
class_export_put(doomed_exp);
exports_evicted++;
}
cfs_hash_putref(uuid_hash);
return exports_evicted;
}
#if LUSTRE_TRACKS_LOCK_EXP_REFS
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
static void print_export_data(struct obd_export *exp, const char *status,
int locks, int debug_level)
{
struct ptlrpc_reply_state *rs;
struct ptlrpc_reply_state *first_reply = NULL;
int nreplies = 0;
spin_lock(&exp->exp_lock);
list_for_each_entry(rs, &exp->exp_outstanding_replies,
rs_exp_list) {
if (nreplies == 0)
first_reply = rs;
nreplies++;
}
spin_unlock(&exp->exp_lock);
CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
"%p %s %llu stale:%d\n",
exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
atomic_read(&exp->exp_rpc_count),
atomic_read(&exp->exp_cb_count),
atomic_read(&exp->exp_locks_count),
exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
nreplies, first_reply, nreplies > 3 ? "..." : "",
exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
#if LUSTRE_TRACKS_LOCK_EXP_REFS
if (locks && class_export_dump_hook != NULL)
class_export_dump_hook(exp);
#endif
}
void dump_exports(struct obd_device *obd, int locks, int debug_level)
{
struct obd_export *exp;
spin_lock(&obd->obd_dev_lock);
list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
print_export_data(exp, "ACTIVE", locks, debug_level);
list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
print_export_data(exp, "UNLINKED", locks, debug_level);
list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
print_export_data(exp, "DELAYED", locks, debug_level);
spin_unlock(&obd->obd_dev_lock);
spin_lock(&obd_zombie_impexp_lock);
list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
print_export_data(exp, "ZOMBIE", locks, debug_level);
spin_unlock(&obd_zombie_impexp_lock);
}
void obd_exports_barrier(struct obd_device *obd)
{
int waited = 2;
LASSERT(list_empty(&obd->obd_exports));
spin_lock(&obd->obd_dev_lock);
while (!list_empty(&obd->obd_unlinked_exports)) {
spin_unlock(&obd->obd_dev_lock);
set_current_state(TASK_UNINTERRUPTIBLE);
schedule_timeout(cfs_time_seconds(waited));
if (waited > 5 && is_power_of_2(waited)) {
LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
"more than %d seconds. "
"The obd refcount = %d. Is it stuck?\n",
obd->obd_name, waited,
atomic_read(&obd->obd_refcount));
dump_exports(obd, 1, D_CONSOLE | D_WARNING);
}
waited *= 2;
spin_lock(&obd->obd_dev_lock);
}
spin_unlock(&obd->obd_dev_lock);
}
EXPORT_SYMBOL(obd_exports_barrier);
/* Total amount of zombies to be destroyed */
static int zombies_count = 0;
/**
* kill zombie imports and exports
*/
void obd_zombie_impexp_cull(void)
{
struct obd_import *import;
struct obd_export *export;
ENTRY;
do {
spin_lock(&obd_zombie_impexp_lock);
import = NULL;
if (!list_empty(&obd_zombie_imports)) {
import = list_entry(obd_zombie_imports.next,
struct obd_import,
imp_zombie_chain);
list_del_init(&import->imp_zombie_chain);
}
export = NULL;
if (!list_empty(&obd_zombie_exports)) {
export = list_entry(obd_zombie_exports.next,
struct obd_export,
exp_obd_chain);
list_del_init(&export->exp_obd_chain);
}
spin_unlock(&obd_zombie_impexp_lock);
if (import != NULL) {
class_import_destroy(import);
spin_lock(&obd_zombie_impexp_lock);
zombies_count--;
spin_unlock(&obd_zombie_impexp_lock);
}
if (export != NULL) {
class_export_destroy(export);
spin_lock(&obd_zombie_impexp_lock);
zombies_count--;
spin_unlock(&obd_zombie_impexp_lock);
}
cond_resched();
} while (import != NULL || export != NULL);
EXIT;
}
static DECLARE_COMPLETION(obd_zombie_start);
static DECLARE_COMPLETION(obd_zombie_stop);
static unsigned long obd_zombie_flags;
static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
static pid_t obd_zombie_pid;
enum {
OBD_ZOMBIE_STOP = 0x0001,
};
/**
* check for work for kill zombie import/export thread.
*/
static int obd_zombie_impexp_check(void *arg)
{
int rc;
spin_lock(&obd_zombie_impexp_lock);
rc = (zombies_count == 0) &&
!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
spin_unlock(&obd_zombie_impexp_lock);
RETURN(rc);
}
/**
* Add export to the obd_zombe thread and notify it.
*/
static void obd_zombie_export_add(struct obd_export *exp) {
atomic_dec(&obd_stale_export_num);
spin_lock(&exp->exp_obd->obd_dev_lock);
LASSERT(!list_empty(&exp->exp_obd_chain));
list_del_init(&exp->exp_obd_chain);
spin_unlock(&exp->exp_obd->obd_dev_lock);
spin_lock(&obd_zombie_impexp_lock);
zombies_count++;
list_add(&exp->exp_obd_chain, &obd_zombie_exports);
spin_unlock(&obd_zombie_impexp_lock);
obd_zombie_impexp_notify();
}
/**
* Add import to the obd_zombe thread and notify it.
*/
static void obd_zombie_import_add(struct obd_import *imp) {
LASSERT(imp->imp_sec == NULL);
spin_lock(&obd_zombie_impexp_lock);
LASSERT(list_empty(&imp->imp_zombie_chain));
zombies_count++;
list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
spin_unlock(&obd_zombie_impexp_lock);
obd_zombie_impexp_notify();
}
/**
* notify import/export destroy thread about new zombie.
*/
static void obd_zombie_impexp_notify(void)
{
/*
* Make sure obd_zomebie_impexp_thread get this notification.
* It is possible this signal only get by obd_zombie_barrier, and
* barrier gulps this notification and sleeps away and hangs ensues
*/
wake_up_all(&obd_zombie_waitq);
}
/**
* check whether obd_zombie is idle
*/
static int obd_zombie_is_idle(void)
{
int rc;
LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
spin_lock(&obd_zombie_impexp_lock);
rc = (zombies_count == 0);
spin_unlock(&obd_zombie_impexp_lock);
return rc;
}
/**
* wait when obd_zombie import/export queues become empty
*/
void obd_zombie_barrier(void)
{
struct l_wait_info lwi = { 0 };
if (obd_zombie_pid == current_pid())
/* don't wait for myself */
return;
l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
}
EXPORT_SYMBOL(obd_zombie_barrier);
struct obd_export *obd_stale_export_get(void)
{
struct obd_export *exp = NULL;
ENTRY;
spin_lock(&obd_stale_export_lock);
if (!list_empty(&obd_stale_exports)) {
exp = list_entry(obd_stale_exports.next,
struct obd_export, exp_stale_list);
list_del_init(&exp->exp_stale_list);
}
spin_unlock(&obd_stale_export_lock);
if (exp) {
CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
atomic_read(&obd_stale_export_num));
}
RETURN(exp);
}
EXPORT_SYMBOL(obd_stale_export_get);
void obd_stale_export_put(struct obd_export *exp)
{
ENTRY;
LASSERT(list_empty(&exp->exp_stale_list));
if (exp->exp_lock_hash &&
atomic_read(&exp->exp_lock_hash->hs_count)) {
CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
atomic_read(&obd_stale_export_num));
spin_lock_bh(&exp->exp_bl_list_lock);
spin_lock(&obd_stale_export_lock);
/* Add to the tail if there is no blocked locks,
* to the head otherwise. */
if (list_empty(&exp->exp_bl_list))
list_add_tail(&exp->exp_stale_list,
&obd_stale_exports);
else
list_add(&exp->exp_stale_list,
&obd_stale_exports);
spin_unlock(&obd_stale_export_lock);
spin_unlock_bh(&exp->exp_bl_list_lock);
} else {
class_export_put(exp);
}
EXIT;
}
EXPORT_SYMBOL(obd_stale_export_put);
/**
* Adjust the position of the export in the stale list,
* i.e. move to the head of the list if is needed.
**/
void obd_stale_export_adjust(struct obd_export *exp)
{
LASSERT(exp != NULL);
spin_lock_bh(&exp->exp_bl_list_lock);
spin_lock(&obd_stale_export_lock);
if (!list_empty(&exp->exp_stale_list) &&
!list_empty(&exp->exp_bl_list))
list_move(&exp->exp_stale_list, &obd_stale_exports);
spin_unlock(&obd_stale_export_lock);
spin_unlock_bh(&exp->exp_bl_list_lock);
}
EXPORT_SYMBOL(obd_stale_export_adjust);
/**
* destroy zombie export/import thread.
*/
static int obd_zombie_impexp_thread(void *unused)
{
unshare_fs_struct();
complete(&obd_zombie_start);
obd_zombie_pid = current_pid();
while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
struct l_wait_info lwi = { 0 };
l_wait_event(obd_zombie_waitq,
!obd_zombie_impexp_check(NULL), &lwi);
obd_zombie_impexp_cull();
/*
* Notify obd_zombie_barrier callers that queues
* may be empty.
*/
wake_up(&obd_zombie_waitq);
}
complete(&obd_zombie_stop);
RETURN(0);
}
/**
* start destroy zombie import/export thread
*/
int obd_zombie_impexp_init(void)
{
struct task_struct *task;
task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
if (IS_ERR(task))
RETURN(PTR_ERR(task));
wait_for_completion(&obd_zombie_start);
RETURN(0);
}
/**
* stop destroy zombie import/export thread
*/
void obd_zombie_impexp_stop(void)
{
set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
obd_zombie_impexp_notify();
wait_for_completion(&obd_zombie_stop);
LASSERT(list_empty(&obd_stale_exports));
}
/***** Kernel-userspace comm helpers *******/
/* Get length of entire message, including header */
int kuc_len(int payload_len)
{
return sizeof(struct kuc_hdr) + payload_len;
}
EXPORT_SYMBOL(kuc_len);
/* Get a pointer to kuc header, given a ptr to the payload
* @param p Pointer to payload area
* @returns Pointer to kuc header
*/
struct kuc_hdr * kuc_ptr(void *p)
{
struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
LASSERT(lh->kuc_magic == KUC_MAGIC);
return lh;
}
EXPORT_SYMBOL(kuc_ptr);
/* Alloc space for a message, and fill in header
* @return Pointer to payload area
*/
void *kuc_alloc(int payload_len, int transport, int type)
{
struct kuc_hdr *lh;
int len = kuc_len(payload_len);
OBD_ALLOC(lh, len);
if (lh == NULL)
return ERR_PTR(-ENOMEM);
lh->kuc_magic = KUC_MAGIC;
lh->kuc_transport = transport;
lh->kuc_msgtype = type;
lh->kuc_msglen = len;
return (void *)(lh + 1);
}
EXPORT_SYMBOL(kuc_alloc);
/* Takes pointer to payload area */
void kuc_free(void *p, int payload_len)
{
struct kuc_hdr *lh = kuc_ptr(p);
OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
struct obd_request_slot_waiter {
struct list_head orsw_entry;
wait_queue_head_t orsw_waitq;
bool orsw_signaled;
};
static bool obd_request_slot_avail(struct client_obd *cli,
struct obd_request_slot_waiter *orsw)
{
bool avail;
spin_lock(&cli->cl_loi_list_lock);
avail = !!list_empty(&orsw->orsw_entry);
spin_unlock(&cli->cl_loi_list_lock);
return avail;
};
/*
* For network flow control, the RPC sponsor needs to acquire a credit
* before sending the RPC. The credits count for a connection is defined
* by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
* the subsequent RPC sponsors need to wait until others released their
* credits, or the administrator increased the "cl_max_rpcs_in_flight".
*/
int obd_get_request_slot(struct client_obd *cli)
{
struct obd_request_slot_waiter orsw;
struct l_wait_info lwi;
int rc;
spin_lock(&cli->cl_loi_list_lock);
if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
cli->cl_r_in_flight++;
spin_unlock(&cli->cl_loi_list_lock);
return 0;
}
init_waitqueue_head(&orsw.orsw_waitq);
list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
orsw.orsw_signaled = false;
spin_unlock(&cli->cl_loi_list_lock);
lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
rc = l_wait_event(orsw.orsw_waitq,
obd_request_slot_avail(cli, &orsw) ||
orsw.orsw_signaled,
&lwi);
/* Here, we must take the lock to avoid the on-stack 'orsw' to be
* freed but other (such as obd_put_request_slot) is using it. */
spin_lock(&cli->cl_loi_list_lock);
if (rc != 0) {
if (!orsw.orsw_signaled) {
if (list_empty(&orsw.orsw_entry))
cli->cl_r_in_flight--;
else
list_del(&orsw.orsw_entry);
}
}
if (orsw.orsw_signaled) {
LASSERT(list_empty(&orsw.orsw_entry));
rc = -EINTR;
}
spin_unlock(&cli->cl_loi_list_lock);
return rc;
}
EXPORT_SYMBOL(obd_get_request_slot);
void obd_put_request_slot(struct client_obd *cli)
{
struct obd_request_slot_waiter *orsw;
spin_lock(&cli->cl_loi_list_lock);
cli->cl_r_in_flight--;
/* If there is free slot, wakeup the first waiter. */
if (!list_empty(&cli->cl_loi_read_list) &&
likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
orsw = list_entry(cli->cl_loi_read_list.next,
struct obd_request_slot_waiter, orsw_entry);
list_del_init(&orsw->orsw_entry);
cli->cl_r_in_flight++;
wake_up(&orsw->orsw_waitq);
}
spin_unlock(&cli->cl_loi_list_lock);
}
EXPORT_SYMBOL(obd_put_request_slot);
__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
{
return cli->cl_max_rpcs_in_flight;
}
EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
{
struct obd_request_slot_waiter *orsw;
__u32 old;
int diff;
int i;
char *typ_name;
int rc;
if (max > OBD_MAX_RIF_MAX || max < 1)
return -ERANGE;
typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
/* adjust max_mod_rpcs_in_flight to ensure it is always
* strictly lower that max_rpcs_in_flight */
if (max < 2) {
CERROR("%s: cannot set max_rpcs_in_flight to 1 "
"because it must be higher than "
"max_mod_rpcs_in_flight value",
cli->cl_import->imp_obd->obd_name);
return -ERANGE;
}
if (max <= cli->cl_max_mod_rpcs_in_flight) {
rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
if (rc != 0)
return rc;
}
}
spin_lock(&cli->cl_loi_list_lock);
old = cli->cl_max_rpcs_in_flight;
cli->cl_max_rpcs_in_flight = max;
diff = max - old;
/* We increase the max_rpcs_in_flight, then wakeup some waiters. */
for (i = 0; i < diff; i++) {
if (list_empty(&cli->cl_loi_read_list))
break;
orsw = list_entry(cli->cl_loi_read_list.next,
struct obd_request_slot_waiter, orsw_entry);
list_del_init(&orsw->orsw_entry);
cli->cl_r_in_flight++;
wake_up(&orsw->orsw_waitq);
}
spin_unlock(&cli->cl_loi_list_lock);
return 0;
}
EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
__u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
{
return cli->cl_max_mod_rpcs_in_flight;
}
EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
{
struct obd_connect_data *ocd;
__u16 maxmodrpcs;
__u16 prev;
if (max > OBD_MAX_RIF_MAX || max < 1)
return -ERANGE;
/* cannot exceed or equal max_rpcs_in_flight */
if (max >= cli->cl_max_rpcs_in_flight) {
CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
"higher or equal to max_rpcs_in_flight value (%u)\n",
cli->cl_import->imp_obd->obd_name,
max, cli->cl_max_rpcs_in_flight);
return -ERANGE;
}
/* cannot exceed max modify RPCs in flight supported by the server */
ocd = &cli->cl_import->imp_connect_data;
if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
maxmodrpcs = ocd->ocd_maxmodrpcs;
else
maxmodrpcs = 1;
if (max > maxmodrpcs) {
CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
"higher than max_mod_rpcs_per_client value (%hu) "
"returned by the server at connection\n",
cli->cl_import->imp_obd->obd_name,
max, maxmodrpcs);
return -ERANGE;
}
spin_lock(&cli->cl_mod_rpcs_lock);
prev = cli->cl_max_mod_rpcs_in_flight;
cli->cl_max_mod_rpcs_in_flight = max;
/* wakeup waiters if limit has been increased */
if (cli->cl_max_mod_rpcs_in_flight > prev)
wake_up(&cli->cl_mod_rpcs_waitq);
spin_unlock(&cli->cl_mod_rpcs_lock);
return 0;
}
EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
#define pct(a, b) (b ? a * 100 / b : 0)
int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
struct seq_file *seq)
{
unsigned long mod_tot = 0, mod_cum;
struct timespec64 now;
int i;
ktime_get_real_ts64(&now);
spin_lock(&cli->cl_mod_rpcs_lock);
seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
(s64)now.tv_sec, now.tv_nsec);
seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
cli->cl_mod_rpcs_in_flight);
seq_printf(seq, "\n\t\t\tmodify\n");
seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
mod_cum = 0;
for (i = 0; i < OBD_HIST_MAX; i++) {
unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
mod_cum += mod;
seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
i, mod, pct(mod, mod_tot),
pct(mod_cum, mod_tot));
if (mod_cum == mod_tot)
break;
}
spin_unlock(&cli->cl_mod_rpcs_lock);
return 0;
}
EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
#undef pct
/* The number of modify RPCs sent in parallel is limited
* because the server has a finite number of slots per client to
* store request result and ensure reply reconstruction when needed.
* On the client, this limit is stored in cl_max_mod_rpcs_in_flight
* that takes into account server limit and cl_max_rpcs_in_flight
* value.
* On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
* one close request is allowed above the maximum.
*/
static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
bool close_req)
{
bool avail;
/* A slot is available if
* - number of modify RPCs in flight is less than the max
* - it's a close RPC and no other close request is in flight
*/
avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
(close_req && cli->cl_close_rpcs_in_flight == 0);
return avail;
}
static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
bool close_req)
{
bool avail;
spin_lock(&cli->cl_mod_rpcs_lock);
avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
spin_unlock(&cli->cl_mod_rpcs_lock);
return avail;
}
static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
{
if (it != NULL &&
(it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
it->it_op == IT_READDIR ||
(it->it_op == IT_LAYOUT && !(it->it_flags & FMODE_WRITE))))
return true;
return false;
}
/* Get a modify RPC slot from the obd client @cli according
* to the kind of operation @opc that is going to be sent
* and the intent @it of the operation if it applies.
* If the maximum number of modify RPCs in flight is reached
* the thread is put to sleep.
* Returns the tag to be set in the request message. Tag 0
* is reserved for non-modifying requests.
*/
__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
struct lookup_intent *it)
{
struct l_wait_info lwi = LWI_INTR(NULL, NULL);
bool close_req = false;
__u16 i, max;
/* read-only metadata RPCs don't consume a slot on MDT
* for reply reconstruction
*/
if (obd_skip_mod_rpc_slot(it))
return 0;
if (opc == MDS_CLOSE)
close_req = true;
do {
spin_lock(&cli->cl_mod_rpcs_lock);
max = cli->cl_max_mod_rpcs_in_flight;
if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
/* there is a slot available */
cli->cl_mod_rpcs_in_flight++;
if (close_req)
cli->cl_close_rpcs_in_flight++;
lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
cli->cl_mod_rpcs_in_flight);
/* find a free tag */
i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
max + 1);
LASSERT(i < OBD_MAX_RIF_MAX);
LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
spin_unlock(&cli->cl_mod_rpcs_lock);
/* tag 0 is reserved for non-modify RPCs */
return i + 1;
}
spin_unlock(&cli->cl_mod_rpcs_lock);
CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
"opc %u, max %hu\n",
cli->cl_import->imp_obd->obd_name, opc, max);
l_wait_event(cli->cl_mod_rpcs_waitq,
obd_mod_rpc_slot_avail(cli, close_req), &lwi);
} while (true);
}
EXPORT_SYMBOL(obd_get_mod_rpc_slot);
/* Put a modify RPC slot from the obd client @cli according
* to the kind of operation @opc that has been sent and the
* intent @it of the operation if it applies.
*/
void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
struct lookup_intent *it, __u16 tag)
{
bool close_req = false;
if (obd_skip_mod_rpc_slot(it))
return;
if (opc == MDS_CLOSE)
close_req = true;
spin_lock(&cli->cl_mod_rpcs_lock);
cli->cl_mod_rpcs_in_flight--;
if (close_req)
cli->cl_close_rpcs_in_flight--;
/* release the tag in the bitmap */
LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
spin_unlock(&cli->cl_mod_rpcs_lock);
wake_up(&cli->cl_mod_rpcs_waitq);
}
EXPORT_SYMBOL(obd_put_mod_rpc_slot);