Copyright (c) 2006 The Regents of the University of Michigan
All rights reserved.

Permission is granted to use, copy, create derivative works and
redistribute this software and such derivative works for any purpose,
so long as the name of the University of Michigan is not used in
any advertising or publicity pertaining to the use or distribution
of this software without specific, written prior authorization.  If
the above copyright notice or any other identification of the
university of michigan is included in any copy of any portion of
this software, then the disclaimer below must also be included.

This software is provided as is, without representation from the
University of Michigan as to its fitness for any purpose, and without
warranty by the university of michigan of any kind, either express
or implied, including without limitation the implied warranties of
merchantability and fitness for a particular purpose.  The Regents
of the University of Michigan shall not be liable for any damages,
including special, indirect, incidental, or consequential damages,
with respect to any claim arising out or in connection with the use
of the software, even if it has been or is hereafter advised of the
possibility of such damages.

Signed-off-by: dunlapg@umich.edu

diff -purN -x dat -x config -x '*~' linux-2.6.9/fs/Kconfig linux-2.6.9-RPCRDMA/fs/Kconfig
--- linux-2.6.9/fs/Kconfig	2004-10-18 14:54:32.000000000 -0700
+++ linux-2.6.9-RPCRDMA/fs/Kconfig	2005-06-01 13:58:14.000000000 -0700
@@ -1514,6 +1514,15 @@ config NFSD_TCP
 	  TCP connections usually perform better than the default UDP when
 	  the network is lossy or congested.  If unsure, say Y.
 
+config NFSD_RDMA
+	tristate "Provide NFS server over RDMA support"
+	select SUNRPC_RDMA
+	default n
+	help
+	  If you want your NFS server to support RDMA connections, say Y here.
+	  Automatically selects SUNRPC_RDMA as well.
+	  If unsure, say N.
+
 config ROOT_NFS
 	bool "Root file system on NFS"
 	depends on NFS_FS=y && IP_PNP
@@ -1545,6 +1554,9 @@ config SUNRPC
 
 config SUNRPC_GSS
 	tristate
+	
+config SUNRPC_RDMA
+	tristate
 
 config RPCSEC_GSS_KRB5
 	tristate "Secure RPC: Kerberos V mechanism (EXPERIMENTAL)"
diff -purN -x dat -x config -x '*~' linux-2.6.9/fs/nfsd/nfssvc.c linux-2.6.9-RPCRDMA/fs/nfsd/nfssvc.c
--- linux-2.6.9/fs/nfsd/nfssvc.c	2004-10-18 14:54:31.000000000 -0700
+++ linux-2.6.9-RPCRDMA/fs/nfsd/nfssvc.c	2005-06-01 12:48:38.000000000 -0700
@@ -101,15 +101,25 @@ nfsd_svc(unsigned short port, int nrserv
 		nfsd_serv = svc_create(&nfsd_program, NFSD_BUFSIZE);
 		if (nfsd_serv == NULL)
 			goto out;
+		dprintk("nfsd: calling svc_makesock for UDP\n");
 		error = svc_makesock(nfsd_serv, IPPROTO_UDP, port);
 		if (error < 0)
 			goto failure;
 
 #ifdef CONFIG_NFSD_TCP
+		dprintk("nfsd: calling svc_makesock for TCP\n");
 		error = svc_makesock(nfsd_serv, IPPROTO_TCP, port);
 		if (error < 0)
 			goto failure;
 #endif
+/* XXX There should be an condition on CONFIG_NFSD_RDMA here */
+		/* XXX we need a new protocol constant for RDMA */
+		dprintk("nfsd: calling svc_makesock for RDMA\n");
+		error = svc_makesock(nfsd_serv, IPPROTO_MAX + 1, port);
+/* 		error = svc_makexprt(nfsd_serv, IPPROTO_MAX + 1, port); */
+		if (error < 0)
+			goto failure;
+
 		do_gettimeofday(&nfssvc_boot);		/* record boot time */
 	} else
 		nfsd_serv->sv_nrthreads++;
diff -purN -x dat -x config -x '*~' linux-2.6.9/include/linux/sunrpc/svcsock.h linux-2.6.9-RPCRDMA/include/linux/sunrpc/svcsock.h
--- linux-2.6.9/include/linux/sunrpc/svcsock.h	2004-10-18 14:53:45.000000000 -0700
+++ linux-2.6.9-RPCRDMA/include/linux/sunrpc/svcsock.h	2005-06-01 12:51:37.000000000 -0700
@@ -1,7 +1,7 @@
 /*
  * linux/include/linux/sunrpc/svcsock.h
  *
- * RPC server socket I/O.
+ * RPC server transport-independent I/O.
  *
  * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
  */
@@ -13,6 +13,7 @@
 
 /*
  * RPC server socket.
+ * @deprecated switch to svc_xprt
  */
 struct svc_sock {
 	struct list_head	sk_ready;	/* list of ready sockets */
@@ -53,9 +54,54 @@ struct svc_sock {
 };
 
 /*
+ * RPC transport instance
+ */
+struct svc_xprt {
+	struct list_head	sk_ready;	/* list of ready sockets */
+	struct list_head	sk_list;	/* list of all sockets */
+
+	void *			sx_data;	/* transport-specific data */
+
+	struct socket *		sk_sock;	/* berkeley socket layer */
+	struct sock *		sk_sk;		/* INET layer */
+
+	struct svc_serv *	sk_server;	/* service for this socket */
+	unsigned int		sk_inuse;	/* use count */
+	unsigned long		sk_flags;
+#define	SK_BUSY		0			/* enqueued/receiving */
+#define	SK_CONN		1			/* conn pending */
+#define	SK_CLOSE	2			/* dead or dying */
+#define	SK_DATA		3			/* data pending */
+#define	SK_TEMP		4			/* temp (TCP) socket */
+#define	SK_DEAD		6			/* socket closed */
+#define	SK_CHNGBUF	7			/* need to change snd/rcv buffer sizes */
+#define	SK_DEFERRED	8			/* request on sk_deferred */
+
+	int			sk_reserved;	/* space on outq that is reserved */
+
+	struct list_head	sk_deferred;	/* deferred requests that need to
+						 * be revisted */
+	struct semaphore        sk_sem;		/* to serialize sending data */
+
+	int			(*sx_recvfrom)(struct svc_rqst *rqstp);
+	int			(*sx_sendto)(struct svc_rqst *rqstp);
+
+	/* We keep the old state_change and data_ready CB's here */
+	void			(*sk_ostate)(struct sock *);
+	void			(*sk_odata)(struct sock *, int bytes);
+	void			(*sk_owspace)(struct sock *);
+
+	/* private TCP part */
+	int			sk_reclen;	/* length of record */
+	int			sk_tcplen;	/* current read length */
+	time_t			sk_lastrecv;	/* time of last received request */
+};
+
+/*
  * Function prototypes.
  */
 int		svc_makesock(struct svc_serv *, int, unsigned short);
+int		svc_makexprt(struct svc_serv *, int, unsigned short);
 void		svc_delete_socket(struct svc_sock *);
 int		svc_recv(struct svc_serv *, struct svc_rqst *, long);
 int		svc_send(struct svc_rqst *);
diff -purN -x dat -x config -x '*~' linux-2.6.9/include/linux/sunrpc/svcxprt_rdma.h linux-2.6.9-RPCRDMA/include/linux/sunrpc/svcxprt_rdma.h
--- linux-2.6.9/include/linux/sunrpc/svcxprt_rdma.h	1969-12-31 16:00:00.000000000 -0800
+++ linux-2.6.9-RPCRDMA/include/linux/sunrpc/svcxprt_rdma.h	2005-06-01 12:51:37.000000000 -0700
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2006 The Regents of the University of Michigan
+ * All rights reserved.
+ *
+ * Permission is granted to use, copy, create derivative works and
+ * redistribute this software and such derivative works for any purpose,
+ * so long as the name of the University of Michigan is not used in
+ * any advertising or publicity pertaining to the use or distribution
+ * of this software without specific, written prior authorization.  If
+ * the above copyright notice or any other identification of the
+ * university of michigan is included in any copy of any portion of
+ * this software, then the disclaimer below must also be included.
+ *
+ * This software is provided as is, without representation from the
+ * University of Michigan as to its fitness for any purpose, and without
+ * warranty by the university of michigan of any kind, either express
+ * or implied, including without limitation the implied warranties of
+ * merchantability and fitness for a particular purpose.  The Regents
+ * of the University of Michigan shall not be liable for any damages,
+ * including special, indirect, incidental, or consequential damages,
+ * with respect to any claim arising out or in connection with the use
+ * of the software, even if it has been or is hereafter advised of the
+ * possibility of such damages.
+ *
+ * RDMA-specific functions for RPC to call
+ */
+ 
+int svc_create_rdma_xprt(struct svc_serv *serv, int protocol, unsigned short port);
diff -purN -x dat -x config -x '*~' linux-2.6.9/include/linux/sunrpc/svcxprt_rdma_kdapl.h linux-2.6.9-RPCRDMA/include/linux/sunrpc/svcxprt_rdma_kdapl.h
--- linux-2.6.9/include/linux/sunrpc/svcxprt_rdma_kdapl.h	1969-12-31 16:00:00.000000000 -0800
+++ linux-2.6.9-RPCRDMA/include/linux/sunrpc/svcxprt_rdma_kdapl.h	2005-06-01 12:51:37.000000000 -0700
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2006 The Regents of the University of Michigan
+ * All rights reserved.
+ *
+ * Permission is granted to use, copy, create derivative works and
+ * redistribute this software and such derivative works for any purpose,
+ * so long as the name of the University of Michigan is not used in
+ * any advertising or publicity pertaining to the use or distribution
+ * of this software without specific, written prior authorization.  If
+ * the above copyright notice or any other identification of the
+ * university of michigan is included in any copy of any portion of
+ * this software, then the disclaimer below must also be included.
+ *
+ * This software is provided as is, without representation from the
+ * University of Michigan as to its fitness for any purpose, and without
+ * warranty by the university of michigan of any kind, either express
+ * or implied, including without limitation the implied warranties of
+ * merchantability and fitness for a particular purpose.  The Regents
+ * of the University of Michigan shall not be liable for any damages,
+ * including special, indirect, incidental, or consequential damages,
+ * with respect to any claim arising out or in connection with the use
+ * of the software, even if it has been or is hereafter advised of the
+ * possibility of such damages.
+ *
+ * Defines types and functions used for the implementation of an RDMA RPC
+ * transport.
+ *
+ * This header file depends on the inclusion of the following others:
+ * - dat/kdat.h: KDAT API types
+ */
+
+/* 
+ * Public Service Point & associated structures
+ */
+struct rdma_psp {
+	DAT_PSP_HANDLE rp_psp_handle;
+	DAT_EVD_HANDLE rp_conn_evd_handle;
+	DAT_EVD_HANDLE rp_dto_evd_handle;
+};
+
+/*
+ * Interface Adapter & associated structures
+ */
+struct rdma_ia {
+	unsigned int		ri_initialized;
+	DAT_IA_HANDLE		ri_ia_handle;		
+	DAT_EVD_HANDLE		ri_async_evd_handle;    
+	DAT_IA_ATTR		ri_ia_attr;		
+	DAT_PROVIDER_ATTR       ri_pv_attr;		
+	struct rdma_psp		ri_psp;
+	DAT_PZ_HANDLE		ri_pz_handle;
+/*	
+	int		ri_memreg_strategy;
+	DAT_LMR_HANDLE	ri_bind_mem;
+#if RPCRDMA_DEBUG
+	DAT_RMR_CONTEXT	ri_bind_rmr;
+#endif
+	DAT_LMR_TRIPLET	ri_bind_iov;
+*/	
+};
+
+int rdma_init_ia(struct rdma_ia * ia, const DAT_NAME_PTR ia_name, DAT_COUNT evd_min_qlen);
+		 
+int rdma_create_pub_svc_point(struct rdma_ia * ia, DAT_CONN_QUAL connection_qualifier);	 
+
diff -purN -x dat -x config -x '*~' linux-2.6.9/net/sunrpc/Makefile linux-2.6.9-RPCRDMA/net/sunrpc/Makefile
--- linux-2.6.9/net/sunrpc/Makefile	2004-10-18 14:53:09.000000000 -0700
+++ linux-2.6.9-RPCRDMA/net/sunrpc/Makefile	2005-06-09 10:51:27.601998400 -0700
@@ -10,6 +10,8 @@ sunrpc-y := clnt.o xprt.o sched.o \
 	    auth.o auth_null.o auth_unix.o \
 	    svc.o svcsock.o svcauth.o svcauth_unix.o \
 	    pmap_clnt.o timer.o xdr.o \
-	    sunrpc_syms.o cache.o rpc_pipe.o
+	    sunrpc_syms.o cache.o rpc_pipe.o \
+	    svcxprt_rdma.o svcxprt_rdma_kdapl.o
 sunrpc-$(CONFIG_PROC_FS) += stats.o
 sunrpc-$(CONFIG_SYSCTL) += sysctl.o
+sunrpc-$(CONFIG_SUNRPC_RDMA) += svcxprt_rdma_kdapl.o svcxprt_rdma.o
diff -purN -x dat -x config -x '*~' linux-2.6.9/net/sunrpc/svcsock.c linux-2.6.9-RPCRDMA/net/sunrpc/svcsock.c
--- linux-2.6.9/net/sunrpc/svcsock.c	2004-10-18 14:53:13.000000000 -0700
+++ linux-2.6.9-RPCRDMA/net/sunrpc/svcsock.c	2005-06-01 12:48:51.000000000 -0700
@@ -44,6 +44,10 @@
 #include <linux/sunrpc/svcsock.h>
 #include <linux/sunrpc/stats.h>
 
+/* XXX move to module? */
+#include <linux/sunrpc/svcxprt_rdma.h>
+
+
 /* SMP locking strategy:
  *
  * 	svc_serv->sv_lock protects most stuff for that service.
@@ -1475,17 +1479,54 @@ svc_delete_socket(struct svc_sock *svsk)
 
 /*
  * Make a socket for nfsd and lockd
+ * @deprecated switch to svc_makexprt
  */
 int
 svc_makesock(struct svc_serv *serv, int protocol, unsigned short port)
 {
 	struct sockaddr_in	sin;
 
-	dprintk("svc: creating socket proto = %d\n", protocol);
-	sin.sin_family      = AF_INET;
-	sin.sin_addr.s_addr = INADDR_ANY;
-	sin.sin_port        = htons(port);
-	return svc_create_socket(serv, protocol, &sin);
+/* 	dprintk("svc: creating socket proto = %d\n", protocol); */
+/* 	sin.sin_family      = AF_INET; */
+/* 	sin.sin_addr.s_addr = INADDR_ANY; */
+/* 	sin.sin_port        = htons(port); */
+/* 	return svc_create_socket(serv, protocol, &sin); */
+
+	
+	if (protocol == IPPROTO_UDP || protocol == IPPROTO_TCP) {
+		dprintk("svc: creating socket proto = %d\n", protocol);
+		sin.sin_family      = AF_INET;
+		sin.sin_addr.s_addr = INADDR_ANY;
+		sin.sin_port        = htons(port);
+		return svc_create_socket(serv, protocol, &sin);
+	} else { /* XXX need RDMA protocol constant */
+		dprintk("svc: creating RDMA transport\n");
+		return svc_create_rdma_xprt(serv, protocol, htons(port));
+	}
+}
+
+/*
+ * Make a transport instance for nfsd and lockd
+ * @param serv the RPC service this instance will belong to
+ * @param protocol the protocol for the instance
+ * @param port the port to listen on
+ * @return 0 on success, negative value for errors
+ */
+int
+svc_makexprt(struct svc_serv *serv, int protocol, unsigned short port)
+{
+	struct sockaddr_in	sin;
+	
+	if (protocol == IPPROTO_UDP || protocol == IPPROTO_TCP) {
+		dprintk("svc: creating socket proto = %d\n", protocol);
+		sin.sin_family      = AF_INET;
+		sin.sin_addr.s_addr = INADDR_ANY;
+		sin.sin_port        = htons(port);
+		return svc_create_socket(serv, protocol, &sin);
+	} else { /* XXX need RDMA protocol constant */
+		dprintk("svc: creating RDMA transport\n");
+		return svc_create_rdma_xprt(serv, protocol, port);
+	}
 }
 
 /*
diff -purN -x dat -x config -x '*~' linux-2.6.9/net/sunrpc/svcxprt_rdma.c linux-2.6.9-RPCRDMA/net/sunrpc/svcxprt_rdma.c
--- linux-2.6.9/net/sunrpc/svcxprt_rdma.c	1969-12-31 16:00:00.000000000 -0800
+++ linux-2.6.9-RPCRDMA/net/sunrpc/svcxprt_rdma.c	2005-06-07 14:56:49.000000000 -0700
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2006 The Regents of the University of Michigan
+ * All rights reserved.
+ *
+ * Permission is granted to use, copy, create derivative works and
+ * redistribute this software and such derivative works for any purpose,
+ * so long as the name of the University of Michigan is not used in
+ * any advertising or publicity pertaining to the use or distribution
+ * of this software without specific, written prior authorization.  If
+ * the above copyright notice or any other identification of the
+ * university of michigan is included in any copy of any portion of
+ * this software, then the disclaimer below must also be included.
+ *
+ * This software is provided as is, without representation from the
+ * University of Michigan as to its fitness for any purpose, and without
+ * warranty by the university of michigan of any kind, either express
+ * or implied, including without limitation the implied warranties of
+ * merchantability and fitness for a particular purpose.  The Regents
+ * of the University of Michigan shall not be liable for any damages,
+ * including special, indirect, incidental, or consequential damages,
+ * with respect to any claim arising out or in connection with the use
+ * of the software, even if it has been or is hereafter advised of the
+ * possibility of such damages.
+ *
+ * RDMA Interface layer for RPC
+ * This file uses kDAPL-specific knowledge to implement an abstract transport
+ * implementation for RPC to use without needing to include kDAPL information.
+ */
+
+#include <linux/sunrpc/svc.h>
+#include <linux/sunrpc/svcsock.h> /* change to svcxprt.h later */
+#include <dat/kdat.h>
+#include <linux/sunrpc/svcxprt_rdma.h>
+#include <linux/sunrpc/svcxprt_rdma_kdapl.h>
+#include <linux/sunrpc/debug.h>
+
+# define RPCDBG_FACILITY     RPCDBG_XPRT
+
+static struct rdma_ia default_ia = { .ri_initialized = 0 };
+
+/* XXX these are hard-wired for now */
+static const DAT_NAME_PTR ia_name = "ib0";
+static DAT_COUNT evd_min_queue_length = 4;
+
+/*
+ * Create an RDMA kDAPL public service point (PSP) and associate it with the
+ * RPC service
+ * @param serv the RPC service the PSP will belong to
+ * @param protocol currently unused
+ * @param port the port to listen on in network byte-order
+ * @return 0 on success,
+ *        -1 if the interface adapter can't be initialized,
+ *        -2 if the public service point can't be created
+ */
+int
+svc_create_rdma_xprt(struct svc_serv *serv, int protocol, unsigned short port)
+{
+	DAT_CONN_QUAL connection_qualifier = (DAT_CONN_QUAL) htons(port);
+	struct svc_xprt *xprt;
+	
+	dprintk("svc_create_rdma_xprt: default_ia.ri_initialized = %d\n",
+	        default_ia.ri_initialized);
+
+	if (!default_ia.ri_initialized) {
+		if (rdma_init_ia(&default_ia, ia_name, evd_min_queue_length)) {
+			return -1;
+		}
+	}
+	
+	if (rdma_create_pub_svc_point(&default_ia, connection_qualifier)) {
+		return -2;
+	}
+	
+	if (!(xprt = kmalloc(sizeof(*xprt), GFP_KERNEL))) {
+		return -ENOMEM;
+	}
+	memset(xprt, 0, sizeof(*xprt));
+	
+	xprt->sx_data = &default_ia;
+	
+	return 0;
+}
diff -purN -x dat -x config -x '*~' linux-2.6.9/net/sunrpc/svcxprt_rdma_kdapl.c linux-2.6.9-RPCRDMA/net/sunrpc/svcxprt_rdma_kdapl.c
--- linux-2.6.9/net/sunrpc/svcxprt_rdma_kdapl.c	1969-12-31 16:00:00.000000000 -0800
+++ linux-2.6.9-RPCRDMA/net/sunrpc/svcxprt_rdma_kdapl.c	2005-06-07 15:15:20.000000000 -0700
@@ -0,0 +1,373 @@
+/*
+ * Copyright (c) 2006 The Regents of the University of Michigan
+ * All rights reserved.
+ *
+ * Permission is granted to use, copy, create derivative works and
+ * redistribute this software and such derivative works for any purpose,
+ * so long as the name of the University of Michigan is not used in
+ * any advertising or publicity pertaining to the use or distribution
+ * of this software without specific, written prior authorization.  If
+ * the above copyright notice or any other identification of the
+ * university of michigan is included in any copy of any portion of
+ * this software, then the disclaimer below must also be included.
+ *
+ * This software is provided as is, without representation from the
+ * University of Michigan as to its fitness for any purpose, and without
+ * warranty by the university of michigan of any kind, either express
+ * or implied, including without limitation the implied warranties of
+ * merchantability and fitness for a particular purpose.  The Regents
+ * of the University of Michigan shall not be liable for any damages,
+ * including special, indirect, incidental, or consequential damages,
+ * with respect to any claim arising out or in connection with the use
+ * of the software, even if it has been or is hereafter advised of the
+ * possibility of such damages.
+ */
+
+#include <dat/kdat.h>
+#include <linux/module.h>
+#include <linux/sunrpc/svcxprt_rdma_kdapl.h>
+#include <linux/sunrpc/debug.h>
+#include <asm/io.h> /* memset() */
+
+# define RPCDBG_FACILITY     RPCDBG_XPRT
+
+/**
+ * async_evd_upcall - Upcall for "catastropic" errors on the interface adapter
+ * @instance_data: pointer to the &rdma_ia struct
+ * @event: the event that triggered the upcall
+ * @more_events: whether there are additional events on the EVD
+ **/
+static void
+async_evd_upcall(DAT_PVOID instance_data,
+                 const DAT_EVENT *event,
+		 DAT_BOOLEAN more_events)
+{
+	printk("async_evd_upcall: got event # %x\n", event->event_number);
+}
+
+/**
+ * dto_event_upcall - Handles data transfer events
+ *
+ * @instance_data: pointer to the rdma_ia struct
+ * @event: the event that triggered the upcall
+ * @more_events: whether there are additional events on the EVD
+ *
+ * This upcall handles DTO, (recv, send, bind and unbind) events.
+ * It is reentrant but has been specified using DAT_UPCALL_SINGLE_INSTANCE
+ * in order to maintain ordering of receives to keep server credits.
+ * It must also be prepared to be called from interrupt context,
+ * so it must not block or perform blocking calls.
+ *
+ * It is the responsibility of the scheduled tasklet to return
+ * recv buffers to the pool. NOTE: this affects synchronization of
+ * connection shutdown. That is, the structures required for
+ * the completion of the reply handler must remain intact until
+ * all memory has been reclaimed. There is some work here TBD.
+ *
+ * Note that send events are suppressed and do not result in an upcall.
+ **/
+static void
+dto_event_upcall(DAT_PVOID instance_data,
+		 const DAT_EVENT *event, DAT_BOOLEAN more_events)
+{
+	printk("dto_event_upcall: got event # %x\n", event->event_number);
+}
+
+/**
+ * connection_event_upcall - Handles Connection related events for a service point
+ * @instance_data: pointer to the rdma_ia struct
+ * @event: the event that triggered the upcall
+ * @more_events: whether there are additional events on the EVD
+ **/
+static void
+connection_event_upcall(DAT_PVOID instance_data,
+			const DAT_EVENT *event,
+			DAT_BOOLEAN more_events)
+{
+	DAT_RETURN datstatus;
+	struct rdma_ia *ia = (struct rdma_ia *)instance_data;
+	DAT_CR_HANDLE cr_handle = (DAT_CR_HANDLE) event->event_data.cr_arrival_event_data.cr_handle;
+	DAT_CR_PARAM cr_param;
+	DAT_EP_PARAM ep_param;
+
+	switch (event->event_number) {
+	case DAT_CONNECTION_REQUEST_EVENT:
+		dprintk("%s: got DAT_CONNECTION_REQUEST_EVENT\n", __FUNCTION__);
+		datstatus = dat_cr_query(cr_handle, DAT_CR_FIELD_ALL, &cr_param);
+		if (datstatus != DAT_SUCCESS) {
+			printk("%s: cr_query failed %x\n", __FUNCTION__, datstatus);
+			return;
+		}
+
+		memset(&ep_param, 0, sizeof ep_param);
+		ep_param.pz_handle = ia->ri_pz_handle;
+		ep_param.recv_evd_handle = ia->ri_psp.rp_dto_evd_handle;
+		ep_param.request_evd_handle = ia->ri_psp.rp_dto_evd_handle;
+		ep_param.connect_evd_handle = ia->ri_psp.rp_conn_evd_handle;
+
+		datstatus = dat_ep_modify(cr_param.local_ep_handle,
+					  DAT_EP_FIELD_PZ_HANDLE |
+					  DAT_EP_FIELD_RECV_EVD_HANDLE |
+					  DAT_EP_FIELD_REQUEST_EVD_HANDLE |
+					  DAT_EP_FIELD_CONNECT_EVD_HANDLE,
+					  &ep_param);
+		if (datstatus != DAT_SUCCESS) {
+			printk("%s: dat_ep_modify failed %x\n", __FUNCTION__, datstatus);
+			return;
+		}
+
+
+		datstatus = dat_cr_accept(cr_handle, DAT_HANDLE_NULL, (DAT_COUNT) 0, NULL);
+		if (datstatus != DAT_SUCCESS) {
+			dprintk("%s: dat_cr_accept failed\n", __FUNCTION__);
+			if (datstatus == DAT_INVALID_HANDLE) {
+				dprintk("\treason: DAT_INVALID_HANDLE\n");
+			} else if (datstatus == DAT_INVALID_PARAMETER) {
+				dprintk("\treason: DAT_INVALID_PARAMETER\n");		
+			} else {
+				dprintk("\treason: unknown (%x)\n", datstatus);		
+			}
+		} else {
+			dprintk("%s: dat_cr_accept SUCCEEDED!\n", __FUNCTION__);
+		}
+		return;
+	case DAT_CONNECTION_EVENT_ESTABLISHED:
+		dprintk("%s: got DAT_CONNECTION_EVENT_ESTABLISHED; ignoring\n", __FUNCTION__);
+		return;
+	case DAT_CONNECTION_EVENT_DISCONNECTED:
+		dprintk("%s: got DAT_CONNECTION_EVENT_DISCONNECTED; ignoring\n", __FUNCTION__);
+		return;
+	default:
+		dprintk("%s: got unknown event # %x; ignoring\n", __FUNCTION__, event->event_number);
+		return;
+	}
+
+}
+
+/*
+ * Cleans up an EVD to allow it to be reclaimed. Modifys the upcall policy
+ * prior to calling dat_evd_free to ensure graceful retirement.
+ * @param evd_handle the handle of the EVD to be retired
+ */
+static void
+free_evd(DAT_EVD_HANDLE evd_handle) {
+	DAT_RETURN datstatus;
+	
+	/* XXX do we need to synchronize this? */
+	datstatus = dat_evd_modify_upcall(evd_handle,
+	                                  DAT_UPCALL_TEARDOWN,
+					  &DAT_UPCALL_NULL); /* XXX use DAT_UPCALL_SAME in v1.2 */
+	if (datstatus != DAT_SUCCESS) {
+		dprintk("%s:%d: free_evd: dat_evd_modify_upcall failed\n", __FILE__, __LINE__);
+		return;
+	}
+
+	datstatus = dat_evd_free(evd_handle);
+	if (datstatus != DAT_SUCCESS) {
+		dprintk("%s:%d: free_evd: dat_evd_free failed\n", __FILE__, __LINE__);
+	}
+}
+
+/**
+ * rdma_close_ia - close interface adapter after cleaning up associated objects
+ * @rdma_ia: pointer to interface adapter struct to close
+ **/
+static void
+rdma_close_ia(struct rdma_ia * ia)
+{
+	DAT_RETURN datstatus;
+	struct rdma_psp * psp = &ia->ri_psp;
+	
+	dprintk("in rdma_ia_close\n");
+	
+	/* XXX free pre-bound memory */
+	
+	if (ia->ri_pz_handle != DAT_HANDLE_NULL) {
+		datstatus = dat_pz_free(ia->ri_pz_handle);
+		if (datstatus == DAT_SUCCESS) {
+			ia->ri_pz_handle = DAT_HANDLE_NULL;
+		} else {
+			dprintk("%s:%d: rdma_ia_close: dat_pz_free failed\n", __FILE__, __LINE__);
+		}
+	}
+	
+	/* XXX do some cleanup with ASYNC EVD handle? */
+	
+	if (psp->rp_psp_handle != DAT_HANDLE_NULL) {
+	
+		free_evd(psp->rp_conn_evd_handle);
+		free_evd(psp->rp_dto_evd_handle);
+		
+		datstatus = dat_psp_free(psp->rp_psp_handle);
+		if (datstatus == DAT_SUCCESS) {
+			psp->rp_psp_handle = DAT_HANDLE_NULL;
+		} else {
+			dprintk("%s:%d: rdma_ia_close: dat_psp_free failed\n", __FILE__, __LINE__);			
+		}
+	}
+
+	if (ia->ri_ia_handle != DAT_HANDLE_NULL) {
+		datstatus = dat_ia_close(ia->ri_ia_handle, DAT_CLOSE_GRACEFUL_FLAG);
+		if (datstatus == DAT_SUCCESS) {
+			ia->ri_ia_handle = DAT_HANDLE_NULL;
+		} else if (datstatus == DAT_INVALID_STATE) {
+			dprintk("rdma_ia_close: graceful close failed, trying abrupt\n");
+			datstatus = dat_ia_close(ia->ri_ia_handle, DAT_CLOSE_ABRUPT_FLAG);
+		} else {
+			dprintk("%s:%d: rdma_ia_close: dat_ia_close failed\n", __FILE__, __LINE__);		
+		}
+	}	
+}
+
+/**
+ * rdma_init_ia - open and initialize the interface adapter
+ * @rdma_ia: pointer to IA struct that will be written into by this function
+ * @ia_name: the IA name, e.g. "ib0"
+ * @evd_min_qlen: minimum length for the asynchronous event dispatcher queue
+ **/
+int
+rdma_init_ia(struct rdma_ia * ia,
+             const DAT_NAME_PTR ia_name,
+	     DAT_COUNT evd_min_qlen)
+{
+	DAT_RETURN		datstatus;
+	DAT_UPCALL_OBJECT       upcall_obj;
+	
+	dprintk("rdma_init_ia: initializing %s\n", ia_name);
+
+	if (ia->ri_initialized) {
+		dprintk("%s:%d: rdma_init: adapter already initialized\n", __FILE__, __LINE__);
+		return 2;
+	}
+
+	memset(ia, 0, sizeof *ia);
+	ia->ri_async_evd_handle = DAT_HANDLE_NULL;
+	
+	datstatus = dat_ia_open(ia_name,
+	                        evd_min_qlen,
+				&ia->ri_async_evd_handle,
+				&ia->ri_ia_handle);
+				
+	if (datstatus != DAT_SUCCESS) {
+		dprintk("%s:%d: rdma_init_ia: dat_ia_open failed (%x)\n", __FILE__, __LINE__, datstatus);
+		goto out;
+	}
+	
+	/* XXX need to set up upcall function */
+	
+	upcall_obj.instance_data = ia;
+	upcall_obj.upcall_func = async_evd_upcall;
+	datstatus = dat_evd_modify_upcall(ia->ri_async_evd_handle,
+	                                  DAT_UPCALL_SINGLE_INSTANCE,
+					  &upcall_obj);
+	if (datstatus != DAT_SUCCESS) {
+		dprintk("%s:%d: rdma_ia_init: dat_evd_modify_upcall failed\n", __FILE__, __LINE__);
+		/* not fatal, don't goto out */
+	}
+	
+	datstatus = dat_ia_query(ia->ri_ia_handle,
+	                         &ia->ri_async_evd_handle,
+				 DAT_IA_ALL, &ia->ri_ia_attr,
+				 DAT_PROVIDER_FIELD_ALL, &ia->ri_pv_attr);
+	if (datstatus != DAT_SUCCESS) {
+		dprintk("%s:%d: rdma_ia_init: dat_ia_query failed\n", __FILE__, __LINE__);
+		goto out;
+	}
+	
+	/* Create protection zone */
+
+ 	datstatus = dat_pz_create(ia->ri_ia_handle, &ia->ri_pz_handle);
+	if (datstatus != DAT_SUCCESS) {
+		dprintk("%s:%d: rdma_ia_init: dat_pz_create failed\n", __FILE__, __LINE__);
+		goto out;
+	} else {
+		dprintk("rdma_init_ia: PZ created successfully!\n");
+	}
+
+	/* XXX preregister memory here */
+	
+out:
+	if (datstatus == DAT_SUCCESS) {
+		dprintk("rdma_init_ia: success, handle =  %p\n", ia->ri_ia_handle);
+		ia->ri_initialized = 1;
+		return 0;
+	} else if (ia->ri_ia_handle != DAT_HANDLE_NULL) {
+		rdma_close_ia(ia); /* sets handle to NULL */
+	}
+	dprintk("rdma_init_ia: failure\n");
+	return -1;
+}
+
+/**
+ * rdma_create_pub_svc_point - Create a public service point and add upcall
+ * @rdma_ia: pointer to interface adapter struct to create PSP on
+ * @connection_qualifier: the connection qualifier to use for
+ *        the service point
+ *	
+ * Creates a public service point, creates an EVD for it with appropriate
+ * upcall attached for connection request events.
+ * Returns 0 on success.
+ **/
+int
+rdma_create_pub_svc_point(struct rdma_ia * ia,
+                          DAT_CONN_QUAL connection_qualifier)
+{
+	DAT_RETURN datstatus;
+	DAT_UPCALL_OBJECT conn_upcall_obj;
+	DAT_UPCALL_OBJECT dto_upcall_obj;
+	struct rdma_psp * psp = &ia->ri_psp;
+
+	dprintk("%s: IA handle =  %p\n", __FUNCTION__, ia->ri_ia_handle);
+	dprintk("%s: psp = %p %d\n", __FUNCTION__, psp, (u32) connection_qualifier);
+	
+	/* create EVD for handling Connection Requested Events */
+	
+	conn_upcall_obj.instance_data = ia;
+	conn_upcall_obj.upcall_func = connection_event_upcall;
+	
+	datstatus = dat_evd_kcreate(ia->ri_ia_handle,
+	                            4,
+				    DAT_UPCALL_SINGLE_INSTANCE,
+				    &conn_upcall_obj,
+				    DAT_EVD_CR_FLAG | DAT_EVD_CONNECTION_FLAG,
+				    &psp->rp_conn_evd_handle);
+	if (datstatus != DAT_SUCCESS) {
+		dprintk("%s:%d: %s: dat_evd_kcreate failed\n", __FILE__, __LINE__, __FUNCTION__);
+		return -1;
+	}
+
+	dprintk("%s: Created Connection EVD =  %p\n", __FUNCTION__, psp->rp_conn_evd_handle);
+
+	/* create EVD for handling DTO Events */
+	dto_upcall_obj.instance_data = ia;
+	dto_upcall_obj.upcall_func = dto_event_upcall;
+
+	datstatus = dat_evd_kcreate(ia->ri_ia_handle,
+				    8,
+				    DAT_UPCALL_SINGLE_INSTANCE,
+				    &dto_upcall_obj,
+				    DAT_EVD_DTO_FLAG | DAT_EVD_RMR_BIND_FLAG,
+				    &psp->rp_dto_evd_handle);
+	if (datstatus != DAT_SUCCESS) {
+		free_evd(psp->rp_conn_evd_handle);
+		printk("%s: dat_evd_kcreate dto_evd failed\n %x", __FILE__, datstatus);
+		return -1;
+	}
+
+	dprintk("%s: Created DTO EVD =  %p\n", __FUNCTION__, psp->rp_dto_evd_handle);
+
+	datstatus = dat_psp_create(ia->ri_ia_handle,
+	                           connection_qualifier, /* XXX */
+				   psp->rp_conn_evd_handle,
+				   DAT_PSP_PROVIDER_FLAG,
+				   &psp->rp_psp_handle);
+	if (datstatus != DAT_SUCCESS) {
+		dprintk("%s:%d: dat_psp_create failed (%x)\n", __FUNCTION__, __LINE__, datstatus);
+		free_evd(psp->rp_conn_evd_handle);
+		free_evd(psp->rp_dto_evd_handle);
+		return -1;
+	}
+	
+	dprintk("%s: success\n", __FUNCTION__);
+	return 0;
+}
