/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* Copyright (C) 2004 Cluster File Systems, Inc.
* Author: Eric Barton <eric@bartonsoftware.com>
*
* This file is part of Lustre, http://www.lustre.org.
*
* Lustre is free software; you can redistribute it and/or
* modify it under the terms of version 2 of the GNU General Public
* License as published by the Free Software Foundation.
*
* Lustre is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Lustre; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*
*/
#include "ranal.h"
int
kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
/* I would guess that if kranal_get_peer (nid) == NULL,
and we're not routing, then 'nid' is very distant :) */
if ( nal->libnal_ni.ni_pid.nid == nid ) {
*dist = 0;
} else {
*dist = 1;
}
return 0;
}
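
/* Callback from the RapidArray driver when device 'devid' has events
 * pending: mark the device ready and wake its scheduler thread so it
 * polls the device's completion queues. */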
void
kranal_device_callback(RAP_INT32 devid)
{
kra_device_t *dev;
int i;
unsigned long flags;
for (i = 0; i < kranal_data.kra_ndevs; i++) {
dev = &kranal_data.kra_devices[i];
if (dev->rad_id != devid)
continue;
spin_lock_irqsave(&dev->rad_lock, flags);
if (!dev->rad_ready) {
dev->rad_ready = 1;
wake_up(&dev->rad_waitq);
}
spin_unlock_irqrestore(&dev->rad_lock, flags);
return;
}
CWARN("callback for unknown device %d\n", devid);
}
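
/* Queue 'conn' for attention by its device's scheduler thread, taking a
 * connection ref on the scheduler's behalf; a no-op if the conn is
 * already scheduled. */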
void
kranal_schedule_conn(kra_conn_t *conn)
{
kra_device_t *dev = conn->rac_device;
unsigned long flags;
spin_lock_irqsave(&dev->rad_lock, flags);
if (!conn->rac_scheduled) {
kranal_conn_addref(conn); /* +1 ref for scheduler */
conn->rac_scheduled = 1;
list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
wake_up(&dev->rad_waitq);
}
spin_unlock_irqrestore(&dev->rad_lock, flags);
}
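
/* Grab an idle tx descriptor.  If 'may_block' this waits for one to be
 * freed; otherwise it may dip into the reserved non-blocking pool and
 * returns NULL if that too is exhausted. */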
kra_tx_t *
kranal_get_idle_tx (int may_block)
{
unsigned long flags;
kra_tx_t *tx = NULL;
for (;;) {
spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
/* "normal" descriptor is free */
if (!list_empty(&kranal_data.kra_idle_txs)) {
tx = list_entry(kranal_data.kra_idle_txs.next,
kra_tx_t, tx_list);
break;
}
if (!may_block) {
/* may dip into reserve pool */
if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
CERROR("reserved tx desc pool exhausted\n");
break;
}
tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
kra_tx_t, tx_list);
break;
}
/* block for idle tx */
spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
wait_event(kranal_data.kra_idle_tx_waitq,
!list_empty(&kranal_data.kra_idle_txs));
}
if (tx != NULL) {
list_del(&tx->tx_list);
/* Allocate a new completion cookie. It might not be
* needed, but we've got a lock right now... */
tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
LASSERT (tx->tx_conn == NULL);
LASSERT (tx->tx_libmsg[0] == NULL);
LASSERT (tx->tx_libmsg[1] == NULL);
}
spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
return tx;
}
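
/* Initialise the common header fields of an FMA message */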
void
kranal_init_msg(kra_msg_t *msg, int type)
{
msg->ram_magic = RANAL_MSG_MAGIC;
msg->ram_version = RANAL_MSG_VERSION;
msg->ram_type = type;
msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
/* ram_connstamp gets set when FMA is sent */
}
kra_tx_t *
kranal_new_tx_msg (int may_block, int type)
{
kra_tx_t *tx = kranal_get_idle_tx(may_block);
if (tx == NULL)
return NULL;
kranal_init_msg(&tx->tx_msg, type);
return tx;
}
int
kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
int offset, int nob)
{
/* For now this is almost identical to kranal_setup_virt_buffer, but we
* could "flatten" the payload into a single contiguous buffer ready
* for sending direct over an FMA if we ever needed to. */
LASSERT (nob > 0);
LASSERT (niov > 0);
LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
while (offset >= iov->iov_len) {
offset -= iov->iov_len;
niov--;
iov++;
LASSERT (niov > 0);
}
if (nob > iov->iov_len - offset) {
CERROR("Can't handle multiple vaddr fragments\n");
return -EMSGSIZE;
}
tx->tx_buftype = RANAL_BUF_IMMEDIATE;
tx->tx_nob = nob;
tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
return 0;
}
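
/* Point 'tx' at a single contiguous virtual-memory fragment of the
 * payload; it gets registered with the device just before it is used */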
int
kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
int offset, int nob)
{
LASSERT (nob > 0);
LASSERT (niov > 0);
LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
while (offset >= iov->iov_len) {
offset -= iov->iov_len;
niov--;
iov++;
LASSERT (niov > 0);
}
if (nob > iov->iov_len - offset) {
CERROR("Can't handle multiple vaddr fragments\n");
return -EMSGSIZE;
}
tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
tx->tx_nob = nob;
tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
return 0;
}
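
/* Build the physical page vector for a paged (kiov) payload.  The pages
 * must tile a contiguous region of I/O virtual memory: no gaps allowed. */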
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
int offset, int nob)
{
RAP_PHYS_REGION *phys = tx->tx_phys;
int resid;
CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
LASSERT (nob > 0);
LASSERT (nkiov > 0);
LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
while (offset >= kiov->kiov_len) {
offset -= kiov->kiov_len;
nkiov--;
kiov++;
LASSERT (nkiov > 0);
}
tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
tx->tx_nob = nob;
tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
phys->Address = kranal_page2phys(kiov->kiov_page);
phys->Length = PAGE_SIZE;
phys++;
resid = nob - (kiov->kiov_len - offset);
while (resid > 0) {
kiov++;
nkiov--;
LASSERT (nkiov > 0);
if (kiov->kiov_offset != 0 ||
((resid > PAGE_SIZE) &&
kiov->kiov_len < PAGE_SIZE)) {
/* Can't have gaps */
CERROR("Can't make payload contiguous in I/O VM:"
"page %d, offset %d, len %d \n",
phys - tx->tx_phys,
kiov->kiov_offset, kiov->kiov_len);
return -EINVAL;
}
if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
CERROR ("payload too big (%d)\n", phys - tx->tx_phys);
return -EMSGSIZE;
}
phys->Address = kranal_page2phys(kiov->kiov_page);
phys->Length = PAGE_SIZE;
phys++;
resid -= PAGE_SIZE;
}
tx->tx_phys_npages = phys - tx->tx_phys;
return 0;
}
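
/* Dispatch on payload type: exactly one of 'iov' (virtual) or 'kiov'
 * (paged) must be given */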
static inline int
kranal_setup_buffer (kra_tx_t *tx, int niov,
struct iovec *iov, ptl_kiov_t *kiov,
int offset, int nob)
{
LASSERT ((iov == NULL) != (kiov == NULL));
if (kiov != NULL)
return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}
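
/* Register tx's buffer with the RapidArray device if it isn't mapped
 * already; only the device's scheduler thread may do this */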
void
kranal_map_buffer (kra_tx_t *tx)
{
kra_conn_t *conn = tx->tx_conn;
kra_device_t *dev = conn->rac_device;
RAP_RETURN rrc;
LASSERT (current == dev->rad_scheduler);
switch (tx->tx_buftype) {
default:
LBUG();
case RANAL_BUF_NONE:
case RANAL_BUF_IMMEDIATE:
case RANAL_BUF_PHYS_MAPPED:
case RANAL_BUF_VIRT_MAPPED:
break;
case RANAL_BUF_PHYS_UNMAPPED:
rrc = RapkRegisterPhys(dev->rad_handle,
tx->tx_phys, tx->tx_phys_npages,
dev->rad_ptag, &tx->tx_map_key);
LASSERT (rrc == RAP_SUCCESS);
tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
break;
case RANAL_BUF_VIRT_UNMAPPED:
rrc = RapkRegisterMemory(dev->rad_handle,
tx->tx_buffer, tx->tx_nob,
dev->rad_ptag, &tx->tx_map_key);
LASSERT (rrc == RAP_SUCCESS);
tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
break;
}
}
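
/* Undo kranal_map_buffer(): deregister a mapped buffer from its device */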
void
kranal_unmap_buffer (kra_tx_t *tx)
{
kra_device_t *dev;
RAP_RETURN rrc;
switch (tx->tx_buftype) {
default:
LBUG();
case RANAL_BUF_NONE:
case RANAL_BUF_IMMEDIATE:
case RANAL_BUF_PHYS_UNMAPPED:
case RANAL_BUF_VIRT_UNMAPPED:
break;
case RANAL_BUF_PHYS_MAPPED:
LASSERT (tx->tx_conn != NULL);
dev = tx->tx_conn->rac_device;
LASSERT (current == dev->rad_scheduler);
rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
dev->rad_ptag, &tx->tx_map_key);
LASSERT (rrc == RAP_SUCCESS);
tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
break;
case RANAL_BUF_VIRT_MAPPED:
LASSERT (tx->tx_conn != NULL);
dev = tx->tx_conn->rac_device;
LASSERT (current == dev->rad_scheduler);
rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
dev->rad_ptag, &tx->tx_map_key);
LASSERT (rrc == RAP_SUCCESS);
tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
break;
}
}
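
/* Complete a tx: unmap its buffer, finalise any attached lib messages
 * with 'completion' as the result, and return it to the appropriate
 * idle pool. */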
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
ptl_err_t ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
unsigned long flags;
int i;
LASSERT (!in_interrupt());
kranal_unmap_buffer(tx);
for (i = 0; i < 2; i++) {
/* tx may have up to 2 libmsgs to finalise */
if (tx->tx_libmsg[i] == NULL)
continue;
lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
tx->tx_libmsg[i] = NULL;
}
tx->tx_buftype = RANAL_BUF_NONE;
tx->tx_msg.ram_type = RANAL_MSG_NONE;
tx->tx_conn = NULL;
spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
if (tx->tx_isnblk) {
list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
} else {
list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
wake_up(&kranal_data.kra_idle_tx_waitq);
}
spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
}
kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
struct list_head *tmp;
/* just return the first connection */
list_for_each (tmp, &peer->rap_conns) {
return list_entry(tmp, kra_conn_t, rac_list);
}
return NULL;
}
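
/* Queue a message on the conn's FMA queue and schedule the conn so its
 * device scheduler sends it */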
void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
unsigned long flags;
tx->tx_conn = conn;
spin_lock_irqsave(&conn->rac_lock, flags);
list_add_tail(&tx->tx_list, &conn->rac_fmaq);
tx->tx_qtime = jiffies;
spin_unlock_irqrestore(&conn->rac_lock, flags);
kranal_schedule_conn(conn);
}
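
/* Commit to sending 'tx' to 'nid': queue it on an existing connection if
 * there is one, otherwise queue it on the peer and hand the peer to a
 * connection daemon.  Any failure completes the tx with an error. */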
void
kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
{
unsigned long flags;
kra_peer_t *peer;
kra_conn_t *conn;
unsigned long now;
rwlock_t *g_lock = &kranal_data.kra_global_lock;
/* If I get here, I've committed to send, so I complete the tx with
* failure on any problems */
LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
read_lock(g_lock);
peer = kranal_find_peer_locked(nid);
if (peer == NULL) {
read_unlock(g_lock);
kranal_tx_done(tx, -EHOSTUNREACH);
return;
}
conn = kranal_find_conn_locked(peer);
if (conn != NULL) {
kranal_post_fma(conn, tx);
read_unlock(g_lock);
return;
}
/* Making one or more connections; I'll need a write lock... */
read_unlock(g_lock);
write_lock_irqsave(g_lock, flags);
peer = kranal_find_peer_locked(nid);
if (peer == NULL) {
write_unlock_irqrestore(g_lock, flags);
kranal_tx_done(tx, -EHOSTUNREACH);
return;
}
conn = kranal_find_conn_locked(peer);
if (conn != NULL) {
/* Connection exists; queue message on it */
kranal_post_fma(conn, tx);
write_unlock_irqrestore(g_lock, flags);
return;
}
LASSERT (peer->rap_persistence > 0);
if (!peer->rap_connecting) {
LASSERT (list_empty(&peer->rap_tx_queue));
now = CURRENT_TIME;
if (now < peer->rap_reconnect_time) {
write_unlock_irqrestore(g_lock, flags);
kranal_tx_done(tx, -EHOSTUNREACH);
return;
}
peer->rap_connecting = 1;
kranal_peer_addref(peer); /* extra ref for connd */
spin_lock(&kranal_data.kra_connd_lock);
list_add_tail(&peer->rap_connd_list,
&kranal_data.kra_connd_peers);
wake_up(&kranal_data.kra_connd_waitq);
spin_unlock(&kranal_data.kra_connd_lock);
}
/* A connection is being established; queue the message... */
list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
write_unlock_irqrestore(g_lock, flags);
}
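
/* Start an RDMA to the given sink descriptor and queue 'tx' on the conn's
 * rdmaq; the completion message ('type') is sent once the RDMA completes.
 * A zero-length RDMA just posts the completion message immediately. */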
void
kranal_rdma(kra_tx_t *tx, int type,
kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
kra_conn_t *conn = tx->tx_conn;
RAP_RETURN rrc;
unsigned long flags;
LASSERT (kranal_tx_mapped(tx));
LASSERT (nob <= sink->rard_nob);
LASSERT (nob <= tx->tx_nob);
/* No actual race with scheduler sending CLOSE (I'm she!) */
LASSERT (current == conn->rac_device->rad_scheduler);
memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
tx->tx_rdma_desc.DstPtr = sink->rard_addr;
tx->tx_rdma_desc.DstKey = sink->rard_key;
tx->tx_rdma_desc.Length = nob;
tx->tx_rdma_desc.AppPtr = tx;
/* prep final completion message */
kranal_init_msg(&tx->tx_msg, type);
tx->tx_msg.ram_u.completion.racm_cookie = cookie;
if (nob == 0) { /* Immediate completion */
kranal_post_fma(conn, tx);
return;
}
LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */
rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
LASSERT (rrc == RAP_SUCCESS);
spin_lock_irqsave(&conn->rac_lock, flags);
list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
tx->tx_qtime = jiffies;
spin_unlock_irqrestore(&conn->rac_lock, flags);
}
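
/* Copy the current rx message's immediate payload out to 'buffer' and
 * consume the message; returns -EPROTO if the received size differs
 * from 'nob' */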
int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
__u32 nob_received = nob;
RAP_RETURN rrc;
LASSERT (conn->rac_rxmsg != NULL);
rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer,
&nob_received, sizeof(kra_msg_t));
LASSERT (rrc == RAP_SUCCESS);
conn->rac_rxmsg = NULL;
if (nob_received != nob) {
CWARN("Expected %d immediate bytes but got %d\n",
nob, nob_received);
return -EPROTO;
}
return 0;
}
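
/* Common send path for all message types: small payloads go as FMA
 * IMMEDIATE messages; large PUTs and GETs negotiate an RDMA via
 * PUT_REQ/PUT_ACK or GET_REQ request messages. */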
ptl_err_t
kranal_do_send (lib_nal_t *nal,
void *private,
lib_msg_t *libmsg,
ptl_hdr_t *hdr,
int type,
ptl_nid_t nid,
ptl_pid_t pid,
unsigned int niov,
struct iovec *iov,
ptl_kiov_t *kiov,
size_t offset,
size_t nob)
{
kra_conn_t *conn;
kra_tx_t *tx;
int rc;
/* NB 'private' is different depending on what we're sending.... */
        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
               " pid %d\n", nob, niov, nid, pid);
LASSERT (nob == 0 || niov > 0);
LASSERT (niov <= PTL_MD_MAX_IOV);
LASSERT (!in_interrupt());
/* payload is either all vaddrs or all pages */
LASSERT (!(kiov != NULL && iov != NULL));
switch(type) {
default:
LBUG();
case PTL_MSG_REPLY: {
/* reply's 'private' is the conn that received the GET_REQ */
conn = private;
LASSERT (conn->rac_rxmsg != NULL);
if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
if (nob > RANAL_FMA_MAX_DATA) {
CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
nob, nid);
return PTL_FAIL;
}
break; /* RDMA not expected */
}
                /* Incoming message consistent with RDMA? */
if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
nid, conn->rac_rxmsg->ram_type);
return PTL_FAIL;
}
tx = kranal_get_idle_tx(0);
if (tx == NULL)
return PTL_FAIL;
rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
if (rc != 0) {
kranal_tx_done(tx, rc);
return PTL_FAIL;
}
tx->tx_conn = conn;
tx->tx_libmsg[0] = libmsg;
kranal_map_buffer(tx);
kranal_rdma(tx, RANAL_MSG_GET_DONE,
&conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
conn->rac_rxmsg->ram_u.get.ragm_cookie);
return PTL_OK;
}
case PTL_MSG_GET:
if (kiov == NULL && /* not paged */
nob <= RANAL_FMA_MAX_DATA && /* small enough */
nob <= kranal_tunables.kra_max_immediate)
break; /* send IMMEDIATE */
tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
if (tx == NULL)
return PTL_NO_SPACE;
rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
if (rc != 0) {
kranal_tx_done(tx, rc);
return PTL_FAIL;
}
tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
if (tx->tx_libmsg[1] == NULL) {
CERROR("Can't create reply for GET to "LPX64"\n", nid);
kranal_tx_done(tx, rc);
return PTL_FAIL;
}
tx->tx_libmsg[0] = libmsg;
tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
/* rest of tx_msg is setup just before it is sent */
kranal_launch_tx(tx, nid);
return PTL_OK;
case PTL_MSG_ACK:
LASSERT (nob == 0);
break;
case PTL_MSG_PUT:
if (kiov == NULL && /* not paged */
nob <= RANAL_MAX_IMMEDIATE && /* small enough */
nob <= kranal_tunables.kra_max_immediate)
break; /* send IMMEDIATE */
tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
if (tx == NULL)
return PTL_NO_SPACE;
rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
if (rc != 0) {
kranal_tx_done(tx, rc);
return PTL_FAIL;
}
tx->tx_libmsg[0] = libmsg;
tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
/* rest of tx_msg is setup just before it is sent */
kranal_launch_tx(tx, nid);
return PTL_OK;
}
LASSERT (kiov == NULL);
LASSERT (nob <= RANAL_MAX_IMMEDIATE);
tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
type == PTL_MSG_REPLY ||
in_interrupt()),
RANAL_MSG_IMMEDIATE);
if (tx == NULL)
return PTL_NO_SPACE;
rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
if (rc != 0) {
kranal_tx_done(tx, rc);
return PTL_FAIL;
}
tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
tx->tx_libmsg[0] = libmsg;
kranal_launch_tx(tx, nid);
return PTL_OK;
}
ptl_err_t
kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
unsigned int niov, struct iovec *iov,
size_t offset, size_t len)
{
return kranal_do_send(nal, private, cookie,
hdr, type, nid, pid,
niov, iov, NULL,
offset, len);
}
ptl_err_t
kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
unsigned int niov, ptl_kiov_t *kiov,
size_t offset, size_t len)
{
return kranal_do_send(nal, private, cookie,
hdr, type, nid, pid,
niov, NULL, kiov,
offset, len);
}
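
/* Common receive path: IMMEDIATE payloads are copied straight out of the
 * FMA prefix; a matched PUT_REQ replies with a PUT_ACK describing the
 * sink buffer so the peer can RDMA the payload across. */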
ptl_err_t
kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
size_t offset, size_t mlen, size_t rlen)
{
kra_conn_t *conn = private;
kra_msg_t *rxmsg = conn->rac_rxmsg;
kra_tx_t *tx;
void *buffer;
int rc;
LASSERT (mlen <= rlen);
LASSERT (!in_interrupt());
/* Either all pages or all vaddrs */
LASSERT (!(kiov != NULL && iov != NULL));
switch(rxmsg->ram_type) {
default:
LBUG();
return PTL_FAIL;
case RANAL_MSG_IMMEDIATE:
if (mlen == 0) {
buffer = NULL;
} else if (kiov != NULL) {
CERROR("Can't recv immediate into paged buffer\n");
return PTL_FAIL;
} else {
LASSERT (niov > 0);
while (offset >= iov->iov_len) {
offset -= iov->iov_len;
iov++;
niov--;
LASSERT (niov > 0);
}
if (mlen > iov->iov_len - offset) {
CERROR("Can't handle immediate frags\n");
return PTL_FAIL;
}
buffer = ((char *)iov->iov_base) + offset;
}
rc = kranal_consume_rxmsg(conn, buffer, mlen);
lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
return PTL_OK;
case RANAL_MSG_GET_REQ:
/* If the GET matched, we've already handled it in
* kranal_do_send which is called to send the REPLY. We're
* only called here to complete the GET receive (if we needed
* it which we don't, but I digress...) */
LASSERT (libmsg == NULL);
lib_finalize(nal, NULL, libmsg, PTL_OK);
return PTL_OK;
case RANAL_MSG_PUT_REQ:
if (libmsg == NULL) { /* PUT didn't match... */
lib_finalize(nal, NULL, libmsg, PTL_OK);
return PTL_OK;
}
tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
if (tx == NULL)
return PTL_NO_SPACE;
rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, mlen);
if (rc != 0) {
kranal_tx_done(tx, rc);
return PTL_FAIL;
}
tx->tx_conn = conn;
kranal_map_buffer(tx);
tx->tx_msg.ram_u.putack.rapam_src_cookie =
conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
(__u64)((unsigned long)tx->tx_buffer);
tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
kranal_post_fma(conn, tx);
/* flag matched by consuming rx message */
kranal_consume_rxmsg(conn, NULL, 0);
return PTL_OK;
}
}
ptl_err_t
kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
unsigned int niov, struct iovec *iov,
size_t offset, size_t mlen, size_t rlen)
{
return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
offset, mlen, rlen);
}
ptl_err_t
kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
unsigned int niov, ptl_kiov_t *kiov,
size_t offset, size_t mlen, size_t rlen)
{
return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
offset, mlen, rlen);
}
int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
long pid = kernel_thread(fn, arg, 0);
if (pid < 0)
                return (int)pid;
atomic_inc(&kranal_data.kra_nthreads);
return 0;
}
void
kranal_thread_fini (void)
{
atomic_dec(&kranal_data.kra_nthreads);
}
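
/* Return -ETIMEDOUT if 'conn' has received nothing within its timeout or
 * any of its queues has stalled; also schedules the conn so a keepalive
 * NOOP goes out if nothing has been sent recently */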
int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
kra_tx_t *tx;
struct list_head *ttmp;
unsigned long flags;
long timeout;
unsigned long now = jiffies;
LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
conn->rac_state == RANAL_CONN_CLOSING);
if (!conn->rac_close_sent &&
time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
/* not sent in a while; schedule conn so scheduler sends a keepalive */
kranal_schedule_conn(conn);
}
timeout = conn->rac_timeout * HZ;
if (!conn->rac_close_recvd &&
time_after_eq(now, conn->rac_last_rx + timeout)) {
CERROR("Nothing received from "LPX64" within %lu seconds\n",
conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
return -ETIMEDOUT;
}
if (conn->rac_state != RANAL_CONN_ESTABLISHED)
return 0;
/* Check the conn's queues are moving. These are "belt+braces" checks,
* in case of hardware/software errors that make this conn seem
* responsive even though it isn't progressing its message queues. */
spin_lock_irqsave(&conn->rac_lock, flags);
list_for_each (ttmp, &conn->rac_fmaq) {
tx = list_entry(ttmp, kra_tx_t, tx_list);
if (time_after_eq(now, tx->tx_qtime + timeout)) {
spin_unlock_irqrestore(&conn->rac_lock, flags);
CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
return -ETIMEDOUT;
}
}
list_for_each (ttmp, &conn->rac_rdmaq) {
tx = list_entry(ttmp, kra_tx_t, tx_list);
if (time_after_eq(now, tx->tx_qtime + timeout)) {
spin_unlock_irqrestore(&conn->rac_lock, flags);
CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
return -ETIMEDOUT;
}
}
list_for_each (ttmp, &conn->rac_replyq) {
tx = list_entry(ttmp, kra_tx_t, tx_list);
if (time_after_eq(now, tx->tx_qtime + timeout)) {
spin_unlock_irqrestore(&conn->rac_lock, flags);
CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
return -ETIMEDOUT;
}
}
spin_unlock_irqrestore(&conn->rac_lock, flags);
return 0;
}
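
/* Scan one connection hash bucket for timed-out conns, closing or
 * terminating any found, and track the smallest timeout/keepalive seen
 * in *min_timeoutp */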
void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
struct list_head *conns = &kranal_data.kra_conns[idx];
struct list_head *ctmp;
kra_conn_t *conn;
unsigned long flags;
int rc;
again:
/* NB. We expect to check all the conns and not find any problems, so
* we just use a shared lock while we take a look... */
read_lock(&kranal_data.kra_global_lock);
list_for_each (ctmp, conns) {
conn = list_entry(ctmp, kra_conn_t, rac_hashlist);
if (conn->rac_timeout < *min_timeoutp )
*min_timeoutp = conn->rac_timeout;
if (conn->rac_keepalive < *min_timeoutp )
*min_timeoutp = conn->rac_keepalive;
rc = kranal_check_conn_timeouts(conn);
if (rc == 0)
continue;
kranal_conn_addref(conn);
read_unlock(&kranal_data.kra_global_lock);
CERROR("Conn to "LPX64", cqid %d timed out\n",
conn->rac_peer->rap_nid, conn->rac_cqid);
write_lock_irqsave(&kranal_data.kra_global_lock, flags);
switch (conn->rac_state) {
default:
LBUG();
case RANAL_CONN_ESTABLISHED:
kranal_close_conn_locked(conn, -ETIMEDOUT);
break;
case RANAL_CONN_CLOSING:
kranal_terminate_conn_locked(conn);
break;
}
write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
kranal_conn_decref(conn);
/* start again now I've dropped the lock */
goto again;
}
read_unlock(&kranal_data.kra_global_lock);
}
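
/* Connection daemon: handshakes peers queued on kra_connd_peers */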
int
kranal_connd (void *arg)
{
char name[16];
wait_queue_t wait;
unsigned long flags;
kra_peer_t *peer;
snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
kportal_daemonize(name);
kportal_blockallsigs();
init_waitqueue_entry(&wait, current);
spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
while (!kranal_data.kra_shutdown) {
/* Safe: kra_shutdown only set when quiescent */
if (!list_empty(&kranal_data.kra_connd_peers)) {
peer = list_entry(kranal_data.kra_connd_peers.next,
kra_peer_t, rap_connd_list);
list_del_init(&peer->rap_connd_list);
spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
kranal_connect(peer);
kranal_peer_decref(peer);
spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
continue;
}
set_current_state(TASK_INTERRUPTIBLE);
add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
schedule ();
set_current_state(TASK_RUNNING);
remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
}
spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
kranal_thread_fini();
return 0;
}
void
kranal_update_reaper_timeout(long timeout)
{
unsigned long flags;
LASSERT (timeout > 0);
spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
if (timeout < kranal_data.kra_new_min_timeout)
kranal_data.kra_new_min_timeout = timeout;
spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}
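
/* Reaper daemon: periodically walks the connection hash table checking
 * for timeouts, pacing itself so every conn is visited several times per
 * minimum timeout interval */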
int
kranal_reaper (void *arg)
{
wait_queue_t wait;
unsigned long flags;
long timeout;
int i;
int conn_entries = kranal_data.kra_conn_hash_size;
int conn_index = 0;
int base_index = conn_entries - 1;
unsigned long next_check_time = jiffies;
long next_min_timeout = MAX_SCHEDULE_TIMEOUT;
long current_min_timeout = 1;
kportal_daemonize("kranal_reaper");
kportal_blockallsigs();
init_waitqueue_entry(&wait, current);
spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
while (!kranal_data.kra_shutdown) {
/* careful with the jiffy wrap... */
timeout = (long)(next_check_time - jiffies);
if (timeout <= 0) {
/* I wake up every 'p' seconds to check for
* timeouts on some more peers. I try to check
* every connection 'n' times within the global
* minimum of all keepalive and timeout intervals,
* to ensure I attend to every connection within
* (n+1)/n times its timeout intervals. */
const int p = 1;
const int n = 3;
unsigned long min_timeout;
int chunk;
if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
/* new min timeout set: restart min timeout scan */
next_min_timeout = MAX_SCHEDULE_TIMEOUT;
base_index = conn_index - 1;
if (base_index < 0)
base_index = conn_entries - 1;
if (kranal_data.kra_new_min_timeout < current_min_timeout) {
current_min_timeout = kranal_data.kra_new_min_timeout;
CWARN("Set new min timeout %ld\n",
current_min_timeout);
}
kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
}
min_timeout = current_min_timeout;
spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
flags);
LASSERT (min_timeout > 0);
/* Compute how many table entries to check now so I
* get round the whole table fast enough (NB I do
* this at fixed intervals of 'p' seconds) */
chunk = conn_entries;
if (min_timeout > n * p)
chunk = (chunk * n * p) / min_timeout;
if (chunk == 0)
chunk = 1;
for (i = 0; i < chunk; i++) {
kranal_reaper_check(conn_index,
&next_min_timeout);
conn_index = (conn_index + 1) % conn_entries;
}
next_check_time += p * HZ;
spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
if (((conn_index - chunk <= base_index &&
base_index < conn_index) ||
(conn_index - conn_entries - chunk <= base_index &&
base_index < conn_index - conn_entries))) {
/* Scanned all conns: set current_min_timeout... */
if (current_min_timeout != next_min_timeout) {
current_min_timeout = next_min_timeout;
CWARN("Set new min timeout %ld\n",
current_min_timeout);
}
/* ...and restart min timeout scan */
next_min_timeout = MAX_SCHEDULE_TIMEOUT;
base_index = conn_index - 1;
if (base_index < 0)
base_index = conn_entries - 1;
}
}
set_current_state(TASK_INTERRUPTIBLE);
add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
schedule_timeout(timeout);
spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
set_current_state(TASK_RUNNING);
remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
}
kranal_thread_fini();
return 0;
}
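
/* Drain the device's RDMA completion queue, moving each completed tx from
 * its conn's rdmaq to the fmaq so the completion message gets sent */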
void
kranal_check_rdma_cq (kra_device_t *dev)
{
kra_conn_t *conn;
kra_tx_t *tx;
RAP_RETURN rrc;
unsigned long flags;
RAP_RDMA_DESCRIPTOR *desc;
__u32 cqid;
__u32 event_type;
for (;;) {
rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);
if (rrc == RAP_NOT_DONE)
return;
LASSERT (rrc == RAP_SUCCESS);
LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
read_lock(&kranal_data.kra_global_lock);
conn = kranal_cqid2conn_locked(cqid);
if (conn == NULL) {
/* Conn was destroyed? */
CWARN("RDMA CQID lookup %d failed\n", cqid);
read_unlock(&kranal_data.kra_global_lock);
continue;
}
rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
LASSERT (rrc == RAP_SUCCESS);
spin_lock_irqsave(&conn->rac_lock, flags);
LASSERT (!list_empty(&conn->rac_rdmaq));
tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
list_del(&tx->tx_list);
LASSERT(desc->AppPtr == (void *)tx);
LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
list_add_tail(&tx->tx_list, &conn->rac_fmaq);
tx->tx_qtime = jiffies;
spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Get conn's fmaq processed, now I've just put something
* there */
kranal_schedule_conn(conn);
read_unlock(&kranal_data.kra_global_lock);
}
}
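
/* Drain the device's FMA completion queue, scheduling each conn with an
 * event; on CQ overrun every conn on the device must be rescheduled */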
void
kranal_check_fma_cq (kra_device_t *dev)
{
kra_conn_t *conn;
RAP_RETURN rrc;
__u32 cqid;
__u32 event_type;
struct list_head *conns;
struct list_head *tmp;
int i;
for (;;) {
rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE)
                        return;
LASSERT (rrc == RAP_SUCCESS);
if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
read_lock(&kranal_data.kra_global_lock);
conn = kranal_cqid2conn_locked(cqid);
if (conn == NULL)
CWARN("FMA CQID lookup %d failed\n", cqid);
else
kranal_schedule_conn(conn);
read_unlock(&kranal_data.kra_global_lock);
continue;
}
/* FMA CQ has overflowed: check ALL conns */
CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
read_lock(&kranal_data.kra_global_lock);
conns = &kranal_data.kra_conns[i];
list_for_each (tmp, conns) {
conn = list_entry(tmp, kra_conn_t,
rac_hashlist);
if (conn->rac_device == dev)
kranal_schedule_conn(conn);
}
/* don't block write lockers for too long... */
read_unlock(&kranal_data.kra_global_lock);
}
}
}
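
/* Send one FMA message (with optional immediate payload), stamping it
 * with the conn's connstamp and tx sequence number; fenced message types
 * use the synchronous send.  Returns -EAGAIN if the FMA has no credits
 * right now. */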
int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
void *immediate, int immediatenob)
{
int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
RAP_RETURN rrc;
LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
immediatenob <= RANAL_FMA_MAX_DATA :
immediatenob == 0);
msg->ram_connstamp = conn->rac_my_connstamp;
msg->ram_seq = conn->rac_tx_seq;
if (sync)
rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
immediate, immediatenob,
msg, sizeof(*msg));
else
rrc = RapkFmaSend(conn->rac_device->rad_handle,
immediate, immediatenob,
msg, sizeof(*msg));
switch (rrc) {
default:
LBUG();
case RAP_SUCCESS:
conn->rac_last_tx = jiffies;
conn->rac_tx_seq++;
return 0;
case RAP_NOT_DONE:
return -EAGAIN;
}
}
void
kranal_process_fmaq (kra_conn_t *conn)
{
unsigned long flags;
int more_to_do;
kra_tx_t *tx;
int rc;
int expect_reply;
        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
         *       However I will be rescheduled by a rad_fma_cq event when
         *       credits eventually appear.
         * NB 2. Sampling rac_state here races with setting it elsewhere in
         *       kranal_close_conn_locked.  But it doesn't matter if I try to
         *       send a "real" message just as I start closing because I'll
         *       get scheduled to send the close anyway. */
if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
if (!list_empty(&conn->rac_rdmaq)) {
/* RDMAs in progress */
LASSERT (!conn->rac_close_sent);
                        if (time_after_eq(jiffies,
                                          conn->rac_last_tx +
                                          conn->rac_keepalive * HZ)) {
kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
}
return;
}
if (conn->rac_close_sent)
return;
kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
if (rc != 0)
return;
conn->rac_close_sent = 1;
if (!conn->rac_close_recvd)
return;
write_lock_irqsave(&kranal_data.kra_global_lock, flags);
if (conn->rac_state == RANAL_CONN_CLOSING)
kranal_terminate_conn_locked(conn);
write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
return;
}
spin_lock_irqsave(&conn->rac_lock, flags);
if (list_empty(&conn->rac_fmaq)) {
spin_unlock_irqrestore(&conn->rac_lock, flags);
                if (time_after_eq(jiffies,
                                  conn->rac_last_tx +
                                  conn->rac_keepalive * HZ)) {
kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
}
return;
}
tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
list_del(&tx->tx_list);
more_to_do = !list_empty(&conn->rac_fmaq);
spin_unlock_irqrestore(&conn->rac_lock, flags);
expect_reply = 0;
switch (tx->tx_msg.ram_type) {
default:
LBUG();
case RANAL_MSG_IMMEDIATE:
case RANAL_MSG_PUT_NAK:
case RANAL_MSG_PUT_DONE:
case RANAL_MSG_GET_NAK:
case RANAL_MSG_GET_DONE:
rc = kranal_sendmsg(conn, &tx->tx_msg,
tx->tx_buffer, tx->tx_nob);
expect_reply = 0;
break;
case RANAL_MSG_PUT_REQ:
tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
kranal_map_buffer(tx);
expect_reply = 1;
break;
case RANAL_MSG_PUT_ACK:
rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
expect_reply = 1;
break;
case RANAL_MSG_GET_REQ:
kranal_map_buffer(tx);
tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
(__u64)((unsigned long)tx->tx_buffer);
tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
expect_reply = 1;
break;
}
if (rc == -EAGAIN) {
/* I need credits to send this. Replace tx at the head of the
* fmaq and I'll get rescheduled when credits appear */
spin_lock_irqsave(&conn->rac_lock, flags);
list_add(&tx->tx_list, &conn->rac_fmaq);
spin_unlock_irqrestore(&conn->rac_lock, flags);
return;
}
LASSERT (rc == 0);
if (!expect_reply) {
kranal_tx_done(tx, 0);
} else {
spin_lock_irqsave(&conn->rac_lock, flags);
list_add_tail(&tx->tx_list, &conn->rac_replyq);
tx->tx_qtime = jiffies;
spin_unlock_irqrestore(&conn->rac_lock, flags);
}
if (more_to_do)
kranal_schedule_conn(conn);
}
static inline void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
__swab64s(&d->rard_key.Key);
__swab16s(&d->rard_key.Cookie);
__swab16s(&d->rard_key.MdHandle);
__swab32s(&d->rard_key.Flags);
__swab64s(&d->rard_addr.AddressBits);
__swab32s(&d->rard_nob);
}
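
/* Find, dequeue and return the tx on the conn's replyq matching 'cookie'
 * (which must have message type 'type'), or NULL if there is no match */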
kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
struct list_head *ttmp;
kra_tx_t *tx;
list_for_each(ttmp, &conn->rac_replyq) {
tx = list_entry(ttmp, kra_tx_t, tx_list);
if (tx->tx_cookie != cookie)
continue;
if (tx->tx_msg.ram_type != type) {
CWARN("Unexpected type %x (%x expected) "
"matched reply from "LPX64"\n",
tx->tx_msg.ram_type, type,
conn->rac_peer->rap_nid);
return NULL;
                }

                /* found it: dequeue and return it */
                list_del(&tx->tx_list);
                return tx;
        }
CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
return NULL;
}
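
/* Handle the next incoming FMA message on 'conn', if any: byte-swab it if
 * required, sanity-check it, then dispatch on message type */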
void
kranal_check_fma_rx (kra_conn_t *conn)
{
unsigned long flags;
__u32 seq;
kra_tx_t *tx;
kra_msg_t *msg;
void *prefix;
RAP_RETURN rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
kra_peer_t *peer = conn->rac_peer;
if (rrc == RAP_NOT_DONE)
return;
LASSERT (rrc == RAP_SUCCESS);
conn->rac_last_rx = jiffies;
seq = conn->rac_rx_seq++;
msg = (kra_msg_t *)prefix;
if (msg->ram_magic != RANAL_MSG_MAGIC) {
if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
CERROR("Unexpected magic %08x from "LPX64"\n",
msg->ram_magic, peer->rap_nid);
goto out;
}
__swab32s(&msg->ram_magic);
__swab16s(&msg->ram_version);
__swab16s(&msg->ram_type);
__swab64s(&msg->ram_srcnid);
__swab64s(&msg->ram_connstamp);
__swab32s(&msg->ram_seq);
/* NB message type checked below; NOT here... */
switch (msg->ram_type) {
case RANAL_MSG_PUT_ACK:
kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
break;
case RANAL_MSG_GET_REQ:
kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
break;
default:
break;
}
}
if (msg->ram_version != RANAL_MSG_VERSION) {
CERROR("Unexpected protocol version %d from "LPX64"\n",
msg->ram_version, peer->rap_nid);
goto out;
}
if (msg->ram_srcnid != peer->rap_nid) {
CERROR("Unexpected peer "LPX64" from "LPX64"\n",
msg->ram_srcnid, peer->rap_nid);
goto out;
}
if (msg->ram_connstamp != conn->rac_peer_connstamp) {
CERROR("Unexpected connstamp "LPX64"("LPX64
" expected) from "LPX64"\n",
msg->ram_connstamp, conn->rac_peer_connstamp,
peer->rap_nid);
goto out;
}
if (msg->ram_seq != seq) {
CERROR("Unexpected sequence number %d(%d expected) from "
LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
goto out;
}
if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
/* This message signals RDMA completion... */
rrc = RapkFmaSyncWait(conn->rac_rihandle);
LASSERT (rrc == RAP_SUCCESS);
}
if (conn->rac_close_recvd) {
CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
msg->ram_type, conn->rac_peer->rap_nid);
goto out;
}
if (msg->ram_type == RANAL_MSG_CLOSE) {
conn->rac_close_recvd = 1;
write_lock_irqsave(&kranal_data.kra_global_lock, flags);
if (conn->rac_state == RANAL_CONN_ESTABLISHED)
kranal_close_conn_locked(conn, 0);
else if (conn->rac_close_sent)
kranal_terminate_conn_locked(conn);
write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
goto out;
}
if (conn->rac_state != RANAL_CONN_ESTABLISHED)
goto out;
conn->rac_rxmsg = msg; /* stash message for portals callbacks */
/* they'll NULL rac_rxmsg if they consume it */
switch (msg->ram_type) {
case RANAL_MSG_NOOP:
/* Nothing to do; just a keepalive */
break;
case RANAL_MSG_IMMEDIATE:
lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
break;
case RANAL_MSG_PUT_REQ:
lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
if (conn->rac_rxmsg == NULL) /* lib_parse matched something */
break;
tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
if (tx == NULL)
break;
tx->tx_msg.ram_u.completion.racm_cookie =
msg->ram_u.putreq.raprm_cookie;
kranal_post_fma(conn, tx);
break;
case RANAL_MSG_PUT_NAK:
tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
msg->ram_u.completion.racm_cookie);
if (tx == NULL)
break;
LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
kranal_tx_done(tx, -ENOENT); /* no match */
break;
case RANAL_MSG_PUT_ACK:
tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
msg->ram_u.putack.rapam_src_cookie);
if (tx == NULL)
break;
kranal_rdma(tx, RANAL_MSG_PUT_DONE,
&msg->ram_u.putack.rapam_desc,
msg->ram_u.putack.rapam_desc.rard_nob,
msg->ram_u.putack.rapam_dst_cookie);
break;
case RANAL_MSG_PUT_DONE:
tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
msg->ram_u.completion.racm_cookie);
if (tx == NULL)
break;
LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
kranal_tx_done(tx, 0);
break;
case RANAL_MSG_GET_REQ:
lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
if (conn->rac_rxmsg == NULL) /* lib_parse matched something */
break;
tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
if (tx == NULL)
break;
tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
kranal_post_fma(conn, tx);
break;
case RANAL_MSG_GET_NAK:
tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
msg->ram_u.completion.racm_cookie);
if (tx == NULL)
break;
LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
kranal_tx_done(tx, -ENOENT); /* no match */
break;
case RANAL_MSG_GET_DONE:
tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
msg->ram_u.completion.racm_cookie);
if (tx == NULL)
break;
LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
kranal_tx_done(tx, 0);
break;
}
out:
if (conn->rac_rxmsg != NULL)
kranal_consume_rxmsg(conn, NULL, 0);
/* check again later */
kranal_schedule_conn(conn);
}
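
/* Finish tearing down a CLOSED conn by failing everything still queued on
 * it with -ECONNABORTED */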
void
kranal_complete_closed_conn (kra_conn_t *conn)
{
kra_tx_t *tx;
LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
while (!list_empty(&conn->rac_fmaq)) {
tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
list_del(&tx->tx_list);
kranal_tx_done(tx, -ECONNABORTED);
}
LASSERT (list_empty(&conn->rac_rdmaq));
while (!list_empty(&conn->rac_replyq)) {
tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
list_del(&tx->tx_list);
kranal_tx_done(tx, -ECONNABORTED);
}
}
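
/* Per-device scheduler thread: polls the completion queues when the
 * device callback fires and works through scheduled connections */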
int
kranal_scheduler (void *arg)
{
kra_device_t *dev = (kra_device_t *)arg;
wait_queue_t wait;
char name[16];
kra_conn_t *conn;
unsigned long flags;
int busy_loops = 0;
snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
kportal_daemonize(name);
kportal_blockallsigs();
dev->rad_scheduler = current;
init_waitqueue_entry(&wait, current);
spin_lock_irqsave(&dev->rad_lock, flags);
while (!kranal_data.kra_shutdown) {
/* Safe: kra_shutdown only set when quiescent */
if (busy_loops++ >= RANAL_RESCHED) {
spin_unlock_irqrestore(&dev->rad_lock, flags);
our_cond_resched();
busy_loops = 0;
spin_lock_irqsave(&dev->rad_lock, flags);
}
if (dev->rad_ready) {
/* Device callback fired since I last checked it */
dev->rad_ready = 0;
spin_unlock_irqrestore(&dev->rad_lock, flags);
kranal_check_rdma_cq(dev);
kranal_check_fma_cq(dev);
spin_lock_irqsave(&dev->rad_lock, flags);
}
if (!list_empty(&dev->rad_connq)) {
/* Connection needs attention */
conn = list_entry(dev->rad_connq.next,
kra_conn_t, rac_schedlist);
list_del_init(&conn->rac_schedlist);
LASSERT (conn->rac_scheduled);
conn->rac_scheduled = 0;
spin_unlock_irqrestore(&dev->rad_lock, flags);
kranal_check_fma_rx(conn);
kranal_process_fmaq(conn);
if (conn->rac_state == RANAL_CONN_CLOSED)
kranal_complete_closed_conn(conn);
kranal_conn_decref(conn);
spin_lock_irqsave(&dev->rad_lock, flags);
continue;
}
add_wait_queue(&dev->rad_waitq, &wait);
set_current_state(TASK_INTERRUPTIBLE);
spin_unlock_irqrestore(&dev->rad_lock, flags);
busy_loops = 0;
schedule();
set_current_state(TASK_RUNNING);
remove_wait_queue(&dev->rad_waitq, &wait);
spin_lock_irqsave(&dev->rad_lock, flags);
}
spin_unlock_irqrestore(&dev->rad_lock, flags);
dev->rad_scheduler = NULL;
kranal_thread_fini();
return 0;
}
lib_nal_t kranal_lib = {
libnal_data: &kranal_data, /* NAL private data */
libnal_send: kranal_send,
libnal_send_pages: kranal_send_pages,
libnal_recv: kranal_recv,
libnal_recv_pages: kranal_recv_pages,
libnal_dist: kranal_dist
};