/*
Galaxy Filesystem Toolkit
Copyright (C) 2005 Chad Yoshikawa
http://www.ececs.uc.edu/~yoshikco
yoshikco@ececs.uc.edu
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package galaxy.nfs;
import galaxy.java.*;
import galaxy.nfs.mount.*;
import galaxy.nfs.v2.*;
import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;
import java.util.Properties;
import java.util.Date;
import java.util.Vector;
import java.io.IOException;
import java.net.*;
import org.acplt.oncrpc.*;
/**
* A filesystem extension which talks to a NFS v2 server
*
* @author <a href="mailto:yoshikco@ececs.uc.edu">Chad Yoshikawa</a>
* @version 1.0
*/
public class NFSFileSystem implements galaxy.java.FileSystem, mount,nfs {
private String myNFSMountPointName;
private boolean myCachingFlag;
private String myNFSServerName;
private mountClient myMountClient;
private nfs_fh myRootFileHandle;
private int myNFSUID, myNFSGID;
// a pool of client connections to the NFS server
// we do this because each client is completely synchronized w.r.t. each NFS call
// so we can get no parallelism without this.
private int MAX_NFS_CLIENTS=4;
private nfsClient[] myNFSClientPool;
// The index into the pool...very simple stuff.
private int myCurrentNFSClient=0;
private FileHandleCache myCache;
private AttrCache myAttrCache;
/**
* Create a new filesystem that connects to an NFS filesystem
*
*/
public NFSFileSystem() {
}
/**
* Do cleanup work such as unmounting the filesystem
*
*/
public void Destroy() {
// now cleanup and unmount the directory
Util.Debug("Destroy","Shutting down the NFS server...");
try {
myMountClient.MOUNTPROC_UMNT_1(new dirpath(myNFSMountPointName));
// Cleanup resources
for (int i=0;i<MAX_NFS_CLIENTS;i++) {
myNFSClientPool[i].close();
myNFSClientPool[i] = null;
}
myMountClient.close();
myMountClient = null;
} catch (Exception ex) {
;;
}
}
/**
* Initialize the filesystem. Mounts the remote filesystem and gets the
* root filehandle. Currently, we only use the uid and gid to set ownership
* on newly created files, not for authenticating. This should be easy to
* fix, however, using the setAuth method on the UDP RPC client.
*
* Therefore, the remote filesystem is assumed to be exported using
* anonuid,anonguid and the insecure options, e.g.:
* <pre>/dir1/exported_directory client_hostname(rw,insecure,all_squash,anonuid=501,anongid=501)
beowulf.linc.uc.edu(rw,insecure,all_squash,anonuid=501,anongid=501)
ececs-lincy.rhod.uc.edu(rw,insecure,all_squash,anonuid=501,anongid=501)</pre>
*
* @param props a <code>Properties</code> value which contains the following mappings:
* <ul>
* <li> mount_point The mount point on the server
* <li> server The server's DNS name
* <li> uid The uid of the NFS user.
* <li> gid The gid of the NFS user.
* <li> caching set to "true" if the attribute and filehandle caches should be used (the default)
* </ul>
*/
public void Init(Properties props) {
myNFSMountPointName = (String)props.getProperty("mount_point");
myNFSServerName = (String)props.getProperty("server");
myNFSUID = Integer.parseInt(props.getProperty("uid"));
myNFSGID = Integer.parseInt(props.getProperty("gid"));
myCachingFlag = Boolean.valueOf(props.getProperty("caching","true")).booleanValue(); // must be either true or anything else=false
if (myCachingFlag) {
myCache = new FileHandleCache();
myAttrCache = new AttrCache();
} else {
myCache = new NullFileHandleCache();
myAttrCache = new NullAttrCache();
}
try {
MAX_NFS_CLIENTS = Integer.parseInt(props.getProperty("n"));
} catch (Exception ex) {
MAX_NFS_CLIENTS = 4; // default
}
try {
// Speak tcp to mountd...
OncRpcClient tcp_mount_client =
new OncRpcTcpClient(InetAddress.getByName(myNFSServerName),
MOUNTPROG, MOUNTVERS, 0/*port - 0 means use portmapper*/);
// Speak udp to nfsd? Might try the same with tcp...
myMountClient = new mountClient(tcp_mount_client);
InitializeNFSClientPool();
dirpath path = new dirpath(myNFSMountPointName);
fhstatus status = myMountClient.MOUNTPROC_MNT_1(path);
if (status.fhs_status==0) {
Util.Debug("Init","Success -- got the filehandle for "+path.value);
myRootFileHandle = Util.ConvertMountHandleToNfsHandle(status.fhs_fhandle);
} else {
Util.Warn("Init","Failed -- couldn't get the filehandle for "+path.value+":"+status.fhs_status);
}
} catch (Exception ex) {
Util.Warn("Init",myNFSServerName+":"+myNFSMountPointName,ex);
}
}
private void InitializeNFSClientPool() {
myNFSClientPool = new nfsClient[MAX_NFS_CLIENTS];
try {
for (int i=0;i<MAX_NFS_CLIENTS;i++) {
// OncRpcClient udp_nfs_client =
// new OncRpcUdpClient(InetAddress.getByName(myNFSServerName),
// NFS_PROGRAM, NFS_VERSION, 0/*port - 0 means use portmapper*/,
// 32768); // use a larger (than default 8k) client buffer size
OncRpcUdpClient udp_nfs_client =
new OncRpcUdpClient(InetAddress.getByName(myNFSServerName),
NFS_PROGRAM, NFS_VERSION, 0/*port - 0 means use portmapper*/,
32768); // use a larger (than default 8k) client buffer size
// udp_nfs_client.setAuth(auth);
// make the re-xmission mode exponential backoff
udp_nfs_client.setRetransmissionMode(OncRpcUdpRetransmissionMode.EXPONENTIAL);
udp_nfs_client.setRetransmissionTimeout(800); /* .8 seconds like the SFU */
udp_nfs_client.setTimeout(30000); /* 30 seconds total timeout */
myNFSClientPool[i] = new nfsClient(udp_nfs_client);
}
} catch (Exception ex) {
Driver.logger.fatal("Could not create nfs clients:"+ex);
}
myCurrentNFSClient = 0;
}
/**
* Loop through the pool, returning a different client for every request
* Right now, don't bother with knowing which client is currently busy or not...
*
* @return a <code>nfsClient</code> value
*/
private synchronized nfsClient GetNFSClientFromPool() {
myCurrentNFSClient = (myCurrentNFSClient+1)%MAX_NFS_CLIENTS;
return myNFSClientPool[myCurrentNFSClient];
}
public galaxy.java.FileInfo[] ListFiles(String original_path) {
galaxy.java.FileInfo[] ret = null;
try {
// We need to traverse down the path, going down to the last
// directory, then list that directory...
nfs_fh dir_handle = _GetDirectoryHandle(original_path);
String[] list = _GetDirectoryEntries(dir_handle);
list = Util.RemoveSpecialFiles(list);
ret = new galaxy.java.FileInfo[list.length];
for (int i=0;i<list.length;i++) {
String filename = list[i];
try {
String fq_filename = Util.CatPath(original_path,filename);
ret[i] = _Stat(dir_handle,filename,fq_filename);
} catch (Exception ex) {
Util.Warn("ListFiles",original_path,ex);
}
}
} catch (Exception ex) {
Util.Warn("ListFiles",original_path,ex);
}
return (ret);
} // end ListFiles
/**
* Get the nfs filehandle of a named directory
*
* @param path an absolute path that starts with a slash and ends with a slash
* @return a <code>nfs_fh</code> value
*/
protected nfs_fh _GetDirectoryHandle(String path) {
nfs_fh current = null;
// First remove any leading or trailling slashes
try {
path = Util.RemoveSlashFromStartOfPath(path);
path = Util.RemoveSlashFromEndOfPath(path);
if (path.equals("")) {
return (myRootFileHandle);
} else {
// Break up the path into pieces.
String[] parts = path.split(GALAXY_PATH_SEPARATOR);
// Now for each part, do a lookup, starting with the root
current = myRootFileHandle;
for (int i=0;i<parts.length;i++) {
current = _LookupFileHandle(current,parts[i],true);
}
} // end else
} catch (Exception ex) {
Util.Warn("_GetDirectoryHandle",path,ex);
}
return (current);
}
protected nfs_fh _LookupFileHandle(nfs_fh dir, String filename, boolean cached) {
nfs_fh ret = null;
// check the cache first
if (cached && ((ret = myCache.Lookup(dir,filename))!=null)) {
return (ret);
}
try {
nfsClient myNFSClient = GetNFSClientFromPool();
diropargs args = new diropargs();
args.dir = dir;
args.name = new filename(filename);
diropres result = myNFSClient.NFSPROC_LOOKUP_2(args);
if (result.status == nfsstat.NFS_OK) {
diropokres okresult = result.diropres;
ret = okresult.file;
// now put this into the cache...
myCache.Put(dir,filename,ret);
myAttrCache.Put(ret,okresult.attributes);
} else {
Util.Warn("Error in lookup:"+result.status,filename);
}
} catch (Exception ex) {
Util.Warn("_LookupFileHandle",filename,ex);
}
return (ret);
}
public FileInfo Stat(String original_path) {
nfs_fh dir_handle = _GetDirectoryHandle(Util.GetPath(original_path));
FileInfo ret = new FileInfo();
try {
ret = _Stat(dir_handle,Util.GetFilename(original_path),original_path);
} catch (Exception ex) {
Util.Warn("Stat",original_path,ex);
}
return (ret);
}
/**
* Stat a file within a directory and return some information
*
* @param dir The directory the file is contained in
* @param filename The filename of the file or "" if the caller wants to stat the dir handle
* @param path The original path the file is in (logicall). This is the absolute path of the file
* without the filename, using GALAXY_PATH_SEPARATORS between path components.
* @return a <code>FileInfo</code> value representing the file
*/
private FileInfo _Stat(nfs_fh dir, String filename, String path) {
FileInfo ret = new FileInfo();
try {
fattr attr=null;
nfsClient myNFSClient = GetNFSClientFromPool();
// first lookup the handle
if (!filename.equals("")) {
dir = _LookupFileHandle(dir,filename,true);
}
if ((attr = myAttrCache.Lookup(dir))==null) {
attrstat result = myNFSClient.NFSPROC_GETATTR_2(dir);
if (result.status == nfsstat.NFS_OK) {
attr = result.attributes;
myAttrCache.Put(dir,attr);
} else {
Util.Debug("_Stat","Error - got "+result.status+" when performing getattributes on file "+path);
}
}
ret.CreateTime = Util.ConvertTime(attr.ctime);
ret.LastAccessTime = Util.ConvertTime(attr.atime);
ret.LastModifiedTime = Util.ConvertTime(attr.mtime);
ret.FileName = filename;
ret.FilePath = path;
ret.FolderFlag = (attr.type == ftype.NFDIR);
if (ret.FolderFlag) {
Util.Debug("_Stat","File with filename="+filename+" and path="+path+" is a folder");
}
if (!ret.FolderFlag) {
ret.Size = (long)attr.size;
} else {
// folders don't have file sizes
ret.Size = 0;
}
} catch (Exception ex) {
Util.Warn("_Stat",path,ex);
}
return (ret);
}
public boolean CreateDirectory(String path) {
boolean ret = false;
try {
nfsClient myNFSClient = GetNFSClientFromPool();
// just the root path, not the filename
nfs_fh dir_handle = _GetDirectoryHandle(Util.GetPath(path));
createargs args = new createargs();
args.where = new diropargs();
args.where.dir = dir_handle;
args.where.name = new filename(Util.GetFilename(path));
args.attributes = new sattr();
args.attributes.mtime = args.attributes.atime = Util.Now();
args.attributes.uid = myNFSUID;
args.attributes.gid = myNFSGID; // <--- what are we going to set these to be?
args.attributes.size = 0;
args.attributes.mode = NFSMODE_DIR | 0755; // <--- probably should make this a default mode var
diropres result = myNFSClient.NFSPROC_MKDIR_2(args);
ret = (result.status == nfsstat.NFS_OK);
if (!ret) {
Util.Debug("CreateDirectory","Result status is "+result.status);
} else {
myCache.Put(dir_handle,args.where.name.value,result.diropres.file);
myAttrCache.Put(result.diropres.file,result.diropres.attributes);
}
} catch (Exception ex) {
Util.Warn("CreateDirectory",path,ex);
}
return ret;
}
// Copy file directly to NFS from Windows
public boolean CopyFile(String windows_src, String galaxy_dst) {
boolean ret = false;
try {
Util.Warn("CopyFile","Not Implemented Yet");
} catch (Exception ex) {
}
return (ret);
}
public boolean CreateFile(String path) {
boolean ret = false;
try {
nfsClient myNFSClient = GetNFSClientFromPool();
// just the root path, not the filename
nfs_fh dir_handle = _GetDirectoryHandle(Util.GetPath(path));
createargs args = new createargs();
args.where = new diropargs();
args.where.dir = dir_handle;
args.where.name = new filename(Util.GetFilename(path));
args.attributes = new sattr();
args.attributes.mtime = args.attributes.atime = Util.Now();
args.attributes.uid = myNFSUID;
args.attributes.gid = myNFSGID; // <--- what are we going to set these to be?
args.attributes.size = 0;
args.attributes.mode = NFSMODE_REG | 0644; // <--- probably should make this a default mode var
diropres result = myNFSClient.NFSPROC_CREATE_2(args);
ret = (result.status == nfsstat.NFS_OK);
if (!ret) {
Util.Debug("CreateFile","Result status is "+result.status);
} else {
myCache.Put(dir_handle,args.where.name.value,result.diropres.file);
myAttrCache.Put(result.diropres.file,result.diropres.attributes);
}
} catch (Exception ex) {
Util.Warn("CreateFile",path,ex);
}
return ret;
}
public boolean Delete(String path) {
boolean ret = false;
try {
FileInfo info = Stat(path);
if (info.FolderFlag) {
ret = _DeleteFolder(path,true);
// Util.Debug("Delete","Recursive delete not currently supported: "+path+" not deleted");
} else {
ret = _DeleteFile(path);
}
} catch (Exception ex) {
Util.Warn("Delete",path,ex);
}
return ret;
}
public boolean _DeleteFolder(String path, boolean recursive) {
boolean ret = false;
if (!recursive) {
ret = _DeleteEmptyFolder(path);
} else {
// recurse down thru the directory hierarchy
// and remove on the way up so that we are sure to delete empty folders
ret = _DeleteRecursively(path);
}
return (ret);
}
protected boolean _DeleteRecursively(String path) {
boolean ret = true;
FileInfo [] infos = ListFiles(path);
for (int i=0;i<infos.length;i++) {
if (infos[i].FolderFlag) {
ret = ret && _DeleteRecursively(infos[i].FilePath);
} else {
ret = ret && _DeleteFile(infos[i].FilePath);
}
}
// Now I am empty
ret = ret && _DeleteEmptyFolder(path);
// ret is the boolean and of all the return values.
// rather than stop at the first error, we give a best effort when deleting...
return (ret);
}
protected boolean _DeleteEmptyFolder(String path) {
boolean ret = false;
try {
nfsClient myNFSClient = GetNFSClientFromPool();
diropargs args = new diropargs();
args.dir = _GetDirectoryHandle(Util.GetPath(path));
args.name = new filename(Util.GetFilename(path));
int ret_val = myNFSClient.NFSPROC_RMDIR_2(args);
if (ret_val == nfsstat.NFS_OK) {
ret = true;
myCache.Invalidate(args.dir,args.name.value);
myAttrCache.Invalidate(args.dir);
} else {
Util.Debug("_DeleteEmptyFolder","Could not remove folder "+path+" due to error "+ret_val);
}
} catch (Exception ex) {
Util.Warn("_DeleteEmptyFolder",path,ex);
}
return (ret);
}
protected boolean _DeleteFile(String path) {
boolean ret = false;
try {
nfsClient myNFSClient = GetNFSClientFromPool();
diropargs args = new diropargs();
args.dir = _GetDirectoryHandle(Util.GetPath(path));
args.name = new filename(Util.GetFilename(path));
int ret_val = myNFSClient.NFSPROC_REMOVE_2(args);
myCache.Invalidate(args.dir,args.name.value);
myAttrCache.Invalidate(args.dir);
if (ret_val == nfsstat.NFS_OK) {
ret = true;
} else {
Util.Debug("_DeleteFile","Could not remove file "+path+" due to error "+ret_val);
}
} catch (Exception ex) {
Util.Warn("_Delete",path,ex);
}
return (ret);
}
/**
* Read up to NFS_MAXDATA bytes and return them
*
* @deprecated Replaced by {@link galaxy.nfs.NFSFileSystem#Read} instead.
* @param path the fully-qualified filename
* @param offset offset into the file
* @param count how many bytes to read
* @return an array of bytes less than or equal to count
*/
public byte[] OldRead(String path, int offset, int count) {
byte[] ret = null;
nfsClient myNFSClient = GetNFSClientFromPool();
try {
readargs args = new readargs();
args.offset = offset;
args.count = Math.min(count,nfs.NFS_MAXDATA);
args.file = _GetDirectoryHandle(path);
readres result = myNFSClient.NFSPROC_READ_2(args);
if (result.status == nfsstat.NFS_OK) {
readokres reply = result.reply;
// ignore the returned attributes...
ret = reply.data;
} else {
Util.Debug("Read","Result from reading file="+path+" at offset="+offset+" for count="+count+" bytes:"+result.status);
}
} catch (Exception ex) {
Util.Warn("Read",path,ex);
}
return (ret);
}
/// New multithreaded read...
public byte[] Read(String path, int offset, int count) {
// if we're being asked for less than the max, don't bother using a thread to read the data
if (count < nfs.NFS_MAXDATA) {
return OldRead(path,offset,count);
}
byte[] ret = null;
int bytes_read = 0;
int bytes_to_read = count;
try {
// This holds my threads for writing
Vector threads = new Vector();
// vectors are synchronized, see http://java.sun.com/j2se/1.4.2/docs/api/java/util/Vector.html
// This holds the set of byte[] that threads will return to us.
byte[][] bufs = new byte[(int)(Math.ceil((double)count/(double)nfs.NFS_MAXDATA))][];
// Driver.logger.info(Driver.timer.getRawTime()+" NFSFileSystem.Read: Getting directory handle");
nfs_fh the_file = _GetDirectoryHandle(path);
// Driver.logger.info(Driver.timer.getRawTime()+" NFSFileSystem.Read: Done getting directory handle");
int thread_number=0;
while (bytes_read < count) {
readargs args = new readargs();
args.file = the_file;
// get a different connection for every write to try to parallelize the writes
nfsClient myNFSClient = GetNFSClientFromPool();
args.offset = bytes_read+offset;
args.count = Math.min(bytes_to_read,nfs.NFS_MAXDATA);
// Driver.logger.info(Driver.timer.getRawTime()+" NFSFileSystem.Read: Calling AsyncRead...");
AsyncRead(myNFSClient,args,path,threads,bufs,thread_number++);
// Driver.logger.info(Driver.timer.getRawTime()+" NFSFileSystem.Read: Done calling AsyncRead");
bytes_read+= args.count;
bytes_to_read-= args.count;
}
// Make sure all reads complete...
SynchronizeAllReads(threads);
ret = CombineAllReads(bufs);
} catch (Exception ex) {
Util.Warn("Write",path,ex);
}
return (ret);
}
// Can we save a memory copy here?
protected byte[] CombineAllReads(byte[][] bufs) {
byte[] ret =null;
int total_size = 0;
for (int i=0;i<bufs.length;i++) {
byte[] buffer = bufs[i];
if (buffer!=null) {
total_size += buffer.length;
}
}
ret = new byte[total_size];
int current_pos = 0;
for (int i=0;i<bufs.length;i++) {
byte[] buffer = bufs[i];
if (buffer!=null) {
System.arraycopy(buffer,0,ret,current_pos,buffer.length);
current_pos+=buffer.length;
}
}
return (ret);
}
protected void AsyncRead(nfsClient client, readargs args, String path, Vector threads, byte[][] bufs, int my_id) {
// spawn a thread to do the work...
ReaderThread rt = new ReaderThread(client, args, path,bufs,my_id);
threads.add(rt);
rt.start();
}
class ReaderThread extends Thread {
public nfsClient myNFSClient;
public readargs myArgs;
public String myPath;
public int myID;
public byte[][] myBufs;
public ReaderThread(nfsClient client, readargs args, String path, byte[][] bufs, int id) {
myNFSClient = client;
myArgs = args;
myPath = path;
myBufs = bufs;
myID = id;
}
public void run() {
try {
readres result = myNFSClient.NFSPROC_READ_2(myArgs);
if (result.status == nfsstat.NFS_OK) {
myAttrCache.Put(myArgs.file,result.reply.attributes);
readokres reply = result.reply;
// set the buffer...
myBufs[myID] = reply.data;
} else {
Util.Warn("ReaderThread.run","Result from reading file="+myPath+" at offset="+myArgs.offset+" for count="+myArgs.count+" bytes:"+result.status);
}
} catch (Exception ex) {
Util.Warn("ReaderThread.run",myPath,ex);
}
}
} // end inner class
/**
* Write data to the file, fragmenting into NFS_MAXDATA chunks.
* << NO THREADING>>
* @deprecated Use {@link galaxy.nfs.NFSFileSystem#Write} instead.
*
* @param path the fq filename
* @param offset offset into the file
* @param count how many bytes to write
* @param buffer the buffer of count bytes
* @return how many bytes were successfully written
*/
public int OldWrite(String path, int offset, int count, byte[] buffer) {
int ret = 0;
int bytes_written = 0;
int bytes_to_write = count;
try {
nfsClient myNFSClient = GetNFSClientFromPool();
writeargs args = new writeargs();
Util.TraceTime("NFSFileSystem.Write: Getting directory handle");
args.file = _GetDirectoryHandle(path);
Util.TraceTime("NFSFileSystem.Write: Done getting directory handle");
while (bytes_written < count) {
args.offset = bytes_written+offset;
byte[] smaller_buffer;
// optimize for special case where the buffer doesn't need to be copied
if ((bytes_written == 0) && (bytes_to_write <= nfs.NFS_MAXDATA)) {
// no memory copy needed
args.data = buffer;
} else {
smaller_buffer = new byte[Math.min(bytes_to_write,nfs.NFS_MAXDATA)];
Util.TraceTime("NFSFileSystem.Write: Starting memory copy for "+smaller_buffer.length+" bytes");
// Get rid of this memory copy...
System.arraycopy(buffer,bytes_written,smaller_buffer,0,smaller_buffer.length);
Util.TraceTime("NFSFileSystem.Write: Done with memory copy");
args.data = smaller_buffer;
}
Util.TraceTime("WriterThread: Calling NFSPROC_WRITE_2");
attrstat result = myNFSClient.NFSPROC_WRITE_2(args);
Util.TraceTime("WriterThread: Done calling NFSPROC_WRITE_2");
if (result.status == nfsstat.NFS_OK) {
bytes_written+= args.data.length;
bytes_to_write-= args.data.length;
myAttrCache.Put(args.file,result.attributes);
} else {
Util.Debug("Write","Error writing to file "+path+":"+result.status);
break;
}
}
} catch (Exception ex) {
Util.Warn("Write",path,ex);
}
return (bytes_written); // # of bytes written
}
/**
* Write data to the file using multiple threads and taking advantage of the pool of nfs clients
* << MULTITHREADED VERSION >>
*
* @param path the fq filename
* @param offset offset into the file
* @param count how many bytes to write
* @param buffer the buffer of count bytes
* @return how many bytes were successfully written
*/
public int Write(String path, int offset, int count, byte[] buffer) {
int ret = 0;
int bytes_written = 0;
int bytes_to_write = count;
try {
// This holds my threads for writing
Vector threads = new Vector();
Util.TraceTime("NFSFileSystem.Write: Getting directory handle");
nfs_fh the_file = _GetDirectoryHandle(path);
Util.TraceTime("NFSFileSystem.Write: Done getting directory handle");
while (bytes_written < count) {
writeargs args = new writeargs();
args.file = the_file;
// get a different connection for every write to try to parallelize the writes
nfsClient myNFSClient = GetNFSClientFromPool();
args.offset = bytes_written+offset;
byte[] smaller_buffer;
// optimize for special case where the buffer doesn't need to be copied
if ((bytes_written == 0) && (bytes_to_write <= nfs.NFS_MAXDATA)) {
// no memory copy needed
args.data = buffer;
} else {
smaller_buffer = new byte[Math.min(bytes_to_write,nfs.NFS_MAXDATA)];
Util.TraceTime("NFSFileSystem.Write: Starting memory copy for "+smaller_buffer.length+" bytes");
// Get rid of this memory copy...
System.arraycopy(buffer,bytes_written,smaller_buffer,0,smaller_buffer.length);
Util.TraceTime("NFSFileSystem.Write: Done with memory copy");
args.data = smaller_buffer;
}
Util.TraceTime("NFSFileSystem.Write: Calling AsyncWrite...");
AsyncWrite(myNFSClient,args,path,threads);
//This next call is always synchronous...so we have to change this to be a threaded call
//attrstat result = myNFSClient.NFSPROC_WRITE_2(args);
Util.TraceTime("NFSFileSystem.Write: Done calling AsyncWrite");
bytes_written+= args.data.length;
bytes_to_write-= args.data.length;
}
// Make sure all writes complete...
Util.TraceTime("NFSFileSystem.Write: Starting call to SyncAllWrites");
SynchronizeAllWrites(threads);
Util.TraceTime("NFSFileSystem.Write: Done with call to SyncAllWrites");
} catch (Exception ex) {
Util.Warn("Write",path,ex);
}
return (bytes_written); // # of bytes written
}
protected void AsyncWrite(nfsClient client, writeargs args, String path, Vector threads) {
// spawn a thread to do the work...
WriterThread wt = new WriterThread(client, args, path);
threads.add(wt);
wt.start();
}
/**
* Gather all threads and return the total # of bytes written
*
* @return the total # of bytes written
*/
protected void SynchronizeAllWrites(Vector threads) {
for (int i=0;i<threads.size();i++) {
WriterThread t = (WriterThread)threads.get(i);
// wait for this thread to die
try {
t.join();
} catch (InterruptedException ex) {
// what to do here?
Driver.logger.warn("Thread "+t+" was interrupted while trying to write bytes to file "+t.myPath);
}
}
threads.clear();
}
protected void SynchronizeAllReads(Vector threads) {
for (int i=0;i<threads.size();i++) {
ReaderThread t = (ReaderThread)threads.get(i);
// wait for this thread to die
try {
t.join();
} catch (InterruptedException ex) {
// what to do here?
Driver.logger.warn("Reader Thread "+t+" was interrupted while trying to read bytes from file "+t.myPath);
}
}
threads.clear();
}
class WriterThread extends java.lang.Thread {
public nfsClient myNFSClient;
public writeargs myArgs;
public String myPath;
public WriterThread(nfsClient client, writeargs args, String path) {
myNFSClient = client;
myArgs = args;
myPath = path;
}
public void run() {
try {
Util.TraceTime("WriterThread: Calling NFSPROC_WRITE_2");
attrstat result = myNFSClient.NFSPROC_WRITE_2(myArgs);
Util.TraceTime("WriterThread: Done calling NFSPROC_WRITE_2");
if (result.status != nfsstat.NFS_OK) {
// should we retry???
Util.Warn("Write","Error writing to file "+myPath+":"+result.status);
} else {
myAttrCache.Put(myArgs.file,result.attributes);
}
} catch (Exception ex) {
Util.Warn("NFSFileSystem.WriterThread.run",myPath,ex);
}
// now the thread should die...
}
}
////////////////////////////////////////////////////////////////////////////////////
// HELPER METHODS
////////////////////////////////////////////////////////////////////////////////////
/**
* Describe <code>ListDirectoryEntries</code> method here.
*
* @param dir a <code>fhandle</code> value
*/
protected String[] _GetDirectoryEntries(nfs_fh dir) {
String[] ret = null;
readdirargs args = new readdirargs();
args.dir = dir;
args.count = NFS_MAXDATA; // get up to this many bytes
args.cookie = new nfscookie();
args.cookie.value = new byte[] {0,0,0,0}; // this means give me the first directory entry
Vector entries = new Vector();
try {
nfsClient myNFSClient = GetNFSClientFromPool();
readdirres result = null;
do {
result = myNFSClient.NFSPROC_READDIR_2(args);
if (result.status == nfsstat.NFS_OK) {
args.cookie = _AddDirEntries(result.reply.entries,entries);
}
} while (result!=null && (result.status == nfsstat.NFS_OK) && (!result.reply.eof));
ret = new String[entries.size()];
entries.copyInto(ret);
} catch (Exception ex) {
Util.Warn("_GetDirectoryEntries",Util.ConvertFileHandleToString(dir),ex);
}
return (ret);
}
// Add entries to the vector -- just strings
private nfscookie _AddDirEntries(entry entries, Vector vec) {
nfscookie ret = null;
entry current = entries;
while (current!=null) {
vec.add(current.name.value);
ret = current.cookie;
current = current.nextentry;
}
return (ret);
}
public void Command(String s) {
if (s.equals("trace")) {
Util.PrintTrace();
}
}
} // end class