diff --git a/lnet/klnds/ralnd/ralnd.c b/lnet/klnds/ralnd/ralnd.c index 2fe1aa0f1c69b5c6d40c22dcb1a6527d45f10680..35f436e444996541f5978c7be803052e1a84453e 100644 --- a/lnet/klnds/ralnd/ralnd.c +++ b/lnet/klnds/ralnd/ralnd.c @@ -37,10 +37,10 @@ kra_tunables_t kranal_tunables; #define RANAL_SYSCTL 202 static ctl_table kranal_ctl_table[] = { - {RANAL_SYSCTL_TIMEOUT, "timeout", + {RANAL_SYSCTL_TIMEOUT, "timeout", &kranal_tunables.kra_timeout, sizeof(int), 0644, NULL, &proc_dointvec}, - {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", + {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", &kranal_tunables.kra_listener_timeout, sizeof(int), 0644, NULL, &proc_dointvec}, {RANAL_SYSCTL_BACKLOG, "backlog", @@ -49,7 +49,7 @@ static ctl_table kranal_ctl_table[] = { {RANAL_SYSCTL_PORT, "port", &kranal_tunables.kra_port, sizeof(int), 0644, NULL, kranal_listener_procint}, - {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate", + {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate", &kranal_tunables.kra_max_immediate, sizeof(int), 0644, NULL, &proc_dointvec}, { 0 } @@ -89,7 +89,7 @@ kranal_sock_write (struct socket *sock, void *buffer, int nob) if (rc == nob) return 0; - + if (rc >= 0) return -EAGAIN; @@ -102,7 +102,6 @@ kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout) int rc; mm_segment_t oldmm = get_fs(); long ticks = timeout * HZ; - int wanted = nob; unsigned long then; struct timeval tv; @@ -145,9 +144,6 @@ kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout) ticks -= jiffies - then; set_fs(oldmm); - CDEBUG(D_WARNING, "rc %d at %d/%d bytes %d/%d secs\n", - rc, wanted - nob, wanted, timeout - (int)(ticks/HZ), timeout); - if (rc < 0) return rc; @@ -233,13 +229,6 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn, ptl_nid_t dstnid) rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams); LASSERT(rrc == RAP_SUCCESS); - - CDEBUG(D_WARNING,"devid %d, riparams: HID %08x FDH %08x PT %08x CC %08x\n", - connreq->racr_devid, - 
connreq->racr_riparams.HostId, - connreq->racr_riparams.FmaDomainHndl, - connreq->racr_riparams.PTag, - connreq->racr_riparams.CompletionCookie); } int @@ -290,7 +279,7 @@ kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout) connreq->racr_timeout, RANAL_MIN_TIMEOUT); return -EPROTO; } - + return 0; } @@ -323,16 +312,16 @@ kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn) if (conn->rac_device != newconn->rac_device) continue; - + if (loopback && newconn->rac_my_connstamp == conn->rac_peer_connstamp && newconn->rac_peer_connstamp == conn->rac_my_connstamp) continue; - + LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp); CDEBUG(D_NET, "Closing stale conn nid:"LPX64 - " connstamp:"LPX64"("LPX64")\n", peer->rap_nid, + " connstamp:"LPX64"("LPX64")\n", peer->rap_nid, conn->rac_peer_connstamp, newconn->rac_peer_connstamp); count++; @@ -350,7 +339,7 @@ kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn) int loopback; loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid; - + list_for_each(tmp, &peer->rap_conns) { conn = list_entry(tmp, kra_conn_t, rac_list); @@ -376,12 +365,12 @@ kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn) /* 'newconn' is an earlier connection from 'peer'!!! */ if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp) return 2; - + /* 'conn' is an earlier connection from 'peer': it will be * removed when we cull stale conns later on... */ if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp) continue; - + /* 'newconn' has the SAME connection stamp; 'peer' isn't * playing the game... 
*/ return 3; @@ -402,7 +391,7 @@ kranal_set_conn_uniqueness (kra_conn_t *conn) do { /* allocate a unique cqid */ conn->rac_cqid = kranal_data.kra_next_cqid++; } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL); - + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); } @@ -448,7 +437,7 @@ kranal_create_conn(kra_conn_t **connp, kra_device_t *dev) } void -kranal_destroy_conn(kra_conn_t *conn) +kranal_destroy_conn(kra_conn_t *conn) { RAP_RETURN rrc; @@ -512,9 +501,11 @@ kranal_close_conn_locked (kra_conn_t *conn, int error) /* Non-persistent peer with no more conns... */ kranal_unlink_peer_locked(peer); } - + /* Reset RX timeout to ensure we wait for an incoming CLOSE for the - * full timeout */ + * full timeout. If we get a CLOSE we know the peer has stopped all + * RDMA. Otherwise if we wait for the full timeout we can also be sure + * all RDMA has stopped. */ conn->rac_last_rx = jiffies; mb(); @@ -528,36 +519,29 @@ void kranal_close_conn (kra_conn_t *conn, int error) { unsigned long flags; - + write_lock_irqsave(&kranal_data.kra_global_lock, flags); - + if (conn->rac_state == RANAL_CONN_ESTABLISHED) kranal_close_conn_locked(conn, error); - + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); } int -kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, +kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, __u32 peer_ip, int peer_port) { RAP_RETURN rrc; - CDEBUG(D_WARNING,"devid %d, riparams: HID %08x FDH %08x PT %08x CC %08x\n", - conn->rac_device->rad_id, - connreq->racr_riparams.HostId, - connreq->racr_riparams.FmaDomainHndl, - connreq->racr_riparams.PTag, - connreq->racr_riparams.CompletionCookie); - rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams); if (rrc != RAP_SUCCESS) { - CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n", + CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n", HIPQUAD(peer_ip), peer_port, rrc); return -EPROTO; } - + conn->rac_peerstamp = connreq->racr_peerstamp; 
conn->rac_peer_connstamp = connreq->racr_connstamp; conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout); @@ -566,7 +550,7 @@ kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, } int -kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp, +kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp, ptl_nid_t *dst_nidp, kra_conn_t **connp) { struct sockaddr_in addr; @@ -580,8 +564,6 @@ kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp, int len; int i; - CDEBUG(D_WARNING,"!!\n"); - len = sizeof(addr); rc = sock->ops->getname(sock, (struct sockaddr *)&addr, &len, 2); if (rc != 0) { @@ -592,26 +574,20 @@ kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp, peer_ip = ntohl(addr.sin_addr.s_addr); peer_port = ntohs(addr.sin_port); - CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip)); - if (peer_port >= 1024) { CERROR("Refusing unprivileged connection from %u.%u.%u.%u/%d\n", HIPQUAD(peer_ip), peer_port); return -ECONNREFUSED; } - CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip)); - - rc = kranal_recv_connreq(sock, &rx_connreq, + rc = kranal_recv_connreq(sock, &rx_connreq, kranal_tunables.kra_listener_timeout); if (rc != 0) { - CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", + CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", HIPQUAD(peer_ip), peer_port, rc); return rc; } - CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip)); - for (i = 0;;i++) { if (i == kranal_data.kra_ndevs) { CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n", @@ -623,34 +599,26 @@ kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp, break; } - CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip)); - rc = kranal_create_conn(&conn, dev); if (rc != 0) return rc; - CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip)); - kranal_pack_connreq(&tx_connreq, conn, rx_connreq.racr_srcnid); rc = kranal_sock_write(sock, &tx_connreq, sizeof(tx_connreq)); if (rc != 0) { - CERROR("Can't tx 
connreq to %u.%u.%u.%u/%d: %d\n", + CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", HIPQUAD(peer_ip), peer_port, rc); kranal_conn_decref(conn); return rc; } - CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip)); - rc = kranal_set_conn_params(conn, &rx_connreq, peer_ip, peer_port); if (rc != 0) { kranal_conn_decref(conn); return rc; } - CDEBUG(D_WARNING,"%u.%u.%u.%u\n", HIPQUAD(peer_ip)); - *connp = conn; *src_nidp = rx_connreq.racr_srcnid; *dst_nidp = rx_connreq.racr_dstnid; @@ -668,8 +636,8 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) for (port = 1023; port >= 512; port--) { - memset(&locaddr, 0, sizeof(locaddr)); - locaddr.sin_family = AF_INET; + memset(&locaddr, 0, sizeof(locaddr)); + locaddr.sin_family = AF_INET; locaddr.sin_port = htons(port); locaddr.sin_addr.s_addr = htonl(INADDR_ANY); @@ -686,7 +654,7 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) (struct sockaddr *)&locaddr, sizeof(locaddr)); if (rc != 0) { sock_release(sock); - + if (rc == -EADDRINUSE) { CDEBUG(D_NET, "Port %d already in use\n", port); continue; @@ -703,7 +671,7 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) *sockp = sock; return 0; } - + sock_release(sock); if (rc != -EADDRNOTAVAIL) { @@ -711,8 +679,8 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) port, HIPQUAD(peer->rap_ip), peer->rap_port, rc); return rc; } - - CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", + + CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", port, HIPQUAD(peer->rap_ip), peer->rap_port); } @@ -722,7 +690,7 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp) int -kranal_active_conn_handshake(kra_peer_t *peer, +kranal_active_conn_handshake(kra_peer_t *peer, ptl_nid_t *dst_nidp, kra_conn_t **connp) { kra_connreq_t connreq; @@ -732,8 +700,6 @@ kranal_active_conn_handshake(kra_peer_t *peer, int rc; unsigned int idx; - CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid); - /* spread connections over all devices using both peer NIDs 
to ensure * all nids use all devices */ idx = peer->rap_nid + kranal_lib.libnal_ni.ni_pid.nid; @@ -743,45 +709,37 @@ kranal_active_conn_handshake(kra_peer_t *peer, if (rc != 0) return rc; - CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid); - kranal_pack_connreq(&connreq, conn, peer->rap_nid); - + rc = ranal_connect_sock(peer, &sock); if (rc != 0) goto failed_0; - CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid); - /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout * immediately after accepting a connection, so we connect and then * send immediately. */ rc = kranal_sock_write(sock, &connreq, sizeof(connreq)); if (rc != 0) { - CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", + CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", HIPQUAD(peer->rap_ip), peer->rap_port, rc); goto failed_1; } - CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid); - rc = kranal_recv_connreq(sock, &connreq, kranal_tunables.kra_timeout); if (rc != 0) { - CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", + CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", HIPQUAD(peer->rap_ip), peer->rap_port, rc); goto failed_1; } - CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid); - sock_release(sock); rc = -EPROTO; if (connreq.racr_srcnid != peer->rap_nid) { CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: " "received "LPX64" expected "LPX64"\n", - HIPQUAD(peer->rap_ip), peer->rap_port, + HIPQUAD(peer->rap_ip), peer->rap_port, connreq.racr_srcnid, peer->rap_nid); goto failed_0; } @@ -789,28 +747,24 @@ kranal_active_conn_handshake(kra_peer_t *peer, if (connreq.racr_devid != dev->rad_id) { CERROR("Unexpected device id from %u.%u.%u.%u/%d: " "received %d expected %d\n", - HIPQUAD(peer->rap_ip), peer->rap_port, + HIPQUAD(peer->rap_ip), peer->rap_port, connreq.racr_devid, dev->rad_id); goto failed_0; } - CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid); - - rc = kranal_set_conn_params(conn, &connreq, + rc = kranal_set_conn_params(conn, &connreq, peer->rap_ip, peer->rap_port); if (rc != 0) goto failed_0; *connp = conn; 
*dst_nidp = connreq.racr_dstnid; - CDEBUG(D_WARNING,LPX64"\n", peer->rap_nid); return 0; failed_1: sock_release(sock); failed_0: kranal_conn_decref(conn); - CDEBUG(D_WARNING,LPX64": %d\n", peer->rap_nid, rc); return rc; } @@ -831,7 +785,7 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) /* active: connd wants to connect to 'peer' */ LASSERT (peer != NULL); LASSERT (peer->rap_connecting); - + rc = kranal_active_conn_handshake(peer, &dst_nid, &conn); if (rc != 0) return rc; @@ -840,7 +794,7 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) if (!kranal_peer_active(peer)) { /* raced with peer getting unlinked */ - write_unlock_irqrestore(&kranal_data.kra_global_lock, + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); kranal_conn_decref(conn); return -ESTALE; @@ -876,7 +830,7 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) } } - LASSERT (!new_peer == !kranal_peer_active(peer)); + LASSERT ((!new_peer) != (!kranal_peer_active(peer))); /* Refuse connection if peer thinks we are a different NID. 
We check * this while holding the global lock, to synch with connection @@ -910,7 +864,10 @@ kranal_conn_handshake (struct socket *sock, kra_peer_t *peer) list_add_tail(&peer->rap_list, kranal_nid2peerlist(peer_nid)); } - + + /* initialise timestamps before reaper looks at them */ + conn->rac_last_tx = conn->rac_last_rx = jiffies; + kranal_peer_addref(peer); /* +1 ref for conn */ conn->rac_peer = peer; list_add_tail(&conn->rac_list, &peer->rap_conns); @@ -959,11 +916,11 @@ kranal_connect (kra_peer_t *peer) LASSERT (peer->rap_connecting); - CDEBUG(D_WARNING,"About to handshake "LPX64"\n", peer->rap_nid); + CDEBUG(D_NET, "About to handshake "LPX64"\n", peer->rap_nid); rc = kranal_conn_handshake(NULL, peer); - CDEBUG(D_WARNING,"Done handshake "LPX64":%d \n", peer->rap_nid, rc); + CDEBUG(D_NET, "Done handshake "LPX64":%d \n", peer->rap_nid, rc); write_lock_irqsave(&kranal_data.kra_global_lock, flags); @@ -1055,7 +1012,7 @@ kranal_listener (void *arg) rc = sock->ops->listen(sock, kranal_tunables.kra_backlog); if (rc != 0) { - CERROR("Can't set listen backlog %d: %d\n", + CERROR("Can't set listen backlog %d: %d\n", kranal_tunables.kra_backlog, rc); goto out_1; } @@ -1095,7 +1052,7 @@ kranal_listener (void *arg) ras->ras_sock->type = sock->type; ras->ras_sock->ops = sock->ops; } - + set_current_state(TASK_INTERRUPTIBLE); rc = sock->ops->accept(sock, ras->ras_sock, O_NONBLOCK); @@ -1109,8 +1066,8 @@ kranal_listener (void *arg) if (rc == 0) { spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); - - list_add_tail(&ras->ras_list, + + list_add_tail(&ras->ras_list, &kranal_data.kra_connd_acceptq); spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); @@ -1119,7 +1076,7 @@ kranal_listener (void *arg) ras = NULL; continue; } - + if (rc != -EAGAIN) { CERROR("Accept failed: %d, pausing...\n", rc); kranal_pause(HZ); @@ -1138,7 +1095,7 @@ kranal_listener (void *arg) sock_release(sock); kranal_data.kra_listener_sock = NULL; out_0: - /* set completion status and unblock thread 
waiting for me + /* set completion status and unblock thread waiting for me * (parent on startup failure, executioner on normal shutdown) */ kranal_data.kra_listener_shutdown = rc; up(&kranal_data.kra_listener_signal); @@ -1152,7 +1109,7 @@ kranal_start_listener (void) long pid; int rc; - CDEBUG(D_WARNING, "Starting listener\n"); + CDEBUG(D_NET, "Starting listener\n"); /* Called holding kra_nid_mutex: listener stopped */ LASSERT (kranal_data.kra_listener_sock == NULL); @@ -1170,7 +1127,7 @@ kranal_start_listener (void) rc = kranal_data.kra_listener_shutdown; LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL)); - CDEBUG(D_WARNING, "Listener %ld started OK\n", pid); + CDEBUG(D_NET, "Listener %ld started OK\n", pid); return rc; } @@ -1181,7 +1138,7 @@ kranal_stop_listener(int clear_acceptq) unsigned long flags; kra_acceptsock_t *ras; - CDEBUG(D_WARNING, "Stopping listener\n"); + CDEBUG(D_NET, "Stopping listener\n"); /* Called holding kra_nid_mutex: listener running */ LASSERT (kranal_data.kra_listener_sock != NULL); @@ -1193,11 +1150,11 @@ kranal_stop_listener(int clear_acceptq) down(&kranal_data.kra_listener_signal); LASSERT (kranal_data.kra_listener_sock == NULL); - CDEBUG(D_WARNING, "Listener stopped\n"); + CDEBUG(D_NET, "Listener stopped\n"); if (!clear_acceptq) return; - + /* Close any unhandled accepts */ spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); @@ -1205,16 +1162,16 @@ kranal_stop_listener(int clear_acceptq) list_del_init(&kranal_data.kra_connd_acceptq); spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); - + while (!list_empty(&zombie_accepts)) { - ras = list_entry(zombie_accepts.next, + ras = list_entry(zombie_accepts.next, kra_acceptsock_t, ras_list); list_del(&ras->ras_list); kranal_free_acceptsock(ras); } } -int +int kranal_listener_procint(ctl_table *table, int write, struct file *filp, void *buffer, size_t *lenp) { @@ -1282,7 +1239,7 @@ kranal_set_mynid(ptl_nid_t nid) kranal_data.kra_peerstamp++; ni->ni_pid.nid = nid; 
write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); - + /* Delete all existing peers and their connections after new * NID/connstamp set to ensure no old connections in our brave * new world. */ @@ -1397,7 +1354,7 @@ kranal_unlink_peer_locked (kra_peer_t *peer) } int -kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp, +kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp, int *persistencep) { kra_peer_t *peer; @@ -1665,7 +1622,7 @@ kranal_cmd(struct portals_cfg *pcfg, void * private) break; } case NAL_CMD_DEL_PEER: { - rc = kranal_del_peer(pcfg->pcfg_nid, + rc = kranal_del_peer(pcfg->pcfg_nid, /* flags == single_share */ pcfg->pcfg_flags != 0); break; @@ -1739,7 +1696,7 @@ kranal_alloc_txdescs(struct list_head *freelist, int n) PORTAL_ALLOC(tx->tx_phys, PTL_MD_MAX_IOV * sizeof(*tx->tx_phys)); if (tx->tx_phys == NULL) { - CERROR("Can't allocate %stx[%d]->tx_phys\n", + CERROR("Can't allocate %stx[%d]->tx_phys\n", isnblk ? "nblk " : "", i); PORTAL_FREE(tx, sizeof(*tx)); @@ -1818,7 +1775,7 @@ kranal_api_shutdown (nal_t *nal) { int i; unsigned long flags; - + if (nal->nal_refct != 0) { /* This module got the first ref */ PORTAL_MODULE_UNUSE; @@ -1884,9 +1841,9 @@ kranal_api_shutdown (nal_t *nal) spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); LASSERT (list_empty(&kranal_data.kra_connd_peers)); - spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); wake_up_all(&kranal_data.kra_connd_waitq); - spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); i = 2; while (atomic_read(&kranal_data.kra_nthreads) != 0) { @@ -1903,7 +1860,7 @@ kranal_api_shutdown (nal_t *nal) LASSERT (list_empty(&kranal_data.kra_peers[i])); PORTAL_FREE(kranal_data.kra_peers, - sizeof (struct list_head) * + sizeof (struct list_head) * kranal_data.kra_peer_hash_size); } @@ -1913,7 +1870,7 @@ kranal_api_shutdown 
(nal_t *nal) LASSERT (list_empty(&kranal_data.kra_conns[i])); PORTAL_FREE(kranal_data.kra_conns, - sizeof (struct list_head) * + sizeof (struct list_head) * kranal_data.kra_conn_hash_size); } @@ -1999,7 +1956,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, /* OK to call kranal_api_shutdown() to cleanup now */ kranal_data.kra_init = RANAL_INIT_DATA; - + kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE; PORTAL_ALLOC(kranal_data.kra_peers, sizeof(struct list_head) * kranal_data.kra_peer_hash_size); @@ -2091,7 +2048,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, return PTL_OK; failed: - kranal_api_shutdown(&kranal_api); + kranal_api_shutdown(&kranal_api); return PTL_FAIL; } @@ -2142,7 +2099,7 @@ kranal_module_init (void) return -ENODEV; } - kranal_tunables.kra_sysctl = + kranal_tunables.kra_sysctl = register_sysctl_table(kranal_top_ctl_table, 0); if (kranal_tunables.kra_sysctl == NULL) { CERROR("Can't register sysctl table\n"); diff --git a/lnet/klnds/ralnd/ralnd.h b/lnet/klnds/ralnd/ralnd.h index 7e437056100aa5120f39c833e6efcce048d4b7dd..e6602ddab9c7f98ef9d1f5a02e190e4d6aa48f32 100644 --- a/lnet/klnds/ralnd/ralnd.h +++ b/lnet/klnds/ralnd/ralnd.h @@ -77,7 +77,7 @@ #define RANAL_NTX 64 /* # tx descs */ #define RANAL_NTX_NBLK 256 /* # reserved tx descs */ -#define RANAL_FMA_CQ_SIZE 8192 /* # entries in receive CQ +#define RANAL_FMA_CQ_SIZE 8192 /* # entries in receive CQ * (overflow is a performance hit) */ #define RANAL_RESCHED 100 /* # scheduler loops before reschedule */ @@ -92,7 +92,7 @@ #define RANAL_PORT 988 /* listener's port */ #define RANAL_MAX_IMMEDIATE (2<<10) /* immediate payload breakpoint */ -typedef struct +typedef struct { int kra_timeout; /* comms timeout (seconds) */ int kra_listener_timeout; /* max time the listener can block */ @@ -117,8 +117,8 @@ typedef struct spinlock_t rad_lock; /* serialise */ void *rad_scheduler; /* scheduling thread */ } kra_device_t; - -typedef struct + +typedef struct { int kra_init; /* 
initialisation state */ int kra_shutdown; /* shut down? */ @@ -148,7 +148,7 @@ typedef struct long kra_new_min_timeout; /* minimum timeout on any new conn */ wait_queue_head_t kra_reaper_waitq; /* reaper sleeps here */ spinlock_t kra_reaper_lock; /* serialise */ - + struct list_head kra_connd_peers; /* peers waiting for a connection */ struct list_head kra_connd_acceptq; /* accepted sockets to handshake */ wait_queue_head_t kra_connd_waitq; /* connection daemons sleep here */ @@ -269,7 +269,7 @@ typedef struct kra_tx /* message descriptor */ struct list_head tx_list; /* queue on idle_txs/rac_sendq/rac_waitq */ struct kra_conn *tx_conn; /* owning conn */ lib_msg_t *tx_libmsg[2]; /* lib msgs to finalize on completion */ - unsigned long tx_qtime; /* when tx started to wait for something */ + unsigned long tx_qtime; /* when tx started to wait for something (jiffies) */ int tx_isnblk; /* I'm reserved for non-blocking sends */ int tx_nob; /* # bytes of payload */ int tx_buftype; /* payload buffer type */ @@ -306,7 +306,7 @@ typedef struct kra_tx /* message descriptor */ #define RANAL_TX_GETT_DONE 0x52 /* GET target about to send GET_DONE */ typedef struct kra_conn -{ +{ struct kra_peer *rac_peer; /* owning peer */ struct list_head rac_list; /* stash on peer's conn list */ struct list_head rac_hashlist; /* stash in connection hash table */ @@ -317,10 +317,10 @@ typedef struct kra_conn __u64 rac_peerstamp; /* peer's unique stamp */ __u64 rac_peer_connstamp; /* peer's unique connection stamp */ __u64 rac_my_connstamp; /* my unique connection stamp */ - unsigned long rac_last_tx; /* when I last sent an FMA message */ - unsigned long rac_last_rx; /* when I last received an FMA messages */ - long rac_keepalive; /* keepalive interval */ - long rac_timeout; /* infer peer death on (last_rx + timout > now) */ + unsigned long rac_last_tx; /* when I last sent an FMA message (jiffies) */ + unsigned long rac_last_rx; /* when I last received an FMA messages (jiffies) */ + long 
rac_keepalive; /* keepalive interval (seconds) */ + long rac_timeout; /* infer peer death if no rx for this many seconds */ __u32 rac_cqid; /* my completion callback id (non-unique) */ __u32 rac_tx_seq; /* tx msg sequence number */ __u32 rac_rx_seq; /* rx msg sequence number */ @@ -394,10 +394,10 @@ kranal_peer_decref(kra_peer_t *peer) } static inline struct list_head * -kranal_nid2peerlist (ptl_nid_t nid) +kranal_nid2peerlist (ptl_nid_t nid) { unsigned int hash = ((unsigned int)nid) % kranal_data.kra_peer_hash_size; - + return (&kranal_data.kra_peers[hash]); } @@ -426,27 +426,27 @@ kranal_conn_decref(kra_conn_t *conn) } static inline struct list_head * -kranal_cqid2connlist (__u32 cqid) +kranal_cqid2connlist (__u32 cqid) { unsigned int hash = cqid % kranal_data.kra_conn_hash_size; - + return (&kranal_data.kra_conns [hash]); } static inline kra_conn_t * -kranal_cqid2conn_locked (__u32 cqid) +kranal_cqid2conn_locked (__u32 cqid) { struct list_head *conns = kranal_cqid2connlist(cqid); struct list_head *tmp; kra_conn_t *conn; - + list_for_each(tmp, conns) { conn = list_entry(tmp, kra_conn_t, rac_hashlist); - + if (conn->rac_cqid == cqid) return conn; } - + return NULL; } @@ -464,8 +464,8 @@ kranal_page2phys (struct page *p) } extern void kranal_free_acceptsock (kra_acceptsock_t *ras); -extern int kranal_listener_procint (ctl_table *table, - int write, struct file *filp, +extern int kranal_listener_procint (ctl_table *table, + int write, struct file *filp, void *buffer, size_t *lenp); extern void kranal_update_reaper_timeout (long timeout); extern void kranal_tx_done (kra_tx_t *tx, int completion); @@ -484,3 +484,4 @@ extern void kranal_close_conn_locked (kra_conn_t *conn, int error); extern void kranal_terminate_conn_locked (kra_conn_t *conn); extern void kranal_connect (kra_peer_t *peer); extern int kranal_conn_handshake (struct socket *sock, kra_peer_t *peer); +extern void kranal_pause(int ticks); diff --git a/lnet/klnds/ralnd/ralnd_cb.c 
b/lnet/klnds/ralnd/ralnd_cb.c index e805e6743342b57583d61faa54429b48393d5b99..38f1b77dd47f8e21178484f96e05254d4f79ca48 100644 --- a/lnet/klnds/ralnd/ralnd_cb.c +++ b/lnet/klnds/ralnd/ralnd_cb.c @@ -43,7 +43,9 @@ kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg) kra_device_t *dev; int i; unsigned long flags; - + + CDEBUG(D_NET, "callback for device %d\n", devid); + for (i = 0; i < kranal_data.kra_ndevs; i++) { dev = &kranal_data.kra_devices[i]; @@ -60,7 +62,7 @@ kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg) spin_unlock_irqrestore(&dev->rad_lock, flags); return; } - + CWARN("callback for unknown device %d\n", devid); } @@ -69,9 +71,9 @@ kranal_schedule_conn(kra_conn_t *conn) { kra_device_t *dev = conn->rac_device; unsigned long flags; - + spin_lock_irqsave(&dev->rad_lock, flags); - + if (!conn->rac_scheduled) { kranal_conn_addref(conn); /* +1 ref for scheduler */ conn->rac_scheduled = 1; @@ -83,11 +85,11 @@ kranal_schedule_conn(kra_conn_t *conn) } kra_tx_t * -kranal_get_idle_tx (int may_block) +kranal_get_idle_tx (int may_block) { unsigned long flags; kra_tx_t *tx = NULL; - + for (;;) { spin_lock_irqsave(&kranal_data.kra_tx_lock, flags); @@ -132,7 +134,7 @@ kranal_get_idle_tx (int may_block) } spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags); - + return tx; } @@ -159,40 +161,46 @@ kranal_new_tx_msg (int may_block, int type) } int -kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov, +kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov, int offset, int nob) - + { /* For now this is almost identical to kranal_setup_virt_buffer, but we * could "flatten" the payload into a single contiguous buffer ready * for sending direct over an FMA if we ever needed to. 
*/ - LASSERT (nob > 0); - LASSERT (niov > 0); LASSERT (tx->tx_buftype == RANAL_BUF_NONE); + LASSERT (nob >= 0); - while (offset >= iov->iov_len) { - offset -= iov->iov_len; - niov--; - iov++; + if (nob == 0) { + tx->tx_buffer = NULL; + } else { LASSERT (niov > 0); - } - if (nob > iov->iov_len - offset) { - CERROR("Can't handle multiple vaddr fragments\n"); - return -EMSGSIZE; + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + niov--; + iov++; + LASSERT (niov > 0); + } + + if (nob > iov->iov_len - offset) { + CERROR("Can't handle multiple vaddr fragments\n"); + return -EMSGSIZE; + } + + tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset); } tx->tx_buftype = RANAL_BUF_IMMEDIATE; tx->tx_nob = nob; - tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset); return 0; } int -kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov, +kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov, int offset, int nob) - + { LASSERT (nob > 0); LASSERT (niov > 0); @@ -239,7 +247,7 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov, tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED; tx->tx_nob = nob; tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset)); - + phys->Address = kranal_page2phys(kiov->kiov_page); phys++; @@ -250,13 +258,13 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov, LASSERT (nkiov > 0); if (kiov->kiov_offset != 0 || - ((resid > PAGE_SIZE) && + ((resid > PAGE_SIZE) && kiov->kiov_len < PAGE_SIZE)) { /* Can't have gaps */ CERROR("Can't make payload contiguous in I/O VM:" - "page %d, offset %d, len %d \n", - (int)(phys - tx->tx_phys), - kiov->kiov_offset, kiov->kiov_len); + "page %d, offset %d, len %d \n", + (int)(phys - tx->tx_phys), + kiov->kiov_offset, kiov->kiov_len); return -EINVAL; } @@ -276,15 +284,15 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov, } static inline int -kranal_setup_rdma_buffer (kra_tx_t *tx, int niov, 
+kranal_setup_rdma_buffer (kra_tx_t *tx, int niov, struct iovec *iov, ptl_kiov_t *kiov, int offset, int nob) { LASSERT ((iov == NULL) != (kiov == NULL)); - + if (kiov != NULL) return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob); - + return kranal_setup_virt_buffer(tx, niov, iov, offset, nob); } @@ -300,13 +308,13 @@ kranal_map_buffer (kra_tx_t *tx) switch (tx->tx_buftype) { default: LBUG(); - + case RANAL_BUF_NONE: case RANAL_BUF_IMMEDIATE: case RANAL_BUF_PHYS_MAPPED: case RANAL_BUF_VIRT_MAPPED: break; - + case RANAL_BUF_PHYS_UNMAPPED: rrc = RapkRegisterPhys(dev->rad_handle, tx->tx_phys, tx->tx_phys_npages, @@ -334,13 +342,13 @@ kranal_unmap_buffer (kra_tx_t *tx) switch (tx->tx_buftype) { default: LBUG(); - + case RANAL_BUF_NONE: case RANAL_BUF_IMMEDIATE: case RANAL_BUF_PHYS_UNMAPPED: case RANAL_BUF_VIRT_UNMAPPED: break; - + case RANAL_BUF_PHYS_MAPPED: LASSERT (tx->tx_conn != NULL); dev = tx->tx_conn->rac_device; @@ -438,11 +446,11 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid) /* If I get here, I've committed to send, so I complete the tx with * failure on any problems */ - + LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */ read_lock(g_lock); - + peer = kranal_find_peer_locked(nid); if (peer == NULL) { read_unlock(g_lock); @@ -456,7 +464,7 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid) read_unlock(g_lock); return; } - + /* Making one or more connections; I'll need a write lock... 
*/ read_unlock(g_lock); write_lock_irqsave(g_lock, flags); @@ -480,26 +488,26 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid) if (!peer->rap_connecting) { LASSERT (list_empty(&peer->rap_tx_queue)); - + now = CURRENT_SECONDS; if (now < peer->rap_reconnect_time) { write_unlock_irqrestore(g_lock, flags); kranal_tx_done(tx, -EHOSTUNREACH); return; } - + peer->rap_connecting = 1; kranal_peer_addref(peer); /* extra ref for connd */ - + spin_lock(&kranal_data.kra_connd_lock); - + list_add_tail(&peer->rap_connd_list, &kranal_data.kra_connd_peers); wake_up(&kranal_data.kra_connd_waitq); - + spin_unlock(&kranal_data.kra_connd_lock); } - + /* A connection is being established; queue the message... */ list_add_tail(&tx->tx_list, &peer->rap_tx_queue); @@ -507,7 +515,7 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid) } void -kranal_rdma(kra_tx_t *tx, int type, +kranal_rdma(kra_tx_t *tx, int type, kra_rdma_desc_t *sink, int nob, __u64 cookie) { kra_conn_t *conn = tx->tx_conn; @@ -520,7 +528,7 @@ kranal_rdma(kra_tx_t *tx, int type, /* No actual race with scheduler sending CLOSE (I'm she!) 
*/ LASSERT (current == conn->rac_device->rad_scheduler); - + memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc)); tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer); tx->tx_rdma_desc.SrcKey = tx->tx_map_key; @@ -556,6 +564,7 @@ kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob) RAP_RETURN rrc; LASSERT (conn->rac_rxmsg != NULL); + CDEBUG(D_NET, "Consuming %p\n", conn); rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer, &nob_received, sizeof(kra_msg_t)); @@ -563,25 +572,26 @@ kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob) conn->rac_rxmsg = NULL; - if (nob_received != nob) { - CWARN("Expected %d immediate bytes but got %d\n", - nob, nob_received); + if (nob_received < nob) { + CWARN("Incomplete immediate msg from "LPX64 + ": expected %d, got %d\n", + conn->rac_peer->rap_nid, nob, nob_received); return -EPROTO; } - + return 0; } ptl_err_t -kranal_do_send (lib_nal_t *nal, +kranal_do_send (lib_nal_t *nal, void *private, lib_msg_t *libmsg, - ptl_hdr_t *hdr, - int type, - ptl_nid_t nid, + ptl_hdr_t *hdr, + int type, + ptl_nid_t nid, ptl_pid_t pid, - unsigned int niov, - struct iovec *iov, + unsigned int niov, + struct iovec *iov, ptl_kiov_t *kiov, int offset, int nob) @@ -605,7 +615,7 @@ kranal_do_send (lib_nal_t *nal, switch(type) { default: LBUG(); - + case PTL_MSG_REPLY: { /* reply's 'private' is the conn that received the GET_REQ */ conn = private; @@ -619,7 +629,7 @@ kranal_do_send (lib_nal_t *nal, } break; /* RDMA not expected */ } - + /* Incoming message consistent with immediate reply? 
*/ if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) { CERROR("REPLY to "LPX64" bad msg type %x!!!\n", @@ -644,6 +654,9 @@ kranal_do_send (lib_nal_t *nal, kranal_rdma(tx, RANAL_MSG_GET_DONE, &conn->rac_rxmsg->ram_u.get.ragm_desc, nob, conn->rac_rxmsg->ram_u.get.ragm_cookie); + + /* flag matched by consuming rx message */ + kranal_consume_rxmsg(conn, NULL, 0); return PTL_OK; } @@ -697,10 +710,10 @@ kranal_do_send (lib_nal_t *nal, case PTL_MSG_PUT: if (kiov == NULL && /* not paged */ - nob <= RANAL_MAX_IMMEDIATE && /* small enough */ + nob <= RANAL_FMA_MAX_DATA && /* small enough */ nob <= kranal_tunables.kra_max_immediate) break; /* send IMMEDIATE */ - + tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ); if (tx == NULL) return PTL_NO_SPACE; @@ -719,11 +732,11 @@ kranal_do_send (lib_nal_t *nal, } LASSERT (kiov == NULL); - LASSERT (nob <= RANAL_MAX_IMMEDIATE); + LASSERT (nob <= RANAL_FMA_MAX_DATA); tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK || type == PTL_MSG_REPLY || - in_interrupt()), + in_interrupt()), RANAL_MSG_IMMEDIATE); if (tx == NULL) return PTL_NO_SPACE; @@ -733,7 +746,7 @@ kranal_do_send (lib_nal_t *nal, kranal_tx_done(tx, rc); return PTL_FAIL; } - + tx->tx_msg.ram_u.immediate.raim_hdr = *hdr; tx->tx_libmsg[0] = libmsg; kranal_launch_tx(tx, nid); @@ -753,9 +766,9 @@ kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie, } ptl_err_t -kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, +kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int niov, ptl_kiov_t *kiov, + unsigned int niov, ptl_kiov_t *kiov, size_t offset, size_t len) { return kranal_do_send(nal, private, cookie, @@ -774,17 +787,26 @@ kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg, kra_tx_t *tx; void *buffer; int rc; - + LASSERT (mlen <= rlen); LASSERT (!in_interrupt()); /* Either all pages or all vaddrs */ LASSERT (!(kiov != NULL && iov != NULL)); + 
CDEBUG(D_NET, "conn %p, rxmsg %p, libmsg %p\n", conn, rxmsg, libmsg); + + if (libmsg == NULL) { + /* GET or ACK or portals is discarding */ + LASSERT (mlen == 0); + lib_finalize(nal, NULL, libmsg, PTL_OK); + return PTL_OK; + } + switch(rxmsg->ram_type) { default: LBUG(); return PTL_FAIL; - + case RANAL_MSG_IMMEDIATE: if (mlen == 0) { buffer = NULL; @@ -809,21 +831,7 @@ kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg, lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL); return PTL_OK; - case RANAL_MSG_GET_REQ: - /* If the GET matched, we've already handled it in - * kranal_do_send which is called to send the REPLY. We're - * only called here to complete the GET receive (if we needed - * it which we don't, but I digress...) */ - LASSERT (libmsg == NULL); - lib_finalize(nal, NULL, libmsg, PTL_OK); - return PTL_OK; - case RANAL_MSG_PUT_REQ: - if (libmsg == NULL) { /* PUT didn't match... */ - lib_finalize(nal, NULL, libmsg, PTL_OK); - return PTL_OK; - } - tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK); if (tx == NULL) return PTL_NO_SPACE; @@ -836,19 +844,19 @@ kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg, tx->tx_conn = conn; kranal_map_buffer(tx); - - tx->tx_msg.ram_u.putack.rapam_src_cookie = + + tx->tx_msg.ram_u.putack.rapam_src_cookie = conn->rac_rxmsg->ram_u.putreq.raprm_cookie; tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie; tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key; - tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits = + tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits = (__u64)((unsigned long)tx->tx_buffer); tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen; tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */ kranal_post_fma(conn, tx); - + /* flag matched by consuming rx message */ kranal_consume_rxmsg(conn, NULL, 0); return PTL_OK; @@ -857,7 +865,7 @@ kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg, ptl_err_t kranal_recv (lib_nal_t *nal, void 
*private, lib_msg_t *msg, - unsigned int niov, struct iovec *iov, + unsigned int niov, struct iovec *iov, size_t offset, size_t mlen, size_t rlen) { return kranal_do_recv(nal, private, msg, niov, iov, NULL, @@ -866,7 +874,7 @@ kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, ptl_err_t kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg, - unsigned int niov, ptl_kiov_t *kiov, + unsigned int niov, ptl_kiov_t *kiov, size_t offset, size_t mlen, size_t rlen) { return kranal_do_recv(nal, private, msg, niov, NULL, kiov, @@ -906,6 +914,8 @@ kranal_check_conn_timeouts (kra_conn_t *conn) if (!conn->rac_close_sent && time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) { /* not sent in a while; schedule conn so scheduler sends a keepalive */ + CDEBUG(D_NET, "Scheduling keepalive %p->"LPX64"\n", + conn, conn->rac_peer->rap_nid); kranal_schedule_conn(conn); } @@ -913,14 +923,16 @@ kranal_check_conn_timeouts (kra_conn_t *conn) if (!conn->rac_close_recvd && time_after_eq(now, conn->rac_last_rx + timeout)) { - CERROR("Nothing received from "LPX64" within %lu seconds\n", + CERROR("%s received from "LPX64" within %lu seconds\n", + (conn->rac_state == RANAL_CONN_ESTABLISHED) ? + "Nothing" : "CLOSE not", conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ); return -ETIMEDOUT; } if (conn->rac_state != RANAL_CONN_ESTABLISHED) return 0; - + /* Check the conn's queues are moving. These are "belt+braces" checks, * in case of hardware/software errors that make this conn seem * responsive even though it isn't progressing its message queues. 
*/ @@ -929,7 +941,7 @@ kranal_check_conn_timeouts (kra_conn_t *conn) list_for_each (ttmp, &conn->rac_fmaq) { tx = list_entry(ttmp, kra_tx_t, tx_list); - + if (time_after_eq(now, tx->tx_qtime + timeout)) { spin_unlock_irqrestore(&conn->rac_lock, flags); CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n", @@ -937,10 +949,10 @@ kranal_check_conn_timeouts (kra_conn_t *conn) return -ETIMEDOUT; } } - + list_for_each (ttmp, &conn->rac_rdmaq) { tx = list_entry(ttmp, kra_tx_t, tx_list); - + if (time_after_eq(now, tx->tx_qtime + timeout)) { spin_unlock_irqrestore(&conn->rac_lock, flags); CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n", @@ -948,10 +960,10 @@ kranal_check_conn_timeouts (kra_conn_t *conn) return -ETIMEDOUT; } } - + list_for_each (ttmp, &conn->rac_replyq) { tx = list_entry(ttmp, kra_tx_t, tx_list); - + if (time_after_eq(now, tx->tx_qtime + timeout)) { spin_unlock_irqrestore(&conn->rac_lock, flags); CERROR("tx on replyq for "LPX64" blocked %lu seconds\n", @@ -959,7 +971,7 @@ kranal_check_conn_timeouts (kra_conn_t *conn) return -ETIMEDOUT; } } - + spin_unlock_irqrestore(&conn->rac_lock, flags); return 0; } @@ -1005,12 +1017,12 @@ kranal_reaper_check (int idx, unsigned long *min_timeoutp) case RANAL_CONN_ESTABLISHED: kranal_close_conn_locked(conn, -ETIMEDOUT); break; - + case RANAL_CONN_CLOSING: kranal_terminate_conn_locked(conn); break; } - + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); kranal_conn_decref(conn); @@ -1048,24 +1060,24 @@ kranal_connd (void *arg) ras = list_entry(kranal_data.kra_connd_acceptq.next, kra_acceptsock_t, ras_list); list_del(&ras->ras_list); - + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); - CDEBUG(D_WARNING,"About to handshake someone\n"); + CDEBUG(D_NET,"About to handshake someone\n"); kranal_conn_handshake(ras->ras_sock, NULL); kranal_free_acceptsock(ras); - CDEBUG(D_WARNING,"Finished handshaking someone\n"); + CDEBUG(D_NET,"Finished handshaking someone\n"); 
spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); did_something = 1; } - + if (!list_empty(&kranal_data.kra_connd_peers)) { peer = list_entry(kranal_data.kra_connd_peers.next, kra_peer_t, rap_connd_list); - + list_del_init(&peer->rap_connd_list); spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); @@ -1081,11 +1093,11 @@ kranal_connd (void *arg) set_current_state(TASK_INTERRUPTIBLE); add_wait_queue(&kranal_data.kra_connd_waitq, &wait); - + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); schedule (); - + set_current_state(TASK_RUNNING); remove_wait_queue(&kranal_data.kra_connd_waitq, &wait); @@ -1099,14 +1111,14 @@ kranal_connd (void *arg) } void -kranal_update_reaper_timeout(long timeout) +kranal_update_reaper_timeout(long timeout) { unsigned long flags; LASSERT (timeout > 0); - + spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); - + if (timeout < kranal_data.kra_new_min_timeout) kranal_data.kra_new_min_timeout = timeout; @@ -1126,7 +1138,7 @@ kranal_reaper (void *arg) unsigned long next_check_time = jiffies; long next_min_timeout = MAX_SCHEDULE_TIMEOUT; long current_min_timeout = 1; - + kportal_daemonize("kranal_reaper"); kportal_blockallsigs(); @@ -1135,95 +1147,91 @@ kranal_reaper (void *arg) spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); while (!kranal_data.kra_shutdown) { + /* I wake up every 'p' seconds to check for timeouts on some + * more peers. I try to check every connection 'n' times + * within the global minimum of all keepalive and timeout + * intervals, to ensure I attend to every connection within + * (n+1)/n times its timeout intervals. */ + const int p = 1; + const int n = 3; + unsigned long min_timeout; + int chunk; /* careful with the jiffy wrap... */ timeout = (long)(next_check_time - jiffies); - if (timeout <= 0) { - - /* I wake up every 'p' seconds to check for - * timeouts on some more peers. 
I try to check - * every connection 'n' times within the global - * minimum of all keepalive and timeout intervals, - * to ensure I attend to every connection within - * (n+1)/n times its timeout intervals. */ - - const int p = 1; - const int n = 3; - unsigned long min_timeout; - int chunk; - - if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) { - /* new min timeout set: restart min timeout scan */ - next_min_timeout = MAX_SCHEDULE_TIMEOUT; - base_index = conn_index - 1; - if (base_index < 0) - base_index = conn_entries - 1; - - if (kranal_data.kra_new_min_timeout < current_min_timeout) { - current_min_timeout = kranal_data.kra_new_min_timeout; - CWARN("Set new min timeout %ld\n", - current_min_timeout); - } - - kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT; - } - min_timeout = current_min_timeout; - - spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, - flags); - - LASSERT (min_timeout > 0); - - /* Compute how many table entries to check now so I - * get round the whole table fast enough (NB I do - * this at fixed intervals of 'p' seconds) */ - chunk = conn_entries; - if (min_timeout > n * p) - chunk = (chunk * n * p) / min_timeout; - if (chunk == 0) - chunk = 1; - - for (i = 0; i < chunk; i++) { - kranal_reaper_check(conn_index, - &next_min_timeout); - conn_index = (conn_index + 1) % conn_entries; - } + if (timeout > 0) { + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue(&kranal_data.kra_reaper_waitq, &wait); + + spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); - next_check_time += p * HZ; + schedule_timeout(timeout); spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); - if (((conn_index - chunk <= base_index && - base_index < conn_index) || - (conn_index - conn_entries - chunk <= base_index && - base_index < conn_index - conn_entries))) { - - /* Scanned all conns: set current_min_timeout... 
*/ - if (current_min_timeout != next_min_timeout) { - current_min_timeout = next_min_timeout; - CWARN("Set new min timeout %ld\n", - current_min_timeout); - } - - /* ...and restart min timeout scan */ - next_min_timeout = MAX_SCHEDULE_TIMEOUT; - base_index = conn_index - 1; - if (base_index < 0) - base_index = conn_entries - 1; - } + set_current_state(TASK_RUNNING); + remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait); + continue; } - set_current_state(TASK_INTERRUPTIBLE); - add_wait_queue(&kranal_data.kra_reaper_waitq, &wait); + if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) { + /* new min timeout set: restart min timeout scan */ + next_min_timeout = MAX_SCHEDULE_TIMEOUT; + base_index = conn_index - 1; + if (base_index < 0) + base_index = conn_entries - 1; + + if (kranal_data.kra_new_min_timeout < current_min_timeout) { + current_min_timeout = kranal_data.kra_new_min_timeout; + CDEBUG(D_NET, "Set new min timeout %ld\n", + current_min_timeout); + } + + kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT; + } + min_timeout = current_min_timeout; spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); - schedule_timeout(timeout); + LASSERT (min_timeout > 0); + + /* Compute how many table entries to check now so I get round + * the whole table fast enough given that I do this at fixed + * intervals of 'p' seconds */ + chunk = conn_entries; + if (min_timeout > n * p) + chunk = (chunk * n * p) / min_timeout; + if (chunk == 0) + chunk = 1; + + for (i = 0; i < chunk; i++) { + kranal_reaper_check(conn_index, + &next_min_timeout); + conn_index = (conn_index + 1) % conn_entries; + } + + next_check_time += p * HZ; spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); - set_current_state(TASK_RUNNING); - remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait); + if (((conn_index - chunk <= base_index && + base_index < conn_index) || + (conn_index - conn_entries - chunk <= base_index && + base_index < conn_index - conn_entries))) { + + /* Scanned
all conns: set current_min_timeout... */ + if (current_min_timeout != next_min_timeout) { + current_min_timeout = next_min_timeout; + CDEBUG(D_NET, "Set new min timeout %ld\n", + current_min_timeout); + } + + /* ...and restart min timeout scan */ + next_min_timeout = MAX_SCHEDULE_TIMEOUT; + base_index = conn_index - 1; + if (base_index < 0) + base_index = conn_entries - 1; + } } kranal_thread_fini(); @@ -1243,8 +1251,10 @@ kranal_check_rdma_cq (kra_device_t *dev) for (;;) { rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type); - if (rrc == RAP_NOT_DONE) + if (rrc == RAP_NOT_DONE) { + CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id); return; + } LASSERT (rrc == RAP_SUCCESS); LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0); @@ -1254,7 +1264,7 @@ kranal_check_rdma_cq (kra_device_t *dev) conn = kranal_cqid2conn_locked(cqid); if (conn == NULL) { /* Conn was destroyed? */ - CWARN("RDMA CQID lookup %d failed\n", cqid); + CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid); read_unlock(&kranal_data.kra_global_lock); continue; } @@ -1262,6 +1272,9 @@ kranal_check_rdma_cq (kra_device_t *dev) rrc = RapkRdmaDone(conn->rac_rihandle, &desc); LASSERT (rrc == RAP_SUCCESS); + CDEBUG(D_NET, "Completed %p\n", + list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list)); + spin_lock_irqsave(&conn->rac_lock, flags); LASSERT (!list_empty(&conn->rac_rdmaq)); @@ -1274,7 +1287,7 @@ kranal_check_rdma_cq (kra_device_t *dev) list_add_tail(&tx->tx_list, &conn->rac_fmaq); tx->tx_qtime = jiffies; - + spin_unlock_irqrestore(&conn->rac_lock, flags); /* Get conn's fmaq processed, now I've just put something @@ -1298,20 +1311,26 @@ kranal_check_fma_cq (kra_device_t *dev) for (;;) { rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type); - if (rrc != RAP_NOT_DONE) + if (rrc == RAP_NOT_DONE) { + CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id); return; - + } + LASSERT (rrc == RAP_SUCCESS); if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) { read_lock(&kranal_data.kra_global_lock); - + conn = 
kranal_cqid2conn_locked(cqid); - if (conn == NULL) - CWARN("FMA CQID lookup %d failed\n", cqid); - else + if (conn == NULL) { + CDEBUG(D_NET, "FMA CQID lookup %d failed\n", + cqid); + } else { + CDEBUG(D_NET, "FMA completed: %p CQID %d\n", + conn, cqid); kranal_schedule_conn(conn); + } read_unlock(&kranal_data.kra_global_lock); continue; @@ -1321,15 +1340,15 @@ kranal_check_fma_cq (kra_device_t *dev) CWARN("Scheduling ALL conns on device %d\n", dev->rad_id); for (i = 0; i < kranal_data.kra_conn_hash_size; i++) { - + read_lock(&kranal_data.kra_global_lock); - + conns = &kranal_data.kra_conns[i]; list_for_each (tmp, conns) { - conn = list_entry(tmp, kra_conn_t, + conn = list_entry(tmp, kra_conn_t, rac_hashlist); - + if (conn->rac_device == dev) kranal_schedule_conn(conn); } @@ -1346,7 +1365,11 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg, { int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0; RAP_RETURN rrc; - + + CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n", + conn, msg, msg->ram_type, sync ? "(sync)" : "", + immediate, immediatenob); + LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX); LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ? immediatenob <= RANAL_FMA_MAX_DATA : @@ -1356,11 +1379,11 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg, msg->ram_seq = conn->rac_tx_seq; if (sync) - rrc = RapkFmaSyncSend(conn->rac_device->rad_handle, + rrc = RapkFmaSyncSend(conn->rac_rihandle, immediate, immediatenob, msg, sizeof(*msg)); else - rrc = RapkFmaSend(conn->rac_device->rad_handle, + rrc = RapkFmaSend(conn->rac_rihandle, immediate, immediatenob, msg, sizeof(*msg)); @@ -1372,14 +1395,14 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg, conn->rac_last_tx = jiffies; conn->rac_tx_seq++; return 0; - + case RAP_NOT_DONE: return -EAGAIN; } } void -kranal_process_fmaq (kra_conn_t *conn) +kranal_process_fmaq (kra_conn_t *conn) { unsigned long flags; int more_to_do; @@ -1390,28 +1413,33 @@ kranal_process_fmaq (kra_conn_t *conn) /* NB 1. 
kranal_sendmsg() may fail if I'm out of credits right now. * However I will be rescheduled some by an FMA completion event * when I eventually get some. - * NB 2. Sampling rac_state here, races with setting it elsewhere - * kranal_close_conn_locked. But it doesn't matter if I try to - * send a "real" message just as I start closing because I'll get - * scheduled to send the close anyway. */ + * NB 2. Sampling rac_state here races with setting it elsewhere. + * But it doesn't matter if I try to send a "real" message just + * as I start closing because I'll get scheduled to send the + * close anyway. */ + + /* Not racing with incoming message processing! */ + LASSERT (current == conn->rac_device->rad_scheduler); if (conn->rac_state != RANAL_CONN_ESTABLISHED) { if (!list_empty(&conn->rac_rdmaq)) { /* RDMAs in progress */ LASSERT (!conn->rac_close_sent); - - if (time_after_eq(jiffies, - conn->rac_last_tx + - conn->rac_keepalive)) { + + if (time_after_eq(jiffies, + conn->rac_last_tx + + conn->rac_keepalive * HZ)) { + CDEBUG(D_NET, "sending NOOP (rdma in progress)\n"); kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP); kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); } return; } - + if (conn->rac_close_sent) return; + CWARN("sending CLOSE to "LPX64"\n", conn->rac_peer->rap_nid); kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE); rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); if (rc != 0) @@ -1420,7 +1448,7 @@ kranal_process_fmaq (kra_conn_t *conn) conn->rac_close_sent = 1; if (!conn->rac_close_recvd) return; - + write_lock_irqsave(&kranal_data.kra_global_lock, flags); if (conn->rac_state == RANAL_CONN_CLOSING) @@ -1436,14 +1464,15 @@ kranal_process_fmaq (kra_conn_t *conn) spin_unlock_irqrestore(&conn->rac_lock, flags); - if (time_after_eq(jiffies, - conn->rac_last_tx + conn->rac_keepalive)) { + if (time_after_eq(jiffies, + conn->rac_last_tx + conn->rac_keepalive * HZ)) { + CDEBUG(D_NET, "sending NOOP (idle)\n"); kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP); 
kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); } return; } - + tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list); list_del(&tx->tx_list); more_to_do = !list_empty(&conn->rac_fmaq); @@ -1451,20 +1480,26 @@ kranal_process_fmaq (kra_conn_t *conn) spin_unlock_irqrestore(&conn->rac_lock, flags); expect_reply = 0; + CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n", + tx, tx->tx_msg.ram_type, tx->tx_cookie); switch (tx->tx_msg.ram_type) { default: LBUG(); - + case RANAL_MSG_IMMEDIATE: + rc = kranal_sendmsg(conn, &tx->tx_msg, + tx->tx_buffer, tx->tx_nob); + expect_reply = 0; + break; + case RANAL_MSG_PUT_NAK: case RANAL_MSG_PUT_DONE: case RANAL_MSG_GET_NAK: case RANAL_MSG_GET_DONE: - rc = kranal_sendmsg(conn, &tx->tx_msg, - tx->tx_buffer, tx->tx_nob); + rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0); expect_reply = 0; break; - + case RANAL_MSG_PUT_REQ: tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie; rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0); @@ -1481,7 +1516,7 @@ kranal_process_fmaq (kra_conn_t *conn) kranal_map_buffer(tx); tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie; tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key; - tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits = + tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits = (__u64)((unsigned long)tx->tx_buffer); tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob; rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0); @@ -1492,6 +1527,7 @@ kranal_process_fmaq (kra_conn_t *conn) if (rc == -EAGAIN) { /* I need credits to send this. 
Replace tx at the head of the * fmaq and I'll get rescheduled when credits appear */ + CDEBUG(D_NET, "EAGAIN on %p\n", conn); spin_lock_irqsave(&conn->rac_lock, flags); list_add(&tx->tx_list, &conn->rac_fmaq); spin_unlock_irqrestore(&conn->rac_lock, flags); @@ -1499,18 +1535,22 @@ kranal_process_fmaq (kra_conn_t *conn) } LASSERT (rc == 0); - + if (!expect_reply) { kranal_tx_done(tx, 0); } else { + /* LASSERT(current) above ensures this doesn't race with reply + * processing */ spin_lock_irqsave(&conn->rac_lock, flags); list_add_tail(&tx->tx_list, &conn->rac_replyq); tx->tx_qtime = jiffies; spin_unlock_irqrestore(&conn->rac_lock, flags); } - if (more_to_do) + if (more_to_do) { + CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn); kranal_schedule_conn(conn); + } } static inline void @@ -1529,23 +1569,36 @@ kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie) { struct list_head *ttmp; kra_tx_t *tx; - + unsigned long flags; + + spin_lock_irqsave(&conn->rac_lock, flags); + list_for_each(ttmp, &conn->rac_replyq) { tx = list_entry(ttmp, kra_tx_t, tx_list); - + + CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n", + tx, tx->tx_msg.ram_type, tx->tx_cookie); + if (tx->tx_cookie != cookie) continue; - + if (tx->tx_msg.ram_type != type) { + spin_unlock_irqrestore(&conn->rac_lock, flags); CWARN("Unexpected type %x (%x expected) " "matched reply from "LPX64"\n", tx->tx_msg.ram_type, type, conn->rac_peer->rap_nid); return NULL; } + + list_del(&tx->tx_list); + spin_unlock_irqrestore(&conn->rac_lock, flags); + return tx; } - - CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid); + + spin_unlock_irqrestore(&conn->rac_lock, flags); + CWARN("Unmatched reply %02x/"LPX64" from "LPX64"\n", + type, cookie, conn->rac_peer->rap_nid); return NULL; } @@ -1562,12 +1615,19 @@ kranal_check_fma_rx (kra_conn_t *conn) if (rrc == RAP_NOT_DONE) return; - + + CDEBUG(D_NET, "RX on %p\n", conn); + LASSERT (rrc == RAP_SUCCESS); conn->rac_last_rx = jiffies; seq = conn->rac_rx_seq++; msg = 
(kra_msg_t *)prefix; + /* stash message for portals callbacks; they'll NULL + * rac_rxmsg if they consume it */ + LASSERT (conn->rac_rxmsg == NULL); + conn->rac_rxmsg = msg; + if (msg->ram_magic != RANAL_MSG_MAGIC) { if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) { CERROR("Unexpected magic %08x from "LPX64"\n", @@ -1591,7 +1651,7 @@ kranal_check_fma_rx (kra_conn_t *conn) case RANAL_MSG_GET_REQ: kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc); break; - + default: break; } @@ -1608,7 +1668,7 @@ kranal_check_fma_rx (kra_conn_t *conn) msg->ram_srcnid, peer->rap_nid); goto out; } - + if (msg->ram_connstamp != conn->rac_peer_connstamp) { CERROR("Unexpected connstamp "LPX64"("LPX64 " expected) from "LPX64"\n", @@ -1616,7 +1676,7 @@ kranal_check_fma_rx (kra_conn_t *conn) peer->rap_nid); goto out; } - + if (msg->ram_seq != seq) { CERROR("Unexpected sequence number %d(%d expected) from " LPX64"\n", msg->ram_seq, seq, peer->rap_nid); @@ -1630,18 +1690,20 @@ kranal_check_fma_rx (kra_conn_t *conn) } if (conn->rac_close_recvd) { - CERROR("Unexpected message %d after CLOSE from "LPX64"\n", + CERROR("Unexpected message %d after CLOSE from "LPX64"\n", msg->ram_type, conn->rac_peer->rap_nid); goto out; } if (msg->ram_type == RANAL_MSG_CLOSE) { + CWARN("RX CLOSE from "LPX64"\n", conn->rac_peer->rap_nid); conn->rac_close_recvd = 1; write_lock_irqsave(&kranal_data.kra_global_lock, flags); if (conn->rac_state == RANAL_CONN_ESTABLISHED) kranal_close_conn_locked(conn, 0); - else if (conn->rac_close_sent) + else if (conn->rac_state == RANAL_CONN_CLOSING && + conn->rac_close_sent) kranal_terminate_conn_locked(conn); write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); @@ -1650,19 +1712,20 @@ kranal_check_fma_rx (kra_conn_t *conn) if (conn->rac_state != RANAL_CONN_ESTABLISHED) goto out; - - conn->rac_rxmsg = msg; /* stash message for portals callbacks */ - /* they'll NULL rac_rxmsg if they consume it */ + switch (msg->ram_type) { case RANAL_MSG_NOOP: /* Nothing to do; just a
keepalive */ + CDEBUG(D_NET, "RX NOOP on %p\n", conn); break; - + case RANAL_MSG_IMMEDIATE: + CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn); lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn); break; - + case RANAL_MSG_PUT_REQ: + CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn); lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn); if (conn->rac_rxmsg == NULL) /* lib_parse matched something */ @@ -1671,36 +1734,39 @@ kranal_check_fma_rx (kra_conn_t *conn) tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK); if (tx == NULL) break; - - tx->tx_msg.ram_u.completion.racm_cookie = + + tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.putreq.raprm_cookie; kranal_post_fma(conn, tx); break; case RANAL_MSG_PUT_NAK: + CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ, msg->ram_u.completion.racm_cookie); if (tx == NULL) break; - + LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); kranal_tx_done(tx, -ENOENT); /* no match */ break; - + case RANAL_MSG_PUT_ACK: + CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ, msg->ram_u.putack.rapam_src_cookie); if (tx == NULL) break; kranal_rdma(tx, RANAL_MSG_PUT_DONE, - &msg->ram_u.putack.rapam_desc, + &msg->ram_u.putack.rapam_desc, msg->ram_u.putack.rapam_desc.rard_nob, msg->ram_u.putack.rapam_dst_cookie); break; case RANAL_MSG_PUT_DONE: + CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK, msg->ram_u.completion.racm_cookie); if (tx == NULL) @@ -1712,8 +1778,9 @@ kranal_check_fma_rx (kra_conn_t *conn) break; case RANAL_MSG_GET_REQ: + CDEBUG(D_NET, "RX GET_REQ on %p\n", conn); lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn); - + if (conn->rac_rxmsg == NULL) /* lib_parse matched something */ break; @@ -1724,24 +1791,26 @@ kranal_check_fma_rx (kra_conn_t *conn) tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie; kranal_post_fma(conn, tx); break; - + 
case RANAL_MSG_GET_NAK: + CDEBUG(D_NET, "RX GET_NAK on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ, msg->ram_u.completion.racm_cookie); if (tx == NULL) break; - + LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); kranal_tx_done(tx, -ENOENT); /* no match */ break; - + case RANAL_MSG_GET_DONE: + CDEBUG(D_NET, "RX GET_DONE on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ, msg->ram_u.completion.racm_cookie); if (tx == NULL) break; - + LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); kranal_tx_done(tx, 0); @@ -1757,24 +1826,26 @@ kranal_check_fma_rx (kra_conn_t *conn) } void -kranal_complete_closed_conn (kra_conn_t *conn) +kranal_complete_closed_conn (kra_conn_t *conn) { kra_tx_t *tx; LASSERT (conn->rac_state == RANAL_CONN_CLOSED); + LASSERT (list_empty(&conn->rac_list)); + LASSERT (list_empty(&conn->rac_hashlist)); while (!list_empty(&conn->rac_fmaq)) { tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list); - + list_del(&tx->tx_list); kranal_tx_done(tx, -ECONNABORTED); } - + LASSERT (list_empty(&conn->rac_rdmaq)); while (!list_empty(&conn->rac_replyq)) { tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list); - + list_del(&tx->tx_list); kranal_tx_done(tx, -ECONNABORTED); } @@ -1801,7 +1872,7 @@ kranal_scheduler (void *arg) while (!kranal_data.kra_shutdown) { /* Safe: kra_shutdown only set when quiescent */ - + if (busy_loops++ >= RANAL_RESCHED) { spin_unlock_irqrestore(&dev->rad_lock, flags); @@ -1821,7 +1892,7 @@ kranal_scheduler (void *arg) spin_lock_irqsave(&dev->rad_lock, flags); } - + if (!list_empty(&dev->rad_connq)) { /* Connection needs attention */ conn = list_entry(dev->rad_connq.next, @@ -1838,11 +1909,15 @@ kranal_scheduler (void *arg) kranal_complete_closed_conn(conn); kranal_conn_decref(conn); - + spin_lock_irqsave(&dev->rad_lock, flags); continue; } + /* recheck device callback fired before sleeping */ + if (dev->rad_ready) + 
continue; + add_wait_queue(&dev->rad_waitq, &wait); set_current_state(TASK_INTERRUPTIBLE);