
Write into the page cache and write the data back through the pNFS layout
driver. This allows the layout driver to make use of the Linux NFSv4 client
writeback cache and the Linux page cache.

---

 linux-2.6.14-pnfs-current-dhildebz/fs/nfs/pnfs.c           |  180 +++++++++++++
 linux-2.6.14-pnfs-current-dhildebz/fs/nfs/pnfs.h           |    4 
 linux-2.6.14-pnfs-current-dhildebz/fs/nfs/write.c          |  104 ++++---
 linux-2.6.14-pnfs-current-dhildebz/include/linux/nfs_xdr.h |    1 
 4 files changed, 248 insertions(+), 41 deletions(-)

diff -puN fs/nfs/pnfs.c~client-writeback fs/nfs/pnfs.c
--- linux-2.6.14-pnfs-current/fs/nfs/pnfs.c~client-writeback	2006-01-12 13:19:20.960921000 -0500
+++ linux-2.6.14-pnfs-current-dhildebz/fs/nfs/pnfs.c	2006-01-12 13:19:21.000921000 -0500
@@ -45,11 +45,18 @@
 #include <linux/nfs4_pnfs.h>
 #include <linux/nfs_fs.h>
 #include <linux/nfs_mount.h>
+#include <linux/nfs_page.h>
 
 #include "nfs4_fs.h"
 #include "pnfs.h"
 
 #define NFSDBG_FACILITY		NFSDBG_PNFS
+
+extern void nfs_writepage_release(struct nfs_page *req);
+extern void nfs_inode_remove_request(struct nfs_page *req);
+extern void nfs_mark_request_commit(struct nfs_page *req);
+extern void nfs_writeback_done_full(struct nfs_write_data *data, int status);
+
 /* Locking:
  *
  * pnfs_spinlock:
@@ -344,6 +351,172 @@ out:
 	return result;
 }
 
+/* Return 1 if the layout driver wants I/O requests to first travel
+ * through the NFS I/O processing functions and the page cache (NFSv4
+ * mount, layout driver set, use_pagecache policy > 0); otherwise 0.
+ * TODO: Create separate policy interface.
+ */
+int
+use_page_cache(struct inode *inode)
+{
+	struct nfs_server* nfss = NFS_SERVER(inode);
+	struct nfs_inode* nfsi = NFS_I(inode);
+	int use_pagecache = 0;
+
+	if (nfss->rpc_ops->version == 4 &&
+	    nfss->pnfs_curr_ld)
+	{
+		use_pagecache = nfss->pnfs_curr_ld->ld_policy_ops->use_pagecache(nfsi->current_layout, inode);
+		dprintk("%s val: %d\n",__FUNCTION__, use_pagecache);
+		/* Only a positive policy value selects the page cache path */
+		if (use_pagecache > 0)
+			return 1;
+	}
+	return 0;
+}
+
+/* Return whether a layout driver exists for this file system.
+ * This is used by the paging system to determine whether
+ * it should use NFSv4 or pNFS I/O.
+ * The count argument allows the decision to depend on the size of the
+ * read or write, although it is currently ignored.
+ */
+int
+use_pnfs_io(struct inode *inode,unsigned int count)
+{
+	return use_page_cache(inode);
+}
+
+/*
+ * Handle a write reply that flushed part of a page.
+ */
+static void
+pnfs_writeback_done_partial(struct nfs_write_data *data, int status)
+{
+	struct nfs_page		*req = data->req;
+	struct page		*page = req->wb_page;
+
+	dprintk("%s (%s/%Ld %d@%Ld)",
+		__FUNCTION__,
+		req->wb_context->dentry->d_inode->i_sb->s_id,
+		(long long)NFS_FILEID(req->wb_context->dentry->d_inode),
+		req->wb_bytes,
+		(long long)req_offset(req));
+
+	if (status < 0) {
+		dprintk(", error = %d\n", status);
+		ClearPageUptodate(page);
+		SetPageError(page);
+		req->wb_context->error = status;
+	} else {
+		/* DH: I'm skipping some info here that requires checking
+		 * the write verifier (since I don't have one).  I'm also
+		 * ignoring the stable/unstable write question
+		 * with pNFS layout drivers. (look at nfs_writeback_done_partial)
+		 */
+		dprintk(" OK\n");
+	}
+
+	if (atomic_dec_and_test(&req->wb_complete))
+	{
+		dprintk("%s: calling nfs_writeback_release\n",__FUNCTION__);
+		nfs_writepage_release(req);
+	}
+}
+
+/*
+ * Dispatch a write reply to the partial- or full-page completion handler.
+ */
+void pnfs_writeback_done(struct nfs_write_data *data, int status)
+{
+	dprintk("%s: Begin\n",__FUNCTION__);
+
+	if (data->ispartial)
+	{
+		pnfs_writeback_done_partial(data, status);
+	}
+	else
+	{
+		nfs_writeback_done_full(data, status);
+	}
+}
+
+/*
+ * Call the appropriate parallel I/O subsystem write function.
+ * If no I/O device driver exists, or one does not match the returned
+ * fstype, then call regular NFS processing.
+ * TODO: Is wdata->how and wdata->args.stable always the same value?
+ */
+int
+pnfs_writepages(struct nfs_write_data* wdata)
+{
+	struct nfs_writeargs *args = &wdata->args;
+	struct inode *inode = wdata->inode;
+	int numpages, status = -EIO, pgcount=0, temp;
+	struct nfs_server* nfss = NFS_SERVER(inode);
+	struct nfs_inode* nfsi = NFS_I(inode);
+
+	dprintk("%s: Writing ino:%lu %u@%llu\n", __FUNCTION__, inode->i_ino, args->count, args->offset);
+
+	/* Step 1: Retrieve and set layout if not already cached */
+	if ((status = virtual_update_layout(inode,
+					    args->context,
+					    args->count,
+					    args->offset,
+					    FMODE_WRITE)))
+	{
+		goto out;
+	}
+
+	/* Find out the number of pages
+	 * TODO: Is this necessary?  Does the layout driver need the # of pages?
+	 */
+	pgcount = args->pgbase + args->count;
+	temp = pgcount % PAGE_CACHE_SIZE;
+	numpages = pgcount / PAGE_CACHE_SIZE;
+	if (temp != 0)
+	{
+		numpages++;
+	}
+	dprintk("%s: Using %d pages\n",__FUNCTION__, numpages);
+
+	/* Step 2: Execute the write with the layout driver
+	 */
+	dprintk("%s: Calling layout driver write\n",__FUNCTION__);
+	if (nfss->pnfs_curr_ld->ld_io_ops && nfss->pnfs_curr_ld->ld_io_ops->write_pagelist)
+	{
+		status = nfss->pnfs_curr_ld->ld_io_ops->write_pagelist(nfsi->current_layout,
+								       inode,
+								       args->pages,
+								       args->pgbase,
+								       numpages,
+								       (loff_t)args->offset,
+								       args->count,
+								       args->stable,
+								       (void*)wdata);
+		/*  Step 3: Mark the inode as dirty, requiring a fsync via the layout driver */
+		if (status)
+			nfsi->pnfs_dirty = 1;
+	}
+	else
+	{
+		status = -EIO;
+		goto out;
+	}
+
+	/* Step 4: Finish writing back the page list */
+	dprintk("%s: Calling writeback_done\n",__FUNCTION__);
+	/* TODO: NFS is async and so can't free the data structure yet.
+	 * Should this be a policy decision?
+	 */
+	if (args->stable != NFS_FILE_SYNC && nfss->pnfs_curr_ld->id != LAYOUT_NFSV4_FILES)
+		pnfs_writeback_done(wdata, status);
+
+ out:
+	dprintk("%s: End Status %d\n",__FUNCTION__, status);
+	return status;
+}
+
 /*
  * Call the appropriate parallel I/O subsystem read function.
  * If no I/O device driver exists, or one does match the returned
@@ -448,6 +621,13 @@ pnfs_file_write(struct file* filp, const
 			dentry->d_parent->d_name.name, dentry->d_name.name,
 			inode->i_ino, (unsigned long) count, (unsigned long) *pos);
 
+	/* Go through NFS operations if using page cache */
+	if (use_page_cache(inode))
+	{
+		/* Using NFS page cache */
+		return do_sync_write(filp,buf,count,pos);
+	}
+
 	/* Need to adjust write param if this is an append, etc */
 	generic_write_checks(filp,pos,&count,isblk);
 
diff -puN fs/nfs/write.c~client-writeback fs/nfs/write.c
--- linux-2.6.14-pnfs-current/fs/nfs/write.c~client-writeback	2006-01-12 13:19:20.971922000 -0500
+++ linux-2.6.14-pnfs-current-dhildebz/fs/nfs/write.c	2006-01-12 13:21:08.583467000 -0500
@@ -63,6 +63,7 @@
 #include <linux/smp_lock.h>
 
 #include "delegation.h"
+#include "pnfs.h"
 
 #define NFSDBG_FACILITY		NFSDBG_PAGECACHE
 
@@ -77,7 +78,7 @@ static struct nfs_page * nfs_update_requ
 					    struct page *,
 					    unsigned int, unsigned int);
 static void nfs_writeback_done_partial(struct nfs_write_data *, int);
-static void nfs_writeback_done_full(struct nfs_write_data *, int);
+void nfs_writeback_done_full(struct nfs_write_data *, int);
 static int nfs_wait_on_write_congestion(struct address_space *, int);
 static int nfs_wait_on_requests(struct inode *, unsigned long, unsigned int);
 static int nfs_flush_inode(struct inode *inode, unsigned long idx_start,
@@ -104,7 +105,7 @@ static inline void nfs_commit_free(struc
 	mempool_free(p, nfs_commit_mempool);
 }
 
-static void nfs_writedata_release(struct rpc_task *task)
+void nfs_writedata_release(struct rpc_task *task)
 {
 	struct nfs_write_data	*wdata = (struct nfs_write_data *)task->tk_calldata;
 	nfs_writedata_free(wdata);
@@ -195,7 +196,15 @@ static int nfs_writepage_sync(struct nfs
 			wdata->args.count = count;
 		wdata->args.offset = page_offset(page) + wdata->args.pgbase;
 
-		result = NFS_PROTO(inode)->write(wdata);
+                /* Switch between pNFS and NFSv4 I/O */
+		if (!use_pnfs_io(inode, count))
+		{
+			result = NFS_PROTO(inode)->write(wdata);
+		}
+		else
+		{
+			result = pnfs_writepages(wdata);
+		}
 
 		if (result < 0) {
 			/* Must mark the page invalid after I/O error */
@@ -389,7 +398,7 @@ static int nfs_inode_add_request(struct 
 /*
  * Insert a write request into an inode
  */
-static void nfs_inode_remove_request(struct nfs_page *req)
+void nfs_inode_remove_request(struct nfs_page *req)
 {
 	struct inode *inode = req->wb_context->dentry->d_inode;
 	struct nfs_inode *nfsi = NFS_I(inode);
@@ -469,7 +478,7 @@ nfs_dirty_request(struct nfs_page *req)
 /*
  * Add a request to the inode's commit list.
  */
-static void
+void
 nfs_mark_request_commit(struct nfs_page *req)
 {
 	struct inode *inode = req->wb_context->dentry->d_inode;
@@ -806,7 +815,7 @@ done:
 	return status;
 }
 
-static void nfs_writepage_release(struct nfs_page *req)
+void nfs_writepage_release(struct nfs_page *req)
 {
 	end_page_writeback(req->wb_page);
 
@@ -842,13 +851,24 @@ static inline int flush_task_priority(in
 	return RPC_PRIORITY_NORMAL;
 }
 
+static void nfs_execute_write(struct nfs_write_data *data)
+{
+	struct rpc_clnt *clnt = NFS_CLIENT(data->inode);
+	sigset_t oldset;
+	rpc_clnt_sigmask(clnt, &oldset);
+	lock_kernel();
+	rpc_execute(&data->task);
+	unlock_kernel();
+	rpc_clnt_sigunmask(clnt, &oldset);
+}
+
 /*
  * Set up the argument/result storage required for the RPC call.
  */
-static void nfs_write_rpcsetup(struct nfs_page *req,
-		struct nfs_write_data *data,
-		unsigned int count, unsigned int offset,
-		int how)
+void nfs_write_call(struct nfs_page *req,
+		      struct nfs_write_data *data,
+		      unsigned int count, unsigned int offset,
+		      int how)
 {
 	struct inode		*inode;
 
@@ -871,32 +891,32 @@ static void nfs_write_rpcsetup(struct nf
 	data->res.verf    = &data->verf;
 	nfs_fattr_init(&data->fattr);
 
-	NFS_PROTO(inode)->write_setup(data, how);
-
-	data->task.tk_priority = flush_task_priority(how);
-	data->task.tk_cookie = (unsigned long)inode;
-	data->task.tk_calldata = data;
-	/* Release requests */
-	data->task.tk_release = nfs_writedata_release;
-
-	dprintk("NFS: %4d initiated write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
-		data->task.tk_pid,
-		inode->i_sb->s_id,
-		(long long)NFS_FILEID(inode),
-		count,
-		(unsigned long long)data->args.offset);
-}
+	/* Only create an rpc request if utilizing NFSv4 I/O */
+	if (!use_pnfs_io(inode, count))
+	{
+		NFS_PROTO(inode)->write_setup(data, how);
+		data->task.tk_priority = flush_task_priority(how);
+		data->task.tk_cookie = (unsigned long)inode;
+		data->task.tk_calldata = data;
+		/* Release requests */
+		/* This is only called for RPC, not pNFS */
+		data->task.tk_release = nfs_writedata_release;
 
-static void nfs_execute_write(struct nfs_write_data *data)
-{
-	struct rpc_clnt *clnt = NFS_CLIENT(data->inode);
-	sigset_t oldset;
+		nfs_execute_write(data);
 
-	rpc_clnt_sigmask(clnt, &oldset);
-	lock_kernel();
-	rpc_execute(&data->task);
-	unlock_kernel();
-	rpc_clnt_sigunmask(clnt, &oldset);
+		dprintk("NFS: %4d initiated write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
+			data->task.tk_pid,
+			inode->i_sb->s_id,
+			(long long)NFS_FILEID(inode),
+			count,
+			(unsigned long long)data->args.offset);
+	}
+	else
+	{
+		/* Redefine the type of completion required for pNFS. */
+		data->complete = pnfs_writeback_done;
+		pnfs_writepages(data);
+	}
 }
 
 /*
@@ -938,16 +958,16 @@ static int nfs_flush_multi(struct list_h
 
 		data->pagevec[0] = page;
 		data->complete = nfs_writeback_done_partial;
+		data->ispartial = 1;
 
 		if (nbytes > wsize) {
-			nfs_write_rpcsetup(req, data, wsize, offset, how);
+			nfs_write_call(req, data, wsize, offset, how);
 			offset += wsize;
 			nbytes -= wsize;
 		} else {
-			nfs_write_rpcsetup(req, data, nbytes, offset, how);
+			nfs_write_call(req, data, nbytes, offset, how);
 			nbytes = 0;
 		}
-		nfs_execute_write(data);
 	} while (nbytes != 0);
 
 	return 0;
@@ -978,6 +998,9 @@ static int nfs_flush_one(struct list_hea
 	struct nfs_write_data	*data;
 	unsigned int		count;
 
+	/* Ignore this case for pNFS since the wsize will be as big
+	 * as possible
+	 */
 	if (NFS_SERVER(inode)->wsize < PAGE_CACHE_SIZE)
 		return nfs_flush_multi(head, inode, how);
 
@@ -999,10 +1022,11 @@ static int nfs_flush_one(struct list_hea
 	req = nfs_list_entry(data->pages.next);
 
 	data->complete = nfs_writeback_done_full;
+	data->ispartial = 0;
+
 	/* Set up the argument struct */
-	nfs_write_rpcsetup(req, data, count, 0, how);
+	nfs_write_call(req, data, count, 0, how);
 
-	nfs_execute_write(data);
 	return 0;
  out_bad:
 	while (!list_empty(head)) {
@@ -1087,7 +1111,7 @@ static void nfs_writeback_done_partial(s
  *	  writebacks since the page->count is kept > 1 for as long
  *	  as the page has a write request pending.
  */
-static void nfs_writeback_done_full(struct nfs_write_data *data, int status)
+void nfs_writeback_done_full(struct nfs_write_data *data, int status)
 {
 	struct nfs_page		*req;
 	struct page		*page;
diff -puN include/linux/nfs_xdr.h~client-writeback include/linux/nfs_xdr.h
--- linux-2.6.14-pnfs-current/include/linux/nfs_xdr.h~client-writeback	2006-01-12 13:19:20.977921000 -0500
+++ linux-2.6.14-pnfs-current-dhildebz/include/linux/nfs_xdr.h	2006-01-12 13:19:21.015921000 -0500
@@ -713,6 +713,7 @@ struct nfs_write_data {
 	unsigned long		timestamp;	/* For lease renewal */
 #endif
 	void (*complete) (struct nfs_write_data *, int);
+	unsigned int            ispartial;
 };
 
 struct nfs_access_entry;
diff -puN fs/nfs/pnfs.h~client-writeback fs/nfs/pnfs.h
--- linux-2.6.14-pnfs-current/fs/nfs/pnfs.h~client-writeback	2006-01-12 13:20:07.922725000 -0500
+++ linux-2.6.14-pnfs-current-dhildebz/fs/nfs/pnfs.h	2006-01-12 13:20:22.768003000 -0500
@@ -16,5 +16,7 @@ void set_pnfs_layoutdriver(struct super_
 void unmount_pnfs_layoutdriver(struct super_block *sb);
 ssize_t pnfs_file_write(struct file* filp, const char __user *buf, size_t count, loff_t* pos);
 ssize_t pnfs_file_read(struct file* filp, char __user *buf, size_t count, loff_t* pos);
-
+int use_pnfs_io(struct inode *inode,unsigned int count);
+int pnfs_writepages(struct nfs_write_data *wdata);
+void pnfs_writeback_done(struct nfs_write_data *data, int status);
 #endif /* FS_NFS_PNFS_H */
_
