From 06fb20cfda6b2fe1c3b3e909d1448548505d033a Mon Sep 17 00:00:00 2001 From: pschwan <pschwan> Date: Wed, 10 Apr 2002 16:27:43 +0000 Subject: [PATCH] No new functionality outside of the DLM. ptlrpc_client no longer contains a peer, so the peer is passed in and managed separately. - All of the DLM calls are hooked up to the RPC system now - Some unmaintained local processing code removed - Added ldlm network-related bits to the test scripts --- lustre/include/linux/lustre_dlm.h | 54 ++++--- lustre/include/linux/lustre_idl.h | 36 +++-- lustre/include/linux/lustre_lite.h | 2 + lustre/include/linux/lustre_mds.h | 29 ++-- lustre/include/linux/lustre_net.h | 33 ++--- lustre/include/linux/obd.h | 3 + lustre/include/linux/obd_class.h | 8 +- lustre/include/linux/obd_support.h | 10 +- lustre/ldlm/Makefile.am | 2 +- lustre/ldlm/ldlm_lock.c | 32 +++-- lustre/ldlm/ldlm_lockd.c | 221 ++++++++++++++++++++--------- lustre/ldlm/ldlm_request.c | 143 +++++++++++++++++++ lustre/ldlm/ldlm_resource.c | 17 ++- lustre/ldlm/ldlm_test.c | 63 ++++++-- lustre/llite/dir.c | 4 +- lustre/llite/file.c | 8 +- lustre/llite/namei.c | 15 +- lustre/llite/super.c | 9 +- lustre/mdc/mdc_reint.c | 39 ++--- lustre/mdc/mdc_request.c | 70 ++++----- lustre/mds/handler.c | 6 +- lustre/osc/osc_request.c | 83 +++++++---- lustre/ost/ost_handler.c | 6 +- lustre/ptlrpc/client.c | 11 +- lustre/ptlrpc/niobuf.c | 48 ++----- lustre/tests/common.sh | 3 + lustre/tests/llmountcleanup.sh | 1 + 27 files changed, 649 insertions(+), 307 deletions(-) create mode 100644 lustre/ldlm/ldlm_request.c diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index c720b3a1d5..9fd38b557b 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -5,30 +5,29 @@ #ifndef _LUSTRE_DLM_H__ #define _LUSTRE_DLM_H__ -#include <linux/kp30.h> -#include <linux/list.h> +#ifdef __KERNEL__ #include <linux/obd_class.h> - -#ifdef __KERNEL__ +#include <linux/lustre_net.h> #define OBD_LDLM_DEVICENAME "ldlm" -typedef int cluster_host; -typedef int cluster_pid; +typedef int cluster_host; +typedef int cluster_pid; typedef enum { ELDLM_OK = 0, - ELDLM_BLOCK_GRANTED = 600, - ELDLM_BLOCK_CONV, - ELDLM_BLOCK_WAIT, - ELDLM_BAD_NAMESPACE, - ELDLM_LOCK_CHANGED + + ELDLM_LOCK_CHANGED = 300, + + ELDLM_NAMESPACE_EXISTS = 400, + ELDLM_BAD_NAMESPACE = 401 } ldlm_error_t; #define LDLM_FL_LOCK_CHANGED (1 << 0) -#define LDLM_FL_COMPLETION_AST (1 << 1) -#define LDLM_FL_BLOCKING_AST (1 << 2) +#define LDLM_FL_BLOCK_GRANTED (1 << 1) +#define LDLM_FL_BLOCK_CONV (1 << 2) +#define LDLM_FL_BLOCK_WAIT (1 << 3) #define L2B(c) (1 << c) @@ -108,10 +107,11 @@ struct ldlm_lock { ldlm_mode_t l_granted_mode; ldlm_lock_callback l_completion_ast; ldlm_lock_callback l_blocking_ast; - struct lustre_peer *l_peer; + struct lustre_peer l_peer; void *l_data; __u32 l_data_len; struct ldlm_extent l_extent; + struct ldlm_handle l_remote_handle; //void *l_event; //XXX cluster_host l_holder; __u32 l_version[RES_VERSION_SIZE]; @@ -188,6 +188,7 @@ int ldlm_extent_policy(struct ldlm_resource *, struct ldlm_extent *, /* ldlm_lock.c */ void ldlm_lock_free(struct ldlm_lock *lock); +void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc); ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id, struct ldlm_handle *parent_lock_handle, @@ -213,9 +214,9 @@ int ldlm_test(struct obd_device *device); /* resource.c */ struct ldlm_namespace *ldlm_namespace_find(struct obd_device *, __u32 id); -struct ldlm_namespace *ldlm_namespace_new(struct obd_device *, __u32 id); +ldlm_error_t ldlm_namespace_new(struct obd_device *, __u32 id, + struct ldlm_namespace **); int ldlm_namespace_free(struct ldlm_namespace *ns); -void ldlm_resource_dump(struct ldlm_resource *res); struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, __u64 *name, __u32 type, int create); @@ -223,6 +224,27 @@ int ldlm_resource_put(struct ldlm_resource *res); void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, struct ldlm_lock *lock); void ldlm_resource_del_lock(struct ldlm_lock *lock); +void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc); +void ldlm_resource_dump(struct ldlm_resource *res); + +/* ldlm_request.c */ +int ldlm_cli_namespace_new(struct ptlrpc_client *, struct lustre_peer *, + __u32 ns_id, struct ptlrpc_request **); +int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer, + __u32 ns_id, + struct ldlm_handle *parent_lock_handle, + __u64 *res_id, + __u32 type, + struct ldlm_extent *req_ex, + ldlm_mode_t mode, + int *flags, + void *data, + __u32 data_len, + struct ldlm_handle *lockh, + struct ptlrpc_request **request); +int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, + void *data, __u32 data_len); + #endif /* __KERNEL__ */ diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 9e4d842554..067a834471 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -155,6 +155,7 @@ struct obd_ioobj { * MDS REQ RECORDS */ +/* opcodes */ #define MDS_GETATTR 1 #define MDS_OPEN 2 #define MDS_CLOSE 3 @@ -274,10 +275,11 @@ static inline void ll_inode2fid(struct ll_fid *fid, struct inode *inode) */ /* opcodes */ -#define LDLM_ENQUEUE 1 -#define LDLM_CONVERT 2 -#define LDLM_CANCEL 3 -#define LDLM_CALLBACK 4 +#define LDLM_NAMESPACE_NEW 1 +#define LDLM_ENQUEUE 2 +#define LDLM_CONVERT 3 +#define LDLM_CANCEL 4 +#define LDLM_CALLBACK 5 #define RES_NAME_SIZE 3 #define RES_VERSION_SIZE 4 @@ -302,19 +304,31 @@ struct ldlm_extent { __u64 end; }; +struct ldlm_resource_desc { + __u32 lr_ns_id; + __u32 lr_type; + __u64 lr_name[RES_NAME_SIZE]; + __u64 lr_version[RES_VERSION_SIZE]; +}; + +struct ldlm_lock_desc { + struct ldlm_resource_desc l_resource; + ldlm_mode_t l_req_mode; + ldlm_mode_t l_granted_mode; + struct ldlm_extent l_extent; + __u32 l_version[RES_VERSION_SIZE]; +}; + struct ldlm_request { - __u32 ns_id; - __u64 res_id[RES_NAME_SIZE]; - __u32 res_type; __u32 flags; - struct ldlm_extent lock_extent; - struct ldlm_handle parent_res_handle; - struct ldlm_handle parent_lock_handle; - ldlm_mode_t mode; + struct ldlm_lock_desc lock_desc; + struct ldlm_handle lock_handle1; + struct ldlm_handle lock_handle2; }; struct ldlm_reply { struct ldlm_handle lock_handle; + struct ldlm_extent lock_extent; }; /* diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index 80dc7cdba8..41a222b700 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -43,7 +43,9 @@ struct ll_sb_info { unsigned long ll_cache_count; struct semaphore ll_list_mutex; struct ptlrpc_client ll_mds_client; + struct lustre_peer ll_mds_peer; struct ptlrpc_client ll_ost_client; + struct lustre_peer ll_ost_peer; }; diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index 52a1d97d0a..6696006333 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -79,31 +79,30 @@ void mds_rename_pack(struct mds_rec_rename *, struct inode *srcdir, struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt); /* llight/request.c */ -int mdc_getattr(struct ptlrpc_client *peer, ino_t ino, int type, int valid, - struct ptlrpc_request **); -int mdc_setattr(struct ptlrpc_client *peer, struct inode *inode, +int mdc_getattr(struct ptlrpc_client *, struct lustre_peer *, ino_t ino, + int type, int valid, struct ptlrpc_request **); +int mdc_setattr(struct ptlrpc_client *, struct lustre_peer *, struct inode *, struct iattr *iattr, struct ptlrpc_request **); -int mdc_open(struct ptlrpc_client *cl, ino_t ino, int type, int flags, - __u64 *fh, struct ptlrpc_request **req); -int mdc_close(struct ptlrpc_client *cl, ino_t ino, int type, __u64 fh, - struct ptlrpc_request **req); -int mdc_readpage(struct ptlrpc_client *peer, ino_t ino, int type, __u64 offset, - char *addr, struct ptlrpc_request **); -int mdc_create(struct ptlrpc_client *peer, +int mdc_open(struct ptlrpc_client *, struct lustre_peer *, ino_t ino, int type, + int flags, __u64 *fh, struct ptlrpc_request **req); +int mdc_close(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, + int type, __u64 fh, struct ptlrpc_request **req); +int mdc_readpage(struct ptlrpc_client *, struct lustre_peer *, ino_t ino, + int type, __u64 offset, char *addr, struct ptlrpc_request **); +int mdc_create(struct ptlrpc_client *, struct lustre_peer *, struct inode *dir, const char *name, int namelen, const char *tgt, int tgtlen, int mode, __u64 id, __u32 uid, __u32 gid, __u64 time, struct ptlrpc_request **); -int mdc_unlink(struct ptlrpc_client *peer, struct inode *dir, +int mdc_unlink(struct ptlrpc_client *, struct lustre_peer *, struct inode *dir, struct inode *child, const char *name, int namelen, struct ptlrpc_request **); -int mdc_link(struct ptlrpc_client *peer, struct dentry *src, +int mdc_link(struct ptlrpc_client *, struct lustre_peer *, struct dentry *src, struct inode *dir, const char *name, int namelen, struct ptlrpc_request **); -int mdc_rename(struct ptlrpc_client *peer, struct inode *src, +int mdc_rename(struct ptlrpc_client *, struct lustre_peer *, struct inode *src, struct inode *tgt, const char *old, int oldlen, - const char *new, int newlen, - struct ptlrpc_request **); + const char *new, int newlen, struct ptlrpc_request **); int mdc_create_client(char *uuid, struct ptlrpc_client *cl); struct mds_fs_operations { diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index c211ec1e89..bd37fce2ef 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -50,22 +50,23 @@ #define MDS_REPLY_PORTAL 11 #define MDS_BULK_PORTAL 12 -#define LDLM_REQUEST_PORTAL 13 -#define LDLM_REPLY_PORTAL 14 +#define LDLM_REQUEST_PORTAL 13 +#define LDLM_REPLY_PORTAL 14 +#define LDLM_CLI_REQUEST_PORTAL 15 +#define LDLM_CLI_REPLY_PORTAL 16 /* default rpc ring length */ #define RPC_RING_LENGTH 2 #define SVC_STOPPING 1 -#define SVC_RUNNING 2 -#define SVC_STOPPED 4 -#define SVC_KILLED 8 -#define SVC_EVENT 16 -#define SVC_LIST 32 -#define SVC_SIGNAL 64 +#define SVC_RUNNING 2 +#define SVC_STOPPED 4 +#define SVC_KILLED 8 +#define SVC_EVENT 16 +#define SVC_LIST 32 +#define SVC_SIGNAL 64 struct ptlrpc_client { - struct lustre_peer cli_server; struct obd_device *cli_obd; struct list_head cli_sending_head; struct list_head cli_sent_head; @@ -192,20 +193,20 @@ int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *, int portal); int ptl_send_buf(struct ptlrpc_request *, struct lustre_peer *, int portal); int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *); int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk); -int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc, - struct ptlrpc_request *req); -int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc, - struct ptlrpc_request *req); +int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req); +int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req); int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl); void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i); /* rpc/client.c */ void ptlrpc_init_client(int dev, int req_portal, int rep_portal, - struct ptlrpc_client *cl); -int ptlrpc_connect_client(int dev, char *uuid, struct ptlrpc_client *cl); + struct ptlrpc_client *cl); +int ptlrpc_connect_client(char *uuid, struct ptlrpc_client *cl, + struct lustre_peer *peer); int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req); int ptlrpc_queue_req(struct ptlrpc_client *peer, struct ptlrpc_request *req); -struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode, +struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, + struct lustre_peer *peer, int opcode, int count, int *lengths, char **bufs); void ptlrpc_free_req(struct ptlrpc_request *request); struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *); diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index d97bdba5c0..4d41e8ab90 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -79,6 +79,8 @@ struct mds_obd { struct ldlm_obd { struct ptlrpc_service *ldlm_service; + struct ptlrpc_client *ldlm_client; + struct lustre_peer ldlm_server_peer; struct list_head ldlm_namespaces; spinlock_t ldlm_lock; @@ -126,6 +128,7 @@ struct ost_obd { struct osc_obd { struct obd_device *osc_tgt; struct ptlrpc_client *osc_client; + struct lustre_peer osc_peer; }; /* corresponds to one of the obd's */ diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 5e47065a8b..1ce67bdec5 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -24,10 +24,10 @@ #define __LINUX_CLASS_OBD_H #ifndef __KERNEL__ -#include <stdint.h> -#define __KERNEL__ -#include <linux/list.h> -#undef __KERNEL__ +# include <stdint.h> +# define __KERNEL__ +# include <linux/list.h> +# undef __KERNEL__ #else #include <asm/segment.h> #include <asm/uaccess.h> diff --git a/lustre/include/linux/obd_support.h b/lustre/include/linux/obd_support.h index 94fb231e32..6e570093bb 100644 --- a/lustre/include/linux/obd_support.h +++ b/lustre/include/linux/obd_support.h @@ -69,10 +69,11 @@ extern unsigned long obd_fail_loc; #define OBD_FAIL_OST_PUNCH_NET 0x20b #define OBB_FAIL_LDLM 0x300 -#define OBD_FAIL_LDLM_ENQUEUE 0x301 -#define OBD_FAIL_LDLM_CONVERT 0x302 -#define OBD_FAIL_LDLM_CANCEL 0x303 -#define OBD_FAIL_LDLM_CALLBACK 0x304 +#define OBD_FAIL_LDLM_NAMESPACE_NEW 0x301 +#define OBD_FAIL_LDLM_ENQUEUE 0x302 +#define OBD_FAIL_LDLM_CONVERT 0x303 +#define OBD_FAIL_LDLM_CANCEL 0x304 +#define OBD_FAIL_LDLM_CALLBACK 0x305 /* preparation for a more advanced failure testbed (not functional yet) */ #define OBD_FAIL_MASK_SYS 0x0000FF00 @@ -95,6 +96,7 @@ do { \ } \ } while(0) +#include <linux/types.h> #include <linux/blkdev.h> static inline void OBD_FAIL_WRITE(int id, kdev_t dev) diff --git a/lustre/ldlm/Makefile.am b/lustre/ldlm/Makefile.am index 925775b2bb..da6256a9e1 100644 --- a/lustre/ldlm/Makefile.am +++ b/lustre/ldlm/Makefile.am @@ -9,7 +9,7 @@ modulefs_DATA = ldlm.o EXTRA_PROGRAMS = ldlm ldlm_SOURCES = ldlm_lock.c ldlm_resource.c ldlm_test.c ldlm_lockd.c \ -ldlm_extent.c +ldlm_extent.c ldlm_request.c include $(top_srcdir)/Rules diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index ef47d1ef0c..bddb89df81 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -84,6 +84,15 @@ void ldlm_lock_free(struct ldlm_lock *lock) kmem_cache_free(ldlm_lock_slab, lock); } +void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc) +{ + ldlm_res2desc(lock->l_resource, &desc->l_resource); + desc->l_req_mode = lock->l_req_mode; + desc->l_granted_mode = lock->l_granted_mode; + memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent)); + memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version)); +} + static int ldlm_lock_compat(struct ldlm_lock *lock) { struct list_head *tmp; @@ -119,7 +128,8 @@ static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock) res->lr_most_restr = lock->l_granted_mode; if (lock->l_completion_ast) - lock->l_completion_ast(lock, NULL, NULL, 0); + lock->l_completion_ast(lock, NULL, + lock->l_data, lock->l_data_len); } /* XXX: Revisit the error handling; we do not, for example, do @@ -181,10 +191,8 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, memcpy(&lock->l_extent, req_ex, sizeof(*req_ex)); lock->l_data = data; lock->l_data_len = data_len; - if ((*flags) & LDLM_FL_COMPLETION_AST) - lock->l_completion_ast = completion; - if ((*flags) & LDLM_FL_BLOCKING_AST) - lock->l_blocking_ast = blocking; + lock->l_completion_ast = completion; + lock->l_blocking_ast = blocking; ldlm_object2handle(lock, lockh); spin_lock(&res->lr_lock); @@ -192,25 +200,27 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, if (!list_empty(&res->lr_converting)) { ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); - GOTO(out, rc = -ELDLM_BLOCK_CONV); + *flags |= LDLM_FL_BLOCK_CONV; + GOTO(out, ELDLM_OK); } if (!list_empty(&res->lr_waiting)) { ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); - GOTO(out, rc = -ELDLM_BLOCK_WAIT); + *flags |= LDLM_FL_BLOCK_WAIT; + GOTO(out, ELDLM_OK); } incompat = ldlm_lock_compat(lock); if (incompat) { ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); - GOTO(out, rc = -ELDLM_BLOCK_GRANTED); + *flags |= LDLM_FL_BLOCK_GRANTED; + GOTO(out, ELDLM_OK); } ldlm_grant_lock(res, lock); - GOTO(out, rc = ELDLM_OK); - + EXIT; out: spin_unlock(&res->lr_lock); - return rc; + return ELDLM_OK; } static int ldlm_reprocess_queue(struct ldlm_resource *res, diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 4f384f13a1..7995590652 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -18,54 +18,149 @@ #define DEBUG_SUBSYSTEM S_LDLM -#include <linux/obd_class.h> #include <linux/lustre_dlm.h> -#include <linux/lustre_net.h> extern kmem_cache_t *ldlm_resource_slab; extern kmem_cache_t *ldlm_lock_slab; -static int ldlm_client_callback(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data, __u32 data_len) +static int _ldlm_namespace_new(struct obd_device *obddev, + struct ptlrpc_request *req) { - LBUG(); - return 0; + struct ldlm_request *dlm_req; + struct ldlm_namespace *ns; + int rc; + ldlm_error_t err; + ENTRY; + + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf); + req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + if (rc) { + CERROR("out of memory\n"); + req->rq_status = -ENOMEM; + RETURN(0); + } + dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); + + err = ldlm_namespace_new(obddev, dlm_req->lock_desc.l_resource.lr_ns_id, + &ns); + req->rq_status = err; + + CERROR("err = %d\n", err); + + RETURN(0); } -static int ldlm_enqueue(struct ptlrpc_request *req) +static int _ldlm_enqueue(struct ptlrpc_request *req) { struct ldlm_reply *dlm_rep; struct ldlm_request *dlm_req; + int rc, size = sizeof(*dlm_rep); ldlm_error_t err; - int rc, bufsize = sizeof(*dlm_rep); - - rc = lustre_pack_msg(1, &bufsize, NULL, &req->rq_replen, - &req->rq_repbuf); + ENTRY; + + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf); + req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; if (rc) { CERROR("out of memory\n"); req->rq_status = -ENOMEM; RETURN(0); } - req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; dlm_rep = lustre_msg_buf(req->rq_repmsg, 0); dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); + memcpy(&dlm_rep->lock_extent, &dlm_req->lock_desc.l_extent, + sizeof(dlm_rep->lock_extent)); + err = ldlm_local_lock_enqueue(req->rq_obd, - dlm_req->ns_id, - &dlm_req->parent_lock_handle, - dlm_req->res_id, - dlm_req->res_type, - &dlm_req->lock_extent, - dlm_req->mode, + dlm_req->lock_desc.l_resource.lr_ns_id, + &dlm_req->lock_handle2, + dlm_req->lock_desc.l_resource.lr_name, + dlm_req->lock_desc.l_resource.lr_type, + &dlm_rep->lock_extent, + dlm_req->lock_desc.l_req_mode, &dlm_req->flags, - ldlm_client_callback, - ldlm_client_callback, + //ldlm_cli_callback, + NULL, + ldlm_cli_callback, lustre_msg_buf(req->rq_reqmsg, 1), req->rq_reqmsg->buflens[1], &dlm_rep->lock_handle); + if (err != -ENOMEM && err != -ELDLM_BAD_NAMESPACE) { + struct ldlm_lock *lock; + lock = ldlm_handle2object(&dlm_rep->lock_handle); + memcpy(&lock->l_peer, &req->rq_peer, sizeof(lock->l_peer)); + memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1, + sizeof(lock->l_remote_handle)); + } req->rq_status = err; - CERROR("local_lock_enqueue: %d\n", err); + CERROR("err = %d\n", err); + + RETURN(0); +} + +static int _ldlm_convert(struct ptlrpc_request *req) +{ + struct ldlm_request *dlm_req; + int rc; + ENTRY; + + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf); + req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + if (rc) { + CERROR("out of memory\n"); + req->rq_status = -ENOMEM; + RETURN(0); + } + dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); + + req->rq_status = + ldlm_local_lock_convert(req->rq_obd, &dlm_req->lock_handle1, + dlm_req->lock_desc.l_req_mode, + &dlm_req->flags); + RETURN(0); +} + +static int _ldlm_cancel(struct ptlrpc_request *req) +{ + struct ldlm_request *dlm_req; + int rc; + ENTRY; + + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf); + req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + if (rc) { + CERROR("out of memory\n"); + req->rq_status = -ENOMEM; + RETURN(0); + } + dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); + + req->rq_status = + ldlm_local_lock_cancel(req->rq_obd, &dlm_req->lock_handle1); + RETURN(0); +} + +static int _ldlm_callback(struct ptlrpc_request *req) +{ + struct ldlm_request *dlm_req; + struct ldlm_lock *lock; + int rc; + ENTRY; + + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf); + req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf; + if (rc) { + CERROR("out of memory\n"); + req->rq_status = -ENOMEM; + RETURN(0); + } + dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); + + lock = ldlm_handle2object(&dlm_req->lock_handle1); + ldlm_lock_dump(lock); + + req->rq_status = 0; RETURN(0); } @@ -90,75 +185,59 @@ static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc, } switch (req->rq_reqmsg->opc) { + case LDLM_NAMESPACE_NEW: + CDEBUG(D_INODE, "namespace_new\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_NAMESPACE_NEW, 0); + rc = _ldlm_namespace_new(dev, req); + break; + case LDLM_ENQUEUE: CDEBUG(D_INODE, "enqueue\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); - rc = ldlm_enqueue(req); + rc = _ldlm_enqueue(req); break; -#if 0 + case LDLM_CONVERT: CDEBUG(D_INODE, "convert\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0); - rc = ldlm_convert(req); + rc = _ldlm_convert(req); break; case LDLM_CANCEL: CDEBUG(D_INODE, "cancel\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0); - rc = ldlm_cancel(req); + rc = _ldlm_cancel(req); break; case LDLM_CALLBACK: CDEBUG(D_INODE, "callback\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0); - rc = ldlm_callback(req); + rc = _ldlm_callback(req); break; -#endif default: - rc = ptlrpc_error(dev, svc, req); + rc = ptlrpc_error(svc, req); RETURN(rc); } - EXIT; out: - if (rc) { - CERROR("no header\n"); - return 0; - } - - if( req->rq_status) { - ptlrpc_error(dev, svc, req); - } else { - CDEBUG(D_NET, "sending reply\n"); - ptlrpc_reply(dev, svc, req); - } - - return 0; + if (rc) + RETURN(ptlrpc_error(svc, req)); + else + RETURN(ptlrpc_reply(svc, req)); } static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg, void *uarg) { struct obd_device *obddev = conn->oc_dev; - struct ptlrpc_client cl; int err; - ENTRY; - if ( _IOC_TYPE(cmd) != IOC_LDLM_TYPE || - _IOC_NR(cmd) < IOC_LDLM_MIN_NR || - _IOC_NR(cmd) > IOC_LDLM_MAX_NR ) { + if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR || + _IOC_NR(cmd) > IOC_LDLM_MAX_NR) { CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n", - _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd)); - EXIT; - return -EINVAL; - } - - ptlrpc_init_client(-1, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, &cl); - err = ptlrpc_connect_client(-1, "ldlm", &cl); - if (err) { - CERROR("cannot create client\n"); + _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd)); RETURN(-EINVAL); } @@ -166,15 +245,13 @@ static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg, case IOC_LDLM_TEST: { err = ldlm_test(obddev); CERROR("-- done err %d\n", err); - EXIT; - break; + GOTO(out, err); } default: - err = -EINVAL; - EXIT; - break; + GOTO(out, err = -EINVAL); } + out: return err; } @@ -195,8 +272,23 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data) LBUG(); err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm"); - if (err) + if (err) { CERROR("cannot start thread\n"); + LBUG(); + } + + OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client)); + if (ldlm->ldlm_client == NULL) + LBUG(); + + ptlrpc_init_client(-1, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, + ldlm->ldlm_client); + err = ptlrpc_connect_client("ldlm", ldlm->ldlm_client, + &ldlm->ldlm_server_peer); + if (err) { + CERROR("cannot create client\n"); + LBUG(); + } MOD_INC_USE_COUNT; RETURN(0); @@ -257,7 +349,7 @@ static int ldlm_free_all(struct obd_device *obddev) ldlm_lock(obddev); - list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) { + list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) { struct ldlm_namespace *ns; ns = list_entry(tmp, struct ldlm_namespace, ns_link); @@ -282,6 +374,7 @@ static int ldlm_cleanup(struct obd_device *obddev) CERROR("Request list not empty!\n"); } + OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client)); OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service)); if (ldlm_free_all(obddev)) { @@ -334,7 +427,7 @@ static void __exit ldlm_exit(void) MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>"); MODULE_DESCRIPTION("Lustre Lock Management Module v0.1"); -MODULE_LICENSE("GPL"); +MODULE_LICENSE("GPL"); module_init(ldlm_init); module_exit(ldlm_exit); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c new file mode 100644 index 0000000000..3e796ac75f --- /dev/null +++ b/lustre/ldlm/ldlm_request.c @@ -0,0 +1,143 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2002 Cluster File Systems, Inc. + * + * This code is issued under the GNU General Public License. + * See the file COPYING in this distribution + * + * by Cluster File Systems, Inc. + */ + +#define EXPORT_SYMTAB + +#define DEBUG_SUBSYSTEM S_LDLM + +#include <linux/lustre_dlm.h> + +int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct lustre_peer *peer, + __u32 ns_id, + struct ldlm_handle *parent_lock_handle, + __u64 *res_id, + __u32 type, + struct ldlm_extent *req_ex, + ldlm_mode_t mode, + int *flags, + void *data, + __u32 data_len, + struct ldlm_handle *lockh, + struct ptlrpc_request **request) +{ + struct ldlm_request *body; + struct ldlm_reply *reply; + struct ptlrpc_request *req; + char *bufs[2] = {NULL, data}; + int rc, size[2] = {sizeof(*body), data_len}; + +#if 0 + ldlm_local_lock_enqueue(obddev, ns_id, parent_lock_handle, res_id, type, + req_ex, mode, flags); +#endif + + /* FIXME: if this is a local lock, stop here. */ + + req = ptlrpc_prep_req(cl, peer, LDLM_ENQUEUE, 2, size, bufs); + if (!req) + GOTO(out, rc = -ENOMEM); + + /* Dump all of this data into the request buffer */ + body = lustre_msg_buf(req->rq_reqmsg, 0); + body->lock_desc.l_resource.lr_ns_id = ns_id; + body->lock_desc.l_resource.lr_type = type; + memcpy(body->lock_desc.l_resource.lr_name, res_id, + sizeof(body->lock_desc.l_resource.lr_name)); + + body->lock_desc.l_req_mode = mode; + if (req_ex) + memcpy(&body->lock_desc.l_extent, req_ex, + sizeof(body->lock_desc.l_extent)); + body->flags = *flags; + + /* FIXME: lock_handle1 will be the shadow handle */ + + if (parent_lock_handle) + memcpy(&body->lock_handle2, parent_lock_handle, + sizeof(body->lock_handle2)); + + /* Continue as normal. */ + size[0] = sizeof(*reply); + req->rq_replen = lustre_msg_size(1, size); + + rc = ptlrpc_queue_wait(cl, req); + rc = ptlrpc_check_status(req, rc); + if (rc != ELDLM_OK) + GOTO(out, rc); + + reply = lustre_msg_buf(req->rq_repmsg, 0); + CERROR("remote handle: %p\n", + (void *)(unsigned long)reply->lock_handle.addr); + CERROR("extent: %Lu -> %Lu\n", reply->lock_extent.start, + reply->lock_extent.end); + + EXIT; + out: + *request = req; + return rc; +} + +int ldlm_cli_namespace_new(struct ptlrpc_client *cl, struct lustre_peer *peer, + __u32 ns_id, struct ptlrpc_request **request) +{ + struct ldlm_request *body; + struct ptlrpc_request *req; + int rc, size = sizeof(*body); + + req = ptlrpc_prep_req(cl, peer, LDLM_NAMESPACE_NEW, 1, &size, NULL); + if (!req) + GOTO(out, rc = -ENOMEM); + + body = lustre_msg_buf(req->rq_reqmsg, 0); + body->lock_desc.l_resource.lr_ns_id = ns_id; + + req->rq_replen = lustre_msg_size(0, NULL); + + rc = ptlrpc_queue_wait(cl, req); + rc = ptlrpc_check_status(req, rc); + + EXIT; + out: + *request = req; + return rc; +} + +int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, + void *data, __u32 data_len) +{ + struct ldlm_request *body; + struct ptlrpc_request *req; + struct obd_device *obddev = lock->l_resource->lr_namespace->ns_obddev; + struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client; + int rc, size[2] = {sizeof(*body), data_len}; + char *bufs[2] = {NULL, data}; + + req = ptlrpc_prep_req(cl, &lock->l_peer, LDLM_CALLBACK, 2, size, bufs); + if (!req) + GOTO(out, rc = -ENOMEM); + + body = lustre_msg_buf(req->rq_reqmsg, 0); + memcpy(&body->lock_handle1, &lock->l_remote_handle, + sizeof(body->lock_handle1)); + + if (new != NULL) + ldlm_lock2desc(new, &body->lock_desc); + + req->rq_replen = lustre_msg_size(0, NULL); + + rc = ptlrpc_queue_wait(cl, req); + rc = ptlrpc_check_status(req, rc); + ptlrpc_free_req(req); + + EXIT; + out: + return rc; +} diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 4d8ed88b45..88f82f22d3 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -66,18 +66,20 @@ static void res_hash_init(struct ldlm_namespace *ns) ns->ns_hash = res_hash; } -struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, __u32 id) +ldlm_error_t ldlm_namespace_new(struct obd_device *obddev, __u32 id, + struct ldlm_namespace **ns_out) { struct ldlm_namespace *ns; if (ldlm_namespace_find(obddev, id)) - LBUG(); + return -ELDLM_NAMESPACE_EXISTS; OBD_ALLOC(ns, sizeof(*ns)); if (!ns) LBUG(); ns->ns_id = id; + ns->ns_obddev = obddev; INIT_LIST_HEAD(&ns->ns_root_list); list_add(&ns->ns_link, &obddev->u.ldlm.ldlm_namespaces); @@ -85,7 +87,8 @@ struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obddev, __u32 id) res_hash_init(ns); atomic_set(&ns->ns_refcount, 0); - return ns; + *ns_out = ns; + return ELDLM_OK; } int ldlm_namespace_free(struct ldlm_namespace *ns) @@ -249,6 +252,14 @@ int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h) return 0; } +void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc) +{ + desc->lr_ns_id = res->lr_namespace->ns_id; + desc->lr_type = res->lr_type; + memcpy(desc->lr_name, res->lr_name, sizeof(desc->lr_name)); + memcpy(desc->lr_version, res->lr_version, sizeof(desc->lr_version)); +} + void ldlm_resource_dump(struct ldlm_resource *res) { struct list_head *tmp; diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index 17bbd765e6..39f4bd9391 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -41,8 +41,8 @@ int ldlm_test_basics(struct obd_device *obddev) ldlm_lock(obddev); - ns = ldlm_namespace_new(obddev, 1); - if (ns == NULL) + err = ldlm_namespace_new(obddev, 1, &ns); + if (err != ELDLM_OK) LBUG(); res = ldlm_resource_get(ns, NULL, res_id, LDLM_PLAIN, 1); @@ -50,7 +50,6 @@ int ldlm_test_basics(struct obd_device *obddev) LBUG(); /* Get a couple of read locks */ - flags = LDLM_FL_BLOCKING_AST; err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN, NULL, LCK_CR, &flags, NULL, ldlm_test_callback, NULL, 0, &lockh_1); @@ -60,7 +59,9 @@ int ldlm_test_basics(struct obd_device *obddev) err = ldlm_local_lock_enqueue(obddev, 1, NULL, res_id, LDLM_PLAIN, NULL, LCK_EX, &flags, NULL, ldlm_test_callback, NULL, 0, &lockh_2); - if (err != -ELDLM_BLOCK_GRANTED) + if (err != ELDLM_OK) + LBUG(); + if (!(flags & LDLM_FL_BLOCK_GRANTED)) LBUG(); ldlm_resource_dump(res); @@ -88,8 +89,8 @@ int ldlm_test_extents(struct obd_device *obddev) ldlm_lock(obddev); - ns = ldlm_namespace_new(obddev, 2); - if (ns == NULL) + err = ldlm_namespace_new(obddev, 2, &ns); + if (err != ELDLM_OK) LBUG(); flags = 0; @@ -114,7 +115,9 @@ int ldlm_test_extents(struct obd_device *obddev) err = ldlm_local_lock_enqueue(obddev, 2, NULL, res_id, LDLM_EXTENT, &ext3, LCK_EX, &flags, NULL, NULL, NULL, 0, &ext3_h); - if (err != -ELDLM_BLOCK_GRANTED) + if (err != ELDLM_OK) + LBUG(); + if (!(flags & LDLM_FL_BLOCK_GRANTED)) LBUG(); if (flags & LDLM_FL_LOCK_CHANGED) LBUG(); @@ -141,13 +144,53 @@ int ldlm_test_extents(struct obd_device *obddev) return 0; } +static int ldlm_test_network(struct obd_device *obddev) +{ + struct ldlm_obd *ldlm = &obddev->u.ldlm; + struct ptlrpc_request *request; + + __u64 res_id[RES_NAME_SIZE] = {1, 2, 3}; + struct ldlm_extent ext = {4, 6}; + struct ldlm_handle lockh1, lockh2; + int flags = 0; + ldlm_error_t err; + + err = ldlm_cli_namespace_new(ldlm->ldlm_client, &ldlm->ldlm_server_peer, + 3, &request); + ptlrpc_free_req(request); + CERROR("ldlm_cli_namespace_new: %d\n", err); + if (err != ELDLM_OK) + GOTO(out, err); + + err = ldlm_cli_enqueue(ldlm->ldlm_client, &ldlm->ldlm_server_peer, 3, + NULL, res_id, LDLM_EXTENT, &ext, LCK_PR, &flags, + NULL, 0, &lockh1, &request); + ptlrpc_free_req(request); + CERROR("ldlm_cli_enqueue: %d\n", err); + + flags = 0; + err = ldlm_cli_enqueue(ldlm->ldlm_client, &ldlm->ldlm_server_peer, 3, + NULL, res_id, LDLM_EXTENT, &ext, LCK_EX, &flags, + NULL, 0, &lockh2, &request); + ptlrpc_free_req(request); + CERROR("ldlm_cli_enqueue: %d\n", err); + + EXIT; + out: + return err; +} + int ldlm_test(struct obd_device *obddev) { - int rc; + int rc; rc = ldlm_test_basics(obddev); - if (rc) + if (rc) RETURN(rc); rc = ldlm_test_extents(obddev); - RETURN(rc); + if (rc) + RETURN(rc); + + rc = ldlm_test_network(obddev); + RETURN(rc); } diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 93d17d717d..2a645fd679 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -80,8 +80,8 @@ static int ll_dir_readpage(struct file *file, struct page *page) offset = page->index << PAGE_SHIFT; buf = kmap(page); - rc = mdc_readpage(&sbi->ll_mds_client, inode->i_ino, S_IFDIR, offset, - buf, &request); + rc = mdc_readpage(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino, + S_IFDIR, offset, buf, &request); kunmap(page); ptlrpc_free_req(request); EXIT; diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 2eb2829ada..25f1444fa8 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -62,8 +62,8 @@ static int ll_file_open(struct inode *inode, struct file *file) GOTO(out, rc = -ENOMEM); memset(fd, 0, sizeof(*fd)); - rc = mdc_open(&sbi->ll_mds_client, inode->i_ino, S_IFREG, - file->f_flags, &fd->fd_mdshandle, &req); + rc = mdc_open(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino, + S_IFREG, file->f_flags, &fd->fd_mdshandle, &req); if (!fd->fd_mdshandle) CERROR("mdc_open didn't assign fd_mdshandle\n"); @@ -134,8 +134,8 @@ static int ll_file_release(struct inode *inode, struct file *file) rc = -EIO; } - rc = mdc_close(&sbi->ll_mds_client, inode->i_ino, S_IFREG, - fd->fd_mdshandle, &req); + rc = mdc_close(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode->i_ino, + S_IFREG, fd->fd_mdshandle, &req); ptlrpc_free_req(req); if (rc) { if (rc > 0) diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index ce2cdaadba..5d6fb60e3b 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -107,7 +107,7 @@ static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry) if (!ino) goto negative; - err = mdc_getattr(&sbi->ll_mds_client, ino, type, + err = mdc_getattr(&sbi->ll_mds_client, &sbi->ll_mds_peer, ino, type, OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, &request); if (err) { CERROR("failure %d inode %ld\n", err, ino); @@ -141,8 +141,9 @@ static struct inode *ll_create_node(struct inode *dir, const char *name, ENTRY; - err = mdc_create(&sbi->ll_mds_client, dir, name, namelen, tgt, tgtlen, - mode, id, current->uid, current->gid, time, &request); + err = mdc_create(&sbi->ll_mds_client, &sbi->ll_mds_peer, dir, name, + namelen, tgt, tgtlen, mode, id, current->uid, + current->gid, time, &request); if (err) { inode = ERR_PTR(err); GOTO(out, err); @@ -190,7 +191,8 @@ int ll_mdc_unlink(struct inode *dir, struct inode *child, ENTRY; - err = mdc_unlink(&sbi->ll_mds_client, dir, child, name, len, &request); + err = mdc_unlink(&sbi->ll_mds_client, &sbi->ll_mds_peer, dir, child, + name, len, &request); ptlrpc_free_req(request); EXIT; @@ -206,7 +208,8 @@ int ll_mdc_link(struct dentry *src, struct inode *dir, ENTRY; - err = mdc_link(&sbi->ll_mds_client, src, dir, name, len, &request); + err = mdc_link(&sbi->ll_mds_client, &sbi->ll_mds_peer, src, dir, name, + len, &request); ptlrpc_free_req(request); EXIT; @@ -222,7 +225,7 @@ int ll_mdc_rename(struct inode *src, struct inode *tgt, ENTRY; - err = mdc_rename(&sbi->ll_mds_client, src, tgt, + err = mdc_rename(&sbi->ll_mds_client, &sbi->ll_mds_peer, src, tgt, old->d_name.name, old->d_name.len, new->d_name.name, new->d_name.len, &request); ptlrpc_free_req(request); diff --git a/lustre/llite/super.c b/lustre/llite/super.c index 3e5a65dcea..50bb08f174 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -136,7 +136,8 @@ static struct super_block * ll_read_super(struct super_block *sb, /* the first parameter should become an mds device no */ ptlrpc_init_client(-1, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &sbi->ll_mds_client); - err = ptlrpc_connect_client(-1, "mds", &sbi->ll_mds_client); + err = ptlrpc_connect_client("mds", &sbi->ll_mds_client, + &sbi->ll_mds_peer); if (err) { CERROR("cannot find MDS\n"); GOTO(out_disc, sb = NULL); @@ -152,7 +153,8 @@ static struct super_block * ll_read_super(struct super_block *sb, sb->s_op = &ll_super_operations; /* make root inode */ - err = mdc_getattr(&sbi->ll_mds_client, sbi->ll_rootino, S_IFDIR, + err = mdc_getattr(&sbi->ll_mds_client, &sbi->ll_mds_peer, + sbi->ll_rootino, S_IFDIR, OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, &request); if (err) { CERROR("mdc_getattr failed for root %d\n", err); @@ -260,7 +262,8 @@ int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc) /* change incore inode */ ll_attr2inode(inode, attr, do_trunc); - err = mdc_setattr(&sbi->ll_mds_client, inode, attr, &request); + err = mdc_setattr(&sbi->ll_mds_client, &sbi->ll_mds_peer, inode, attr, + &request); if (err) CERROR("mdc_setattr fails (%d)\n", err); diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index e2e062e374..58eb55a9e8 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -43,15 +43,16 @@ static int mdc_reint(struct ptlrpc_client *cl, struct ptlrpc_request *request) return rc; } -int mdc_setattr(struct ptlrpc_client *cl, struct inode *inode, - struct iattr *iattr, struct ptlrpc_request **request) +int mdc_setattr(struct ptlrpc_client *cl, struct lustre_peer *peer, + struct inode *inode, struct iattr *iattr, + struct ptlrpc_request **request) { struct mds_rec_setattr *rec; struct ptlrpc_request *req; int rc, size = sizeof(*rec); ENTRY; - req = ptlrpc_prep_req(cl, MDS_REINT, 1, &size, NULL); + req = ptlrpc_prep_req(cl, peer, MDS_REINT, 1, &size, NULL); if (!req) RETURN(-ENOMEM); @@ -67,10 +68,10 @@ int mdc_setattr(struct ptlrpc_client *cl, struct inode *inode, RETURN(rc); } -int mdc_create(struct ptlrpc_client *cl, struct inode *dir, const char *name, - int namelen, const char *tgt, int tgtlen, int mode, __u64 id, - __u32 uid, __u32 gid, __u64 time, - struct ptlrpc_request **request) +int mdc_create(struct ptlrpc_client *cl, struct lustre_peer *peer, + struct inode *dir, const char *name, int namelen, + const char *tgt, int tgtlen, int mode, __u64 id, __u32 uid, + __u32 gid, __u64 time, struct ptlrpc_request **request) { struct mds_rec_create *rec; struct ptlrpc_request *req; @@ -78,7 +79,7 @@ int mdc_create(struct ptlrpc_client *cl, struct inode *dir, const char *name, char *tmp; ENTRY; - req = ptlrpc_prep_req(cl, MDS_REINT, 3, size, NULL); + req = ptlrpc_prep_req(cl, peer, MDS_REINT, 3, size, NULL); if (!req) RETURN(-ENOMEM); @@ -102,9 +103,9 @@ int mdc_create(struct ptlrpc_client *cl, struct inode *dir, const char *name, RETURN(rc); } -int mdc_unlink(struct ptlrpc_client *cl, struct inode *dir, - struct inode *child, const char *name, int namelen, - struct ptlrpc_request **request) +int mdc_unlink(struct ptlrpc_client *cl, struct lustre_peer *peer, + struct inode *dir, struct inode *child, const char *name, + int namelen, struct ptlrpc_request **request) { struct mds_rec_unlink *rec; struct ptlrpc_request *req; @@ -112,7 +113,7 @@ int mdc_unlink(struct ptlrpc_client *cl, struct inode *dir, char *tmp; ENTRY; - req = ptlrpc_prep_req(cl, MDS_REINT, 2, size, NULL); + req = ptlrpc_prep_req(cl, peer, MDS_REINT, 2, size, NULL); if (!req) RETURN(-ENOMEM); @@ -131,8 +132,9 @@ int mdc_unlink(struct ptlrpc_client *cl, struct inode *dir, RETURN(rc); } -int mdc_link(struct ptlrpc_client *cl, struct dentry *src, struct inode *dir, - const char *name, int namelen, struct ptlrpc_request **request) +int mdc_link(struct ptlrpc_client *cl, struct lustre_peer *peer, + struct dentry *src, struct inode *dir, const char *name, + int namelen, struct ptlrpc_request **request) { struct mds_rec_link *rec; struct ptlrpc_request *req; @@ -140,7 +142,7 @@ int mdc_link(struct ptlrpc_client *cl, struct dentry *src, struct inode *dir, char *tmp; ENTRY; - req = ptlrpc_prep_req(cl, MDS_REINT, 2, size, NULL); + req = ptlrpc_prep_req(cl, peer, MDS_REINT, 2, size, NULL); if (!req) RETURN(-ENOMEM); @@ -159,8 +161,9 @@ int mdc_link(struct ptlrpc_client *cl, struct dentry *src, struct inode *dir, RETURN(rc); } -int mdc_rename(struct ptlrpc_client *cl, struct inode *src, struct inode *tgt, - const char *old, int oldlen, const char *new, int newlen, +int mdc_rename(struct ptlrpc_client *cl, struct lustre_peer *peer, + struct inode *src, struct inode *tgt, const char *old, + int oldlen, const char *new, int newlen, struct ptlrpc_request **request) { struct mds_rec_rename *rec; @@ -169,7 +172,7 @@ int mdc_rename(struct ptlrpc_client *cl, struct inode *src, struct inode *tgt, char *tmp; ENTRY; - req = ptlrpc_prep_req(cl, MDS_REINT, 3, size, NULL); + req = ptlrpc_prep_req(cl, peer, MDS_REINT, 3, size, NULL); if (!req) RETURN(-ENOMEM); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index ec567e894d..b2ee0cee6e 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -22,47 +22,26 @@ #define EXPORT_SYMTAB -#include <linux/config.h> #include <linux/module.h> -#include <linux/kernel.h> -#include <linux/mm.h> -#include <linux/string.h> -#include <linux/stat.h> -#include <linux/errno.h> -#include <linux/locks.h> -#include <linux/unistd.h> - -#include <asm/system.h> -#include <asm/uaccess.h> -#include <linux/module.h> - -#include <linux/fs.h> -#include <linux/stat.h> -#include <asm/uaccess.h> -#include <asm/segment.h> #include <linux/miscdevice.h> #define DEBUG_SUBSYSTEM S_MDC -#include <linux/obd_support.h> -#include <linux/obd_class.h> -#include <linux/lustre_lib.h> -#include <linux/lustre_idl.h> #include <linux/lustre_mds.h> #define REQUEST_MINOR 244 extern int mds_queue_req(struct ptlrpc_request *); -int mdc_connect(struct ptlrpc_client *cl, ino_t ino, int type, int valid, - struct ptlrpc_request **request) +int mdc_connect(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, + int type, int valid, struct ptlrpc_request **request) { struct ptlrpc_request *req; struct mds_body *body; int rc, size = sizeof(*body); ENTRY; - req = ptlrpc_prep_req(cl, MDS_GETATTR, 1, &size, NULL); + req = ptlrpc_prep_req(cl, peer, MDS_GETATTR, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -88,15 +67,15 @@ int mdc_connect(struct ptlrpc_client *cl, ino_t ino, int type, int valid, } -int mdc_getattr(struct ptlrpc_client *cl, ino_t ino, int type, int valid, - struct ptlrpc_request **request) +int mdc_getattr(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, + int type, int valid, struct ptlrpc_request **request) { struct ptlrpc_request *req; struct mds_body *body; int rc, size = sizeof(*body); ENTRY; - req = ptlrpc_prep_req(cl, MDS_GETATTR, 1, &size, NULL); + req = ptlrpc_prep_req(cl, peer, MDS_GETATTR, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -121,14 +100,14 @@ int mdc_getattr(struct ptlrpc_client *cl, ino_t ino, int type, int valid, return rc; } -int mdc_open(struct ptlrpc_client *cl, ino_t ino, int type, int flags, - __u64 *fh, struct ptlrpc_request **request) +int mdc_open(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, + int type, int flags, __u64 *fh, struct ptlrpc_request **request) { struct mds_body *body; int rc, size = sizeof(*body); struct ptlrpc_request *req; - req = ptlrpc_prep_req(cl, MDS_OPEN, 1, &size, NULL); + req = ptlrpc_prep_req(cl, peer, MDS_OPEN, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -153,14 +132,14 @@ int mdc_open(struct ptlrpc_client *cl, ino_t ino, int type, int flags, return rc; } -int mdc_close(struct ptlrpc_client *cl, ino_t ino, int type, __u64 fh, - struct ptlrpc_request **request) +int mdc_close(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, + int type, __u64 fh, struct ptlrpc_request **request) { struct mds_body *body; int rc, size = sizeof(*body); struct ptlrpc_request *req; - req = ptlrpc_prep_req(cl, MDS_CLOSE, 1, &size, NULL); + req = ptlrpc_prep_req(cl, peer, MDS_CLOSE, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -179,8 +158,9 @@ int mdc_close(struct ptlrpc_client *cl, ino_t ino, int type, __u64 fh, return rc; } -int mdc_readpage(struct ptlrpc_client *cl, ino_t ino, int type, __u64 offset, - char *addr, struct ptlrpc_request **request) +int mdc_readpage(struct ptlrpc_client *cl, struct lustre_peer *peer, ino_t ino, + int type, __u64 offset, char *addr, + struct ptlrpc_request **request) { struct ptlrpc_request *req = NULL; struct ptlrpc_bulk_desc *bulk = NULL; @@ -193,14 +173,14 @@ int mdc_readpage(struct ptlrpc_client *cl, ino_t ino, int type, __u64 offset, CDEBUG(D_INODE, "inode: %ld\n", ino); - bulk = ptlrpc_prep_bulk(&cl->cli_server); + bulk = ptlrpc_prep_bulk(peer); if (bulk == NULL) { CERROR("%s: cannot init bulk desc\n", __FUNCTION__); rc = -ENOMEM; goto out; } - req = ptlrpc_prep_req(cl, MDS_READPAGE, 2, size, bufs); + req = ptlrpc_prep_req(cl, peer, MDS_READPAGE, 2, size, bufs); if (!req) GOTO(out, rc = -ENOMEM); @@ -244,6 +224,7 @@ static int request_ioctl(struct inode *inode, struct file *file, { int err; struct ptlrpc_client cl; + struct lustre_peer peer; struct ptlrpc_request *request; ENTRY; @@ -260,7 +241,7 @@ static int request_ioctl(struct inode *inode, struct file *file, } ptlrpc_init_client(-1, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &cl); - err = ptlrpc_connect_client(-1, "mds", &cl); + err = ptlrpc_connect_client("mds", &cl, &peer); if (err) { CERROR("cannot create client\n"); RETURN(-EINVAL); @@ -269,7 +250,7 @@ static int request_ioctl(struct inode *inode, struct file *file, switch (cmd) { case IOC_REQUEST_GETATTR: { CERROR("-- getting attr for ino %lu\n", arg); - err = mdc_getattr(&cl, arg, S_IFDIR, ~0, &request); + err = mdc_getattr(&cl, &peer, arg, S_IFDIR, ~0, &request); CERROR("-- done err %d\n", err); GOTO(out, err); @@ -283,7 +264,7 @@ static int request_ioctl(struct inode *inode, struct file *file, break; } CERROR("-- readpage 0 for ino %lu\n", arg); - err = mdc_readpage(&cl, arg, S_IFDIR, 0, buf, &request); + err = mdc_readpage(&cl, &peer, arg, S_IFDIR, 0, buf, &request); CERROR("-- done err %d\n", err); OBD_FREE(buf, PAGE_SIZE); @@ -300,7 +281,7 @@ static int request_ioctl(struct inode *inode, struct file *file, iattr.ia_atime = 0; iattr.ia_valid = ATTR_MODE | ATTR_ATIME; - err = mdc_setattr(&cl, &inode, &iattr, &request); + err = mdc_setattr(&cl, &peer, &inode, &iattr, &request); CERROR("-- done err %d\n", err); GOTO(out, err); @@ -316,7 +297,7 @@ static int request_ioctl(struct inode *inode, struct file *file, iattr.ia_atime = 0; iattr.ia_valid = ATTR_MODE | ATTR_ATIME; - err = mdc_create(&cl, &inode, + err = mdc_create(&cl, &peer, &inode, "foofile", strlen("foofile"), NULL, 0, 0100707, 47114711, 11, 47, 0, &request); @@ -329,7 +310,8 @@ static int request_ioctl(struct inode *inode, struct file *file, __u64 fh, ino; copy_from_user(&ino, (__u64 *)arg, sizeof(ino)); CERROR("-- opening ino %llu\n", ino); - err = mdc_open(&cl, ino, S_IFDIR, O_RDONLY, &fh, &request); + err = mdc_open(&cl, &peer, ino, S_IFDIR, O_RDONLY, &fh, + &request); copy_to_user((__u64 *)arg, &fh, sizeof(fh)); CERROR("-- done err %d (fh=%Lu)\n", err, fh); @@ -338,7 +320,7 @@ static int request_ioctl(struct inode *inode, struct file *file, case IOC_REQUEST_CLOSE: { CERROR("-- closing ino 2, filehandle %lu\n", arg); - err = mdc_close(&cl, 2, S_IFDIR, arg, &request); + err = mdc_close(&cl, &peer, 2, S_IFDIR, arg, &request); CERROR("-- done err %d\n", err); GOTO(out, err); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 695a63f1d3..14d670e8d1 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -403,17 +403,17 @@ int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc, break; default: - rc = ptlrpc_error(dev, svc, req); + rc = ptlrpc_error(svc, req); RETURN(rc); } EXIT; out: if (rc) { - ptlrpc_error(dev, svc, req); + ptlrpc_error(svc, req); } else { CDEBUG(D_NET, "sending reply\n"); - ptlrpc_reply(dev, svc, req); + ptlrpc_reply(svc, req); } return 0; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index fb2c6da975..2928d3c6d8 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -44,21 +44,25 @@ #include <linux/lustre_net.h> #include <linux/obd_ost.h> -struct ptlrpc_client *osc_con2cl(struct obd_conn *conn) +static void osc_con2cl(struct obd_conn *conn, struct ptlrpc_client **cl, + struct lustre_peer **peer) { struct osc_obd *osc = &conn->oc_dev->u.osc; - return osc->osc_client; + *cl = osc->osc_client; + *peer = &osc->osc_peer; } static int osc_connect(struct obd_conn *conn) { struct ptlrpc_request *request; - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct lustre_peer *peer; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - request = ptlrpc_prep_req(cl, OST_CONNECT, 0, NULL, NULL); + osc_con2cl(conn, &cl, &peer); + request = ptlrpc_prep_req(cl, peer, OST_CONNECT, 0, NULL, NULL); if (!request) RETURN(-ENOMEM); @@ -81,12 +85,14 @@ static int osc_connect(struct obd_conn *conn) static int osc_disconnect(struct obd_conn *conn) { struct ptlrpc_request *request; - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct lustre_peer *peer; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - request = ptlrpc_prep_req(cl, OST_DISCONNECT, 1, &size, NULL); + osc_con2cl(conn, &cl, &peer); + request = ptlrpc_prep_req(cl, peer, OST_DISCONNECT, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -105,12 +111,14 @@ static int osc_disconnect(struct obd_conn *conn) static int osc_getattr(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct lustre_peer *peer; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - request = ptlrpc_prep_req(cl, OST_GETATTR, 1, &size, NULL); + osc_con2cl(conn, &cl, &peer); + request = ptlrpc_prep_req(cl, peer, OST_GETATTR, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -139,12 +147,14 @@ static int osc_getattr(struct obd_conn *conn, struct obdo *oa) static int osc_open(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct lustre_peer *peer; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - request = ptlrpc_prep_req(cl, OST_OPEN, 1, &size, NULL); + osc_con2cl(conn, &cl, &peer); + request = ptlrpc_prep_req(cl, peer, OST_OPEN, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -174,12 +184,14 @@ static int osc_open(struct obd_conn *conn, struct obdo *oa) static int osc_close(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct lustre_peer *peer; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - request = ptlrpc_prep_req(cl, OST_CLOSE, 1, &size, NULL); + osc_con2cl(conn, &cl, &peer); + request = ptlrpc_prep_req(cl, peer, OST_CLOSE, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -207,12 +219,14 @@ static int osc_close(struct obd_conn *conn, struct obdo *oa) static int osc_setattr(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct lustre_peer *peer; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; - request = ptlrpc_prep_req(cl, OST_SETATTR, 1, &size, NULL); + osc_con2cl(conn, &cl, &peer); + request = ptlrpc_prep_req(cl, peer, OST_SETATTR, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -233,7 +247,8 @@ static int osc_setattr(struct obd_conn *conn, struct obdo *oa) static int osc_create(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct lustre_peer *peer; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; @@ -242,7 +257,8 @@ static int osc_create(struct obd_conn *conn, struct obdo *oa) CERROR("oa NULL\n"); RETURN(-EINVAL); } - request = ptlrpc_prep_req(cl, OST_CREATE, 1, &size, NULL); + osc_con2cl(conn, &cl, &peer); + request = ptlrpc_prep_req(cl, peer, OST_CREATE, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -270,7 +286,8 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, obd_off offset) { struct ptlrpc_request *request; - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct lustre_peer *peer; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; @@ -279,7 +296,8 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, CERROR("oa NULL\n"); RETURN(-EINVAL); } - request = ptlrpc_prep_req(cl, OST_PUNCH, 1, &size, NULL); + osc_con2cl(conn, &cl, &peer); + request = ptlrpc_prep_req(cl, peer, OST_PUNCH, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -308,7 +326,8 @@ static int osc_punch(struct obd_conn *conn, struct obdo *oa, obd_size count, static int osc_destroy(struct obd_conn *conn, struct obdo *oa) { struct ptlrpc_request *request; - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct lustre_peer *peer; struct ost_body *body; int rc, size = sizeof(*body); ENTRY; @@ -317,7 +336,8 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa) CERROR("oa NULL\n"); RETURN(-EINVAL); } - request = ptlrpc_prep_req(cl, OST_DESTROY, 1, &size, NULL); + osc_con2cl(conn, &cl, &peer); + request = ptlrpc_prep_req(cl, peer, OST_DESTROY, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -344,7 +364,10 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa) int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req, struct niobuf *dst, struct niobuf *src) { - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct lustre_peer *peer; + + osc_con2cl(conn, &cl, &peer); if (cl->cli_obd) { /* local sendpage */ @@ -354,7 +377,7 @@ int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req, struct ptlrpc_bulk_desc *bulk; int rc; - bulk = ptlrpc_prep_bulk(&cl->cli_server); + bulk = ptlrpc_prep_bulk(peer); if (bulk == NULL) return -ENOMEM; @@ -386,7 +409,8 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, obd_count *oa_bufs, struct page **buf, obd_size *count, obd_off *offset, obd_flag *flags) { - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct lustre_peer *peer; struct ptlrpc_request *request; struct ost_body *body; struct obd_ioobj ioo; @@ -406,7 +430,8 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, if (bulk == NULL) RETURN(-ENOMEM); - request = ptlrpc_prep_req(cl, OST_BRW, 3, size, NULL); + osc_con2cl(conn, &cl, &peer); + request = ptlrpc_prep_req(cl, peer, OST_BRW, 3, size, NULL); if (!request) GOTO(out, rc = -ENOMEM); @@ -418,7 +443,7 @@ int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, for (pages = 0, i = 0; i < num_oa; i++) { ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); for (j = 0; j < oa_bufs[i]; j++, pages++) { - bulk[pages] = ptlrpc_prep_bulk(&cl->cli_server); + bulk[pages] = ptlrpc_prep_bulk(peer); if (bulk[pages] == NULL) GOTO(out, rc = -ENOMEM); @@ -465,7 +490,8 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, obd_count *oa_bufs, struct page **buf, obd_size *count, obd_off *offset, obd_flag *flags) { - struct ptlrpc_client *cl = osc_con2cl(conn); + struct ptlrpc_client *cl; + struct lustre_peer *peer; struct ptlrpc_request *request; struct obd_ioobj ioo; struct ost_body *body; @@ -484,7 +510,8 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, if (!src) RETURN(-ENOMEM); - request = ptlrpc_prep_req(cl, OST_BRW, 3, size, NULL); + osc_con2cl(conn, &cl, &peer); + request = ptlrpc_prep_req(cl, peer, OST_BRW, 3, size, NULL); if (!request) RETURN(-ENOMEM); body = lustre_msg_buf(request->rq_reqmsg, 0); @@ -565,7 +592,7 @@ static int osc_setup(struct obd_device *obddev, obd_count len, ptlrpc_init_client(dev, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, osc->osc_client); - rc = ptlrpc_connect_client(dev, "ost", osc->osc_client); + rc = ptlrpc_connect_client("ost", osc->osc_client, &osc->osc_peer); if (rc == 0) MOD_INC_USE_COUNT; diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index b5f57cecc7..f543343a72 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -563,7 +563,7 @@ static int ost_handle(struct obd_device *obddev, struct ptlrpc_service *svc, break; default: req->rq_status = -ENOTSUPP; - rc = ptlrpc_error(obddev, svc, req); + rc = ptlrpc_error(svc, req); RETURN(rc); } @@ -572,10 +572,10 @@ out: //req->rq_status = rc; if (rc) { CERROR("ost: processing error %d\n", rc); - ptlrpc_error(obddev, svc, req); + ptlrpc_error(svc, req); } else { CDEBUG(D_INODE, "sending reply\n"); - ptlrpc_reply(obddev, svc, req); + ptlrpc_reply(svc, req); } return 0; diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index a1322ac97d..73d1324718 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -34,7 +34,7 @@ #include <linux/lustre_net.h> void ptlrpc_init_client(int dev, int req_portal, int rep_portal, - struct ptlrpc_client *cl) + struct ptlrpc_client *cl) { memset(cl, 0, sizeof(*cl)); spin_lock_init(&cl->cli_lock); @@ -50,12 +50,13 @@ void ptlrpc_init_client(int dev, int req_portal, int rep_portal, sema_init(&cl->cli_rpc_sem, 32); } -int ptlrpc_connect_client(int dev, char *uuid, struct ptlrpc_client *cl) +int ptlrpc_connect_client(char *uuid, struct ptlrpc_client *cl, + struct lustre_peer *peer) { int err; cl->cli_epoch++; - err = kportal_uuid_to_peer(uuid, &cl->cli_server); + err = kportal_uuid_to_peer(uuid, peer); if (err != 0) CERROR("cannot find peer %s!\n", uuid); @@ -75,7 +76,8 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer) return bulk; } -struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode, +struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, + struct lustre_peer *peer, int opcode, int count, int *lengths, char **bufs) { struct ptlrpc_request *request; @@ -99,6 +101,7 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode, RETURN(NULL); } request->rq_type = PTL_RPC_REQUEST; + memcpy(&request->rq_peer, peer, sizeof(*peer)); request->rq_reqmsg = (struct lustre_msg *)request->rq_reqbuf; request->rq_reqmsg->opc = HTON__u32(opcode); request->rq_reqmsg->xid = HTON__u32(request->rq_xid); diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 7f68332d28..90c7f51aa1 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -209,43 +209,17 @@ int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk) return rc; } -int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc, - struct ptlrpc_request *req) +int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req) { - struct ptlrpc_request *clnt_req = req->rq_reply_handle; - ENTRY; - - if (req->rq_reply_handle == NULL) { - /* This is a request that came from the network via portals. */ - - /* FIXME: we need to increment the count of handled events */ - req->rq_type = PTL_RPC_REPLY; - req->rq_repmsg->xid = HTON__u32(req->rq_reqmsg->xid); - req->rq_repmsg->status = HTON__u32(req->rq_status); - req->rq_reqmsg->type = HTON__u32(req->rq_type); - ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal); - } else { - /* This is a local request that came from another thread. */ - - /* move the reply to the client */ - clnt_req->rq_replen = req->rq_replen; - clnt_req->rq_repbuf = req->rq_repbuf; - req->rq_repbuf = NULL; - req->rq_replen = 0; - - /* free the request buffer */ - OBD_FREE(req->rq_reqbuf, req->rq_reqlen); - req->rq_reqbuf = NULL; - - /* wake up the client */ - wake_up_interruptible(&clnt_req->rq_wait_for_rep); - } - - RETURN(0); + /* FIXME: we need to increment the count of handled events */ + req->rq_type = PTL_RPC_REPLY; + req->rq_repmsg->xid = HTON__u32(req->rq_reqmsg->xid); + req->rq_repmsg->status = HTON__u32(req->rq_status); + req->rq_reqmsg->type = HTON__u32(req->rq_type); + return ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal); } -int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc, - struct ptlrpc_request *req) +int ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req) { int rc; ENTRY; @@ -262,7 +236,7 @@ int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc, req->rq_repmsg->type = HTON__u32(PTL_RPC_ERR); - rc = ptlrpc_reply(obddev, svc, req); + rc = ptlrpc_reply(svc, req); RETURN(rc); } @@ -296,7 +270,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl) down(&cl->cli_rpc_sem); - rc = PtlMEAttach(cl->cli_server.peer_ni, request->rq_reply_portal, + rc = PtlMEAttach(request->rq_peer.peer_ni, request->rq_reply_portal, local_id, request->rq_xid, 0, PTL_UNLINK, PTL_INS_AFTER, &request->rq_reply_me_h); if (rc != PTL_OK) { @@ -325,7 +299,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct ptlrpc_client *cl) request->rq_replen, request->rq_xid, request->rq_reply_portal); list_add(&request->rq_list, &cl->cli_sending_head); - rc = ptl_send_buf(request, &cl->cli_server, request->rq_req_portal); + rc = ptl_send_buf(request, &request->rq_peer, request->rq_req_portal); RETURN(rc); cleanup2: diff --git a/lustre/tests/common.sh b/lustre/tests/common.sh index 9c398a1243..fd4a52940f 100644 --- a/lustre/tests/common.sh +++ b/lustre/tests/common.sh @@ -134,6 +134,7 @@ setup_portals() { add_uuid self add_uuid mds add_uuid ost + add_uuid ldlm quit EOF } @@ -167,4 +168,6 @@ setup_ldlm() { insmod $LUSTRE/ldlm/ldlm.o || exit -1 list_mods + echo "Press Enter to continue" + read } diff --git a/lustre/tests/llmountcleanup.sh b/lustre/tests/llmountcleanup.sh index a7a62cb243..196d2745ae 100755 --- a/lustre/tests/llmountcleanup.sh +++ b/lustre/tests/llmountcleanup.sh @@ -41,6 +41,7 @@ disconnect del_uuid self del_uuid mds del_uuid ost +del_uuid ldlm quit EOF -- GitLab