Click here to Skip to main content
15,892,697 members
Articles / Programming Languages / C++

A Namespace Extension Toolkit

Rate me:
Please Sign up or sign in to vote.
4.78/5 (10 votes)
21 Mar 2006 | CPOL | 12 min read | 84.9K | 1.7K | 42
This article shows you how to build your own Windows Explorer interfaces to custom data.
/*
  Galaxy Filesystem Toolkit
  Copyright (C) 2005 Chad Yoshikawa
  http://www.ececs.uc.edu/~yoshikco
  yoshikco@ececs.uc.edu

  This program is free software; you can redistribute it and/or
  modify it under the terms of the GNU General Public License
  as published by the Free Software Foundation; either version 2
  of the License, or (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
*/
package galaxy.nfs;

import galaxy.java.*;
import galaxy.nfs.mount.*;
import galaxy.nfs.v2.*;

import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;

import java.util.Properties;
import java.util.Date;
import java.util.Vector;
import java.io.IOException;
import java.net.*;

import org.acplt.oncrpc.*;


/**
 * A filesystem extension which talks to a NFS v2 server
 *
 * @author <a href="mailto:yoshikco@ececs.uc.edu">Chad Yoshikawa</a>
 * @version 1.0
 */
public class NFSFileSystem implements galaxy.java.FileSystem, mount,nfs  {
    // Export path on the server; read from the "mount_point" property in Init.
    private String myNFSMountPointName;
    // True when the real filehandle/attribute caches are in use ("caching" property).
    private boolean myCachingFlag;
    // Server host name; read from the "server" property in Init.
    private String myNFSServerName;
    // Client for the mount protocol; created in Init and closed in Destroy.
    private mountClient myMountClient;
    // Handle of the mounted root directory; stays null if the mount call fails.
    private nfs_fh myRootFileHandle;
    // uid/gid stamped on newly created files and directories.
    private int myNFSUID, myNFSGID;
    // a pool of client connections to the NFS server
    // we do this because each client is completely synchronized w.r.t. each NFS call
    // so we can get no parallelism without this.
    // NOTE: despite the constant-style name this is mutable; Init overwrites it from the "n" property.
    private int MAX_NFS_CLIENTS=4;
    private nfsClient[] myNFSClientPool;
    // The index into the pool...very simple stuff.
    private int myCurrentNFSClient=0;

    // Filehandle cache keyed by (directory handle, filename); a Null* variant is used when caching is off.
    private FileHandleCache myCache;
    // Attribute cache keyed by filehandle; a Null* variant is used when caching is off.
    private AttrCache myAttrCache;

    /**
     * Create a new filesystem that connects to an NFS filesystem.
     * The constructor itself does no work; {@link #Init} performs the
     * mount and creates the client connections.
     */
    public NFSFileSystem() {
    }


    /**
     * Do cleanup work: unmount the remote filesystem and close every
     * client connection.
     *
     * Each step is attempted independently, so a failed unmount does not
     * leak the pooled NFS clients or the mount client.  Clients that were
     * never created (e.g. because {@link #Init} failed part-way) are skipped.
     * Failures are logged rather than thrown.
     */
    public void Destroy() {
        // now cleanup and unmount the directory
        Util.Debug("Destroy","Shutting down the NFS server...");
        if (myMountClient != null) {
            try {
                myMountClient.MOUNTPROC_UMNT_1(new dirpath(myNFSMountPointName));
            } catch (Exception ex) {
                Util.Warn("Destroy",myNFSServerName+":"+myNFSMountPointName,ex);
            }
        }

        // Cleanup resources.  Walk the array itself rather than MAX_NFS_CLIENTS
        // so a partially initialized pool cannot cause an out-of-bounds access.
        if (myNFSClientPool != null) {
            for (int i=0;i<myNFSClientPool.length;i++) {
                if (myNFSClientPool[i] == null) {
                    continue;
                }
                try {
                    myNFSClientPool[i].close();
                } catch (Exception ex) {
                    Util.Warn("Destroy","closing NFS client "+i,ex);
                }
                myNFSClientPool[i] = null;
            }
        }

        if (myMountClient != null) {
            try {
                myMountClient.close();
            } catch (Exception ex) {
                Util.Warn("Destroy","closing mount client",ex);
            }
            myMountClient = null;
        }
    }

    /**
     * Initialize the filesystem.  Mounts the remote filesystem and gets the
     * root filehandle.  Currently, we only use the uid and gid to set ownership
     * on newly created files, not for authenticating.  This should be easy to
     * fix, however, using the setAuth method on the UDP RPC client.
     *
     * Therefore, the remote filesystem is assumed to be exported using
     * anonuid,anongid and the insecure options, e.g.:
     * <pre>/dir1/exported_directory client_hostname(rw,insecure,all_squash,anonuid=501,anongid=501)
     beowulf.linc.uc.edu(rw,insecure,all_squash,anonuid=501,anongid=501)
     ececs-lincy.rhod.uc.edu(rw,insecure,all_squash,anonuid=501,anongid=501)</pre>
     *
     * Mount and connection failures are logged, not thrown; in that case the
     * root filehandle stays unset.  A missing or non-numeric uid/gid still
     * raises <code>NumberFormatException</code>.
     *
     * @param props a <code>Properties</code> value which contains the following mappings:
     * <ul>
     * <li> mount_point The mount point on the server
     * <li> server The server's DNS name
     * <li> uid The uid of the NFS user.
     * <li> gid The gid of the NFS user.
     * <li> caching set to "true" if the attribute and filehandle caches should be used (the default)
     * <li> n (optional) number of pooled NFS client connections; defaults to 4
     *      when missing, non-numeric or less than 1
     * </ul>
     */
    public void Init(Properties props) {
        myNFSMountPointName = props.getProperty("mount_point");
        myNFSServerName = props.getProperty("server");
        myNFSUID  = Integer.parseInt(props.getProperty("uid"));
        myNFSGID  = Integer.parseInt(props.getProperty("gid"));
        myCachingFlag = Boolean.valueOf(props.getProperty("caching","true")).booleanValue(); // must be either true or anything else=false

        if (myCachingFlag) {
            myCache = new FileHandleCache();
            myAttrCache = new AttrCache();
        } else {
            myCache = new NullFileHandleCache();
            myAttrCache = new NullAttrCache();
        }

        final int default_pool_size = 4;
        int pool_size;
        try {
            pool_size = Integer.parseInt(props.getProperty("n"));
        } catch (Exception ex) {
            pool_size = default_pool_size; // missing or not a number
        }
        // A pool size below 1 would make the pool array allocation fail (negative)
        // or make the round-robin modulo divide by zero (0), so reject it here.
        if (pool_size < 1) {
            Util.Warn("Init","Ignoring invalid NFS client pool size n="+pool_size+", using "+default_pool_size);
            pool_size = default_pool_size;
        }
        MAX_NFS_CLIENTS = pool_size;

        try {
            // Speak tcp to mountd...
            OncRpcClient tcp_mount_client =
                new OncRpcTcpClient(InetAddress.getByName(myNFSServerName),
                    MOUNTPROG, MOUNTVERS,  0/*port - 0 means use portmapper*/);
            // Speak udp to nfsd?  Might try the same with tcp...
            myMountClient = new mountClient(tcp_mount_client);
            InitializeNFSClientPool();
            dirpath path = new dirpath(myNFSMountPointName);
            fhstatus status = myMountClient.MOUNTPROC_MNT_1(path);
            if (status.fhs_status==0) {
                Util.Debug("Init","Success -- got the filehandle for "+path.value);
                myRootFileHandle = Util.ConvertMountHandleToNfsHandle(status.fhs_fhandle);
            } else {
                Util.Warn("Init","Failed -- couldn't get the filehandle for "+path.value+":"+status.fhs_status);
            }
        } catch (Exception ex) {
            Util.Warn("Init",myNFSServerName+":"+myNFSMountPointName,ex);
        }
    }

    /**
     * Allocate the pool and fill it with MAX_NFS_CLIENTS UDP clients for the
     * NFS program on the configured server, then reset the round-robin index.
     * If creating any client fails the error is logged and the remaining
     * slots are left null.
     */
    private void InitializeNFSClientPool() {
        myNFSClientPool = new nfsClient[MAX_NFS_CLIENTS];

        try {
            int slot = 0;
            while (slot < MAX_NFS_CLIENTS) {
                InetAddress server_address = InetAddress.getByName(myNFSServerName);
                // port 0 means use the portmapper; 32768 is a larger
                // (than default 8k) client buffer size
                OncRpcUdpClient transport =
                    new OncRpcUdpClient(server_address, NFS_PROGRAM, NFS_VERSION, 0, 32768);
                // make the re-xmission mode exponential backoff
                transport.setRetransmissionMode(OncRpcUdpRetransmissionMode.EXPONENTIAL);
                transport.setRetransmissionTimeout(800); /* .8 seconds like the SFU */
                transport.setTimeout(30000); /* 30 seconds total timeout */
                myNFSClientPool[slot] = new nfsClient(transport);
                slot++;
            }
        } catch (Exception ex) {
            Driver.logger.fatal("Could not create nfs clients:"+ex);
        }
        myCurrentNFSClient = 0;
    }


    /**
     * Hand out the pooled clients round-robin: advance the shared index by
     * one (wrapping at MAX_NFS_CLIENTS) and return the client stored there.
     * No attempt is made to track which clients are currently busy.
     *
     * @return a <code>nfsClient</code> value
     */
    private synchronized nfsClient GetNFSClientFromPool() {
        final int next_index = (myCurrentNFSClient + 1) % MAX_NFS_CLIENTS;
        myCurrentNFSClient = next_index;
        return myNFSClientPool[next_index];
    }


    /**
     * List a directory: resolve its handle, read the entry names, drop the
     * names rejected by <code>Util.RemoveSpecialFiles</code>, and stat each
     * remaining name.
     *
     * @param original_path the directory to list
     * @return one <code>FileInfo</code> per entry, or null when the listing
     * itself failed; a slot is left null if building that entry threw
     */
    public galaxy.java.FileInfo[] ListFiles(String original_path) {
        galaxy.java.FileInfo[] infos = null;
        try {
            // We need to traverse down the path, going down to the last
            // directory, then list that directory...
            nfs_fh dir_handle = _GetDirectoryHandle(original_path);
            String[] names = Util.RemoveSpecialFiles(_GetDirectoryEntries(dir_handle));
            infos = new galaxy.java.FileInfo[names.length];
            int index = 0;
            while (index < names.length) {
                String name = names[index];
                try {
                    infos[index] = _Stat(dir_handle,name,Util.CatPath(original_path,name));
                } catch (Exception ex) {
                    Util.Warn("ListFiles",original_path,ex);
                }
                index++;
            }
        } catch (Exception ex) {
            Util.Warn("ListFiles",original_path,ex);
        }
        return infos;
    } // end ListFiles



    /**
     * Get the nfs filehandle of a named path by looking up each path
     * component in turn, starting from the mounted root.  Within this class
     * it is used for file paths as well as directory paths.
     *
     * The walk stops at the first component that cannot be resolved, so no
     * further lookups are issued against a missing parent.
     *
     * @param path an absolute path; leading and trailing slashes are removed
     * before the path is split on GALAXY_PATH_SEPARATOR
     * @return the handle of the last component, the root handle for an empty
     * path, or null if any component could not be resolved
     */
    protected nfs_fh _GetDirectoryHandle(String path) {
        nfs_fh current = null;
        // First remove any leading or trailing slashes
        try {
            path = Util.RemoveSlashFromStartOfPath(path);
            path = Util.RemoveSlashFromEndOfPath(path);
            if (path.equals("")) {
                return (myRootFileHandle);
            }
            // Break up the path into pieces.
            String[] parts = path.split(GALAXY_PATH_SEPARATOR);
            // Now for each part, do a lookup, starting with the root.
            // A null handle means the previous lookup (or the mount) failed.
            current = myRootFileHandle;
            for (int i=0;i<parts.length && current!=null;i++) {
                current = _LookupFileHandle(current,parts[i],true);
            }
        } catch (Exception ex) {
            Util.Warn("_GetDirectoryHandle",path,ex);
        }
        return (current);
    }

    /**
     * Resolve one name inside a directory to its filehandle.
     *
     * @param dir handle of the directory to search
     * @param filename the name to look up inside <code>dir</code>
     * @param cached when true the filehandle cache is consulted first
     * @return the handle, or null if the server reported an error or the
     * call threw; a successful server lookup also populates both caches
     */
    protected nfs_fh _LookupFileHandle(nfs_fh dir, String filename, boolean cached) {
        // check the cache first
        if (cached) {
            nfs_fh cached_handle = myCache.Lookup(dir,filename);
            if (cached_handle != null) {
                return cached_handle;
            }
        }

        nfs_fh handle = null;
        try {
            nfsClient client = GetNFSClientFromPool();
            diropargs request = new diropargs();
            request.dir = dir;
            request.name = new filename(filename);
            diropres result = client.NFSPROC_LOOKUP_2(request);
            if (result.status != nfsstat.NFS_OK) {
                Util.Warn("Error in lookup:"+result.status,filename);
            } else {
                diropokres found = result.diropres;
                // record the handle before caching so it is still returned
                // if a cache update throws
                handle = found.file;
                myCache.Put(dir,filename,handle);
                myAttrCache.Put(handle,found.attributes);
            }
        } catch (Exception ex) {
            Util.Warn("_LookupFileHandle",filename,ex);
        }

        return handle;
    }

    /**
     * Stat a single path: resolve the handle of its parent directory, then
     * stat the final component inside it.
     *
     * @param original_path the path to stat
     * @return the file's information, or a freshly constructed
     * <code>FileInfo</code> if the stat threw
     */
    public FileInfo Stat(String original_path) {
        String parent_path = Util.GetPath(original_path);
        nfs_fh parent_handle = _GetDirectoryHandle(parent_path);
        FileInfo info = new FileInfo();
        try {
            String name = Util.GetFilename(original_path);
            info = _Stat(parent_handle,name,original_path);
        } catch (Exception ex) {
            Util.Warn("Stat",original_path,ex);
        }
        return info;
    }


    /**
     * Stat a file within a directory and return some information.
     *
     * If the file's handle or attributes cannot be obtained, the problem is
     * logged and a freshly constructed <code>FileInfo</code> is returned with
     * no fields filled in.
     *
     * @param dir The directory the file is contained in
     * @param filename The filename of the file or "" if the caller wants to stat the dir handle
     * @param path The logical path stored in the result's FilePath.  Callers in this
     * class pass the absolute path of the file including the filename.
     * @return a <code>FileInfo</code> value representing the file
     */
    private FileInfo _Stat(nfs_fh dir, String filename, String path) {
        FileInfo ret = new FileInfo();
        try {
            // first lookup the handle
            if (!filename.equals("")) {
                dir = _LookupFileHandle(dir,filename,true);
            }
            if (dir == null) {
                // the lookup (or the resolution of the parent) already failed
                Util.Debug("_Stat","Error - no filehandle available for file "+path);
                return (ret);
            }

            fattr attr = myAttrCache.Lookup(dir);
            if (attr == null) {
                // only take a client from the pool when we really need the server
                nfsClient myNFSClient = GetNFSClientFromPool();
                attrstat result = myNFSClient.NFSPROC_GETATTR_2(dir);
                if (result.status != nfsstat.NFS_OK) {
                    Util.Debug("_Stat","Error - got "+result.status+" when performing getattributes on file "+path);
                    return (ret);
                }
                attr = result.attributes;
                myAttrCache.Put(dir,attr);
            }

            ret.CreateTime = Util.ConvertTime(attr.ctime);
            ret.LastAccessTime = Util.ConvertTime(attr.atime);
            ret.LastModifiedTime = Util.ConvertTime(attr.mtime);
            ret.FileName = filename;
            ret.FilePath = path;
            ret.FolderFlag = (attr.type == ftype.NFDIR);
            if (ret.FolderFlag) {
                Util.Debug("_Stat","File with filename="+filename+" and path="+path+" is a folder");
                // folders don't have file sizes
                ret.Size = 0;
            } else {
                ret.Size = (long)attr.size;
            }
        } catch (Exception ex) {
            Util.Warn("_Stat",path,ex);
        }
        return (ret);
    }

    /**
     * Create a directory on the server with mode 0755, owned by the
     * configured uid/gid, and cache its handle and attributes on success.
     *
     * @param path path of the directory to create
     * @return true if the server reported NFS_OK
     */
    public boolean CreateDirectory(String path) {
        boolean created = false;
        try {
            nfsClient client = GetNFSClientFromPool();
            // just the root path, not the filename
            nfs_fh parent_handle = _GetDirectoryHandle(Util.GetPath(path));

            diropargs where = new diropargs();
            where.dir = parent_handle;
            where.name = new filename(Util.GetFilename(path));

            sattr attrs = new sattr();
            attrs.atime = Util.Now();
            attrs.mtime = attrs.atime;
            attrs.uid = myNFSUID;
            attrs.gid = myNFSGID; // <--- what are we going to set these to be?
            attrs.size = 0;
            attrs.mode = NFSMODE_DIR | 0755; // <--- probably should make this a default mode var

            createargs request = new createargs();
            request.where = where;
            request.attributes = attrs;
            diropres result = client.NFSPROC_MKDIR_2(request);

            created = (result.status == nfsstat.NFS_OK);
            if (created) {
                myCache.Put(parent_handle,where.name.value,result.diropres.file);
                myAttrCache.Put(result.diropres.file,result.diropres.attributes);
            } else {
                Util.Debug("CreateDirectory","Result status is "+result.status);
            }
        } catch (Exception ex) {
            Util.Warn("CreateDirectory",path,ex);
        }
        return created;
    }


    /**
     * Copy a file directly to NFS from Windows.  Not implemented: logs a
     * warning and always reports failure.
     *
     * @param windows_src source path (unused)
     * @param galaxy_dst destination path (unused)
     * @return always false
     */
    public boolean CopyFile(String windows_src, String galaxy_dst) {
        try {
            Util.Warn("CopyFile","Not Implemented Yet");
        } catch (Exception ex) {
            // nothing to do; the result is false either way
        }
        return false;
    }

    /**
     * Create an empty regular file on the server with mode 0644, owned by
     * the configured uid/gid, and cache its handle and attributes on success.
     *
     * @param path path of the file to create
     * @return true if the server reported NFS_OK
     */
    public boolean CreateFile(String path) {
        boolean created = false;
        try {
            nfsClient client = GetNFSClientFromPool();
            // just the root path, not the filename
            nfs_fh parent_handle = _GetDirectoryHandle(Util.GetPath(path));

            diropargs where = new diropargs();
            where.dir = parent_handle;
            where.name = new filename(Util.GetFilename(path));

            sattr attrs = new sattr();
            attrs.atime = Util.Now();
            attrs.mtime = attrs.atime;
            attrs.uid = myNFSUID;
            attrs.gid = myNFSGID; // <--- what are we going to set these to be?
            attrs.size = 0;
            attrs.mode = NFSMODE_REG | 0644; // <--- probably should make this a default mode var

            createargs request = new createargs();
            request.where = where;
            request.attributes = attrs;
            diropres result = client.NFSPROC_CREATE_2(request);

            created = (result.status == nfsstat.NFS_OK);
            if (created) {
                myCache.Put(parent_handle,where.name.value,result.diropres.file);
                myAttrCache.Put(result.diropres.file,result.diropres.attributes);
            } else {
                Util.Debug("CreateFile","Result status is "+result.status);
            }
        } catch (Exception ex) {
            Util.Warn("CreateFile",path,ex);
        }
        return created;
    }


    /**
     * Delete a path.  Folders are removed recursively; anything else is
     * removed as a single file.
     *
     * @param path the path to delete
     * @return true if the deletion succeeded
     */
    public boolean Delete(String path) {
        boolean deleted = false;
        try {
            FileInfo info = Stat(path);
            deleted = info.FolderFlag ? _DeleteFolder(path,true) : _DeleteFile(path);
        } catch (Exception ex) {
            Util.Warn("Delete",path,ex);
        }
        return deleted;
    }

    /**
     * Delete a folder.
     *
     * @param path the folder to delete
     * @param recursive when true the contents are removed first, walking down
     * the hierarchy and removing on the way up; when false the folder must
     * already be empty
     * @return true if the folder was removed
     */
    public boolean _DeleteFolder(String path, boolean recursive) {
        return recursive ? _DeleteRecursively(path) : _DeleteEmptyFolder(path);
    }


    /**
     * Delete everything below a folder, then the folder itself.
     *
     * Deletion is best effort: every child is attempted even after an
     * earlier child failed.  The folder itself is only removed when all of
     * its children were removed, since it cannot be empty otherwise.
     *
     * @param path the folder to delete
     * @return true only if every child and the folder itself were removed;
     * false if the folder could not be listed
     */
    protected boolean _DeleteRecursively(String path) {
        FileInfo [] infos = ListFiles(path);
        if (infos == null) {
            // ListFiles returns null when the listing failed
            Util.Debug("_DeleteRecursively","Could not list folder "+path+", nothing deleted");
            return (false);
        }

        boolean ret = true;
        for (int i=0;i<infos.length;i++) {
            FileInfo info = infos[i];
            // An entry that could not be stat'ed has no usable path; count it
            // as a failure instead of issuing a delete for an unknown target.
            if (info == null || info.FilePath == null || "".equals(info.FilePath)) {
                ret = false;
                continue;
            }
            // Evaluate the delete first so that an earlier failure does not
            // short-circuit the remaining deletions.
            boolean deleted = info.FolderFlag
                ? _DeleteRecursively(info.FilePath)
                : _DeleteFile(info.FilePath);
            ret = ret && deleted;
        }
        // Now I am empty (if everything above succeeded)
        ret = ret && _DeleteEmptyFolder(path);

        // ret is the boolean and of all the return values.
        return (ret);
    }

    /**
     * Remove a single folder with RMDIR and, on success, invalidate the
     * cache entries for it.
     *
     * @param path the folder to remove
     * @return true if the server reported NFS_OK
     */
    protected boolean _DeleteEmptyFolder(String path) {
        boolean removed = false;
        try {
            nfsClient client = GetNFSClientFromPool();
            diropargs target = new diropargs();
            target.dir = _GetDirectoryHandle(Util.GetPath(path));
            target.name = new filename(Util.GetFilename(path));
            int status = client.NFSPROC_RMDIR_2(target);
            if (status != nfsstat.NFS_OK) {
                Util.Debug("_DeleteEmptyFolder","Could not remove folder "+path+" due to error "+status);
            } else {
                // mark success first: the result stays true even if a cache call throws
                removed = true;
                myCache.Invalidate(target.dir,target.name.value);
                myAttrCache.Invalidate(target.dir);
            }
        } catch (Exception ex) {
            Util.Warn("_DeleteEmptyFolder",path,ex);
        }
        return removed;
    }

    /**
     * Remove a single file with REMOVE.  The cache entries are invalidated
     * whether or not the server accepted the removal.
     *
     * @param path the file to remove
     * @return true if the server reported NFS_OK
     */
    protected boolean _DeleteFile(String path) {
        boolean removed = false;
        try {
            nfsClient client = GetNFSClientFromPool();
            diropargs target = new diropargs();
            target.dir = _GetDirectoryHandle(Util.GetPath(path));
            target.name = new filename(Util.GetFilename(path));
            int status = client.NFSPROC_REMOVE_2(target);
            myCache.Invalidate(target.dir,target.name.value);
            myAttrCache.Invalidate(target.dir);
            removed = (status == nfsstat.NFS_OK);
            if (!removed) {
                Util.Debug("_DeleteFile","Could not remove file "+path+" due to error "+status);
            }
        } catch (Exception ex) {
            Util.Warn("_Delete",path,ex);
        }
        return removed;
    }



    /**
     * Issue a single READ for at most NFS_MAXDATA bytes and return the data.
     *
     * @deprecated Replaced by {@link galaxy.nfs.NFSFileSystem#Read} instead.
     * @param path the fully-qualified filename
     * @param offset offset into the file
     * @param count how many bytes to read; capped at NFS_MAXDATA
     * @return the bytes returned by the server, or null if the read failed
     */
    public byte[] OldRead(String path, int offset, int count) {
        nfsClient client = GetNFSClientFromPool();
        byte[] data = null;
        try {
            readargs request = new readargs();
            request.offset = offset;
            request.count = Math.min(count,nfs.NFS_MAXDATA);
            request.file = _GetDirectoryHandle(path);
            readres result = client.NFSPROC_READ_2(request);
            if (result.status != nfsstat.NFS_OK) {
                Util.Debug("Read","Result from reading file="+path+" at offset="+offset+" for count="+count+" bytes:"+result.status);
            } else {
                // ignore the returned attributes...
                data = result.reply.data;
            }
        } catch (Exception ex) {
            Util.Warn("Read",path,ex);
        }
        return data;
    }


    /**
     * Multithreaded read.  Requests smaller than NFS_MAXDATA are served by a
     * single synchronous call; larger ones are split into NFS_MAXDATA-sized
     * chunks, each read by its own thread on a client taken from the pool.
     *
     * If a chunk fails, only the chunks before it are returned, so every
     * returned byte sits at its correct position relative to
     * <code>offset</code> (a short read instead of shifted data).
     *
     * @param path the fully-qualified filename
     * @param offset offset into the file
     * @param count how many bytes to read
     * @return the bytes read, possibly fewer than <code>count</code>, or
     * null if the read could not be carried out
     */
    public byte[] Read(String path, int offset, int count) {
        // if we're being asked for less than the max, don't bother using a thread to read the data
        if (count < nfs.NFS_MAXDATA) {
            return OldRead(path,offset,count);
        }
        byte[] ret = null;
        // This holds my threads for reading
        Vector threads = new Vector();
        try {
            // This holds the set of byte[] that threads will return to us, one slot per chunk.
            byte[][] bufs = new byte[(int)(Math.ceil((double)count/(double)nfs.NFS_MAXDATA))][];
            nfs_fh the_file = _GetDirectoryHandle(path);
            int bytes_requested = 0;
            int thread_number = 0;
            try {
                while (bytes_requested < count) {
                    readargs args = new readargs();
                    args.file = the_file;
                    args.offset = bytes_requested+offset;
                    args.count = Math.min(count-bytes_requested,nfs.NFS_MAXDATA);
                    // get a different connection for every read to try to parallelize the reads
                    nfsClient myNFSClient = GetNFSClientFromPool();
                    AsyncRead(myNFSClient,args,path,threads,bufs,thread_number++);
                    bytes_requested += args.count;
                }
            } finally {
                // Make sure all reads complete, even if dispatching a later chunk failed.
                SynchronizeAllReads(threads);
            }

            // A failed chunk leaves its slot null.  Drop everything after the
            // first gap so later chunks cannot slide into the wrong offset.
            boolean gap_found = false;
            for (int i=0;i<bufs.length;i++) {
                if (gap_found) {
                    bufs[i] = null;
                } else if (bufs[i] == null) {
                    gap_found = true;
                }
            }
            ret = CombineAllReads(bufs);
        } catch (Exception ex) {
            Util.Warn("Read",path,ex);
        }
        return (ret);
    }


    /**
     * Concatenate the chunk buffers, in order, into one array.  Null slots
     * are skipped.
     *
     * @param bufs the chunk buffers; individual entries may be null
     * @return a new array holding every non-null chunk back to back
     */
    protected byte[] CombineAllReads(byte[][] bufs) {
        // first pass: work out how much room is needed
        int combined_length = 0;
        int index = 0;
        while (index < bufs.length) {
            if (bufs[index] != null) {
                combined_length += bufs[index].length;
            }
            index++;
        }

        // second pass: copy each chunk behind the previous one
        byte[] combined = new byte[combined_length];
        int write_pos = 0;
        for (index = 0; index < bufs.length; index++) {
            byte[] chunk = bufs[index];
            if (chunk == null) {
                continue;
            }
            System.arraycopy(chunk,0,combined,write_pos,chunk.length);
            write_pos += chunk.length;
        }
        return combined;
    }

    /**
     * Start a thread that performs one READ and stores the data in
     * <code>bufs[my_id]</code>.  The thread is added to <code>threads</code>
     * before it is started.
     *
     * @param client the NFS client the thread will use
     * @param args the read request
     * @param path the file being read (used for logging)
     * @param threads list that collects the started threads
     * @param bufs per-chunk result buffers
     * @param my_id index into <code>bufs</code> for this chunk
     */
    protected void AsyncRead(nfsClient client, readargs args, String path, Vector threads, byte[][] bufs, int my_id) {
        ReaderThread worker = new ReaderThread(client, args, path, bufs, my_id);
        threads.add(worker);
        worker.start();
    }

    /**
     * Thread that performs one READ call and stores the returned data in
     * its slot of a shared buffer array.
     */
    class ReaderThread extends Thread {
        public nfsClient myNFSClient;
        public readargs myArgs;
        public String myPath;
        public int myID;
        public byte[][] myBufs;

        /**
         * @param client the NFS client to issue the read on
         * @param args the read request
         * @param path the file being read (used for logging)
         * @param bufs shared result buffers
         * @param id the slot of <code>bufs</code> this thread fills
         */
        public ReaderThread(nfsClient client, readargs args, String path, byte[][] bufs, int id) {
            this.myNFSClient = client;
            this.myArgs = args;
            this.myPath = path;
            this.myBufs = bufs;
            this.myID = id;
        }

        /**
         * Issue the read.  On success the returned attributes are cached and
         * the data is stored in this thread's slot; on failure the slot is
         * left untouched and a warning is logged.
         */
        public void run() {
            try {
                readres result = myNFSClient.NFSPROC_READ_2(myArgs);
                if (result.status != nfsstat.NFS_OK) {
                    Util.Warn("ReaderThread.run","Result from reading file="+myPath+" at offset="+myArgs.offset+" for count="+myArgs.count+" bytes:"+result.status);
                    return;
                }
                myAttrCache.Put(myArgs.file,result.reply.attributes);
                // set the buffer...
                myBufs[myID] = result.reply.data;
            } catch (Exception ex) {
                Util.Warn("ReaderThread.run",myPath,ex);
            }
        }
    } // end inner class


    /**
     * Write data to the file, fragmenting into NFS_MAXDATA chunks.
     * << NO THREADING>>
     * @deprecated Use {@link galaxy.nfs.NFSFileSystem#Write} instead.
     *
     * Only the first <code>count</code> bytes of <code>buffer</code> are
     * written, even when the buffer is longer.  Writing stops at the first
     * chunk the server rejects.
     *
     * @param path the fq filename
     * @param offset offset into the file
     * @param count how many bytes to write
     * @param buffer the buffer holding at least count bytes
     * @return how many bytes were successfully written
     */
    public int OldWrite(String path, int offset, int count, byte[] buffer) {
        int bytes_written = 0;
        int bytes_to_write = count;
        try {
            nfsClient myNFSClient = GetNFSClientFromPool();
            writeargs args = new writeargs();
            Util.TraceTime("NFSFileSystem.Write: Getting directory handle");
            args.file = _GetDirectoryHandle(path);
            Util.TraceTime("NFSFileSystem.Write: Done getting directory handle");
            while (bytes_written < count) {
                args.offset = bytes_written+offset;
                // optimize for special case where the buffer doesn't need to be copied:
                // a single chunk whose buffer holds exactly the bytes to write.  A longer
                // buffer must be copied, otherwise its trailing bytes would be sent too.
                if ((bytes_written == 0) && (bytes_to_write <= nfs.NFS_MAXDATA)
                        && (buffer.length == bytes_to_write)) {
                    // no memory copy needed
                    args.data = buffer;
                } else {
                    byte[] smaller_buffer = new byte[Math.min(bytes_to_write,nfs.NFS_MAXDATA)];
                    Util.TraceTime("NFSFileSystem.Write: Starting memory copy for "+smaller_buffer.length+" bytes");
                    // Get rid of this memory copy...
                    System.arraycopy(buffer,bytes_written,smaller_buffer,0,smaller_buffer.length);
                    Util.TraceTime("NFSFileSystem.Write: Done with memory copy");
                    args.data = smaller_buffer;
                }
                Util.TraceTime("WriterThread: Calling NFSPROC_WRITE_2");
                attrstat result = myNFSClient.NFSPROC_WRITE_2(args);
                Util.TraceTime("WriterThread: Done calling NFSPROC_WRITE_2");

                if (result.status == nfsstat.NFS_OK) {
                    bytes_written+= args.data.length;
                    bytes_to_write-= args.data.length;
                    myAttrCache.Put(args.file,result.attributes);
                } else {
                    Util.Debug("Write","Error writing to file "+path+":"+result.status);
                    break;
                }
            }
        } catch (Exception ex) {
            Util.Warn("Write",path,ex);
        }
        return (bytes_written); // # of bytes written
    }



    /**
     * Write data to the file using multiple threads and taking advantage of the pool of nfs clients
     * << MULTITHREADED VERSION >>
     *
     * Only the first <code>count</code> bytes of <code>buffer</code> are
     * written, even when the buffer is longer.  All writer threads that were
     * started are waited for before returning, including when dispatching a
     * later chunk fails.
     *
     * NOTE(review): the return value counts the bytes handed to writer
     * threads; a chunk whose write fails inside its thread is only logged
     * and is still included in the count.
     *
     * @param path the fq filename
     * @param offset offset into the file
     * @param count how many bytes to write
     * @param buffer the buffer holding at least count bytes
     * @return how many bytes were handed off for writing
     */
    public int Write(String path, int offset, int count, byte[] buffer) {
        int bytes_written = 0;
        int bytes_to_write = count;
        // This holds my threads for writing
        Vector threads = new Vector();
        try {
            Util.TraceTime("NFSFileSystem.Write: Getting directory handle");
            nfs_fh the_file = _GetDirectoryHandle(path);
            Util.TraceTime("NFSFileSystem.Write: Done getting directory handle");
            try {
                while (bytes_written < count) {
                    writeargs args = new writeargs();
                    args.file = the_file;
                    // get a different connection for every write to try to parallelize the writes
                    nfsClient myNFSClient = GetNFSClientFromPool();
                    args.offset = bytes_written+offset;
                    // optimize for special case where the buffer doesn't need to be copied:
                    // a single chunk whose buffer holds exactly the bytes to write.  A longer
                    // buffer must be copied, otherwise its trailing bytes would be sent too.
                    if ((bytes_written == 0) && (bytes_to_write <= nfs.NFS_MAXDATA)
                            && (buffer.length == bytes_to_write)) {
                        // no memory copy needed
                        args.data = buffer;
                    } else {
                        byte[] smaller_buffer = new byte[Math.min(bytes_to_write,nfs.NFS_MAXDATA)];
                        Util.TraceTime("NFSFileSystem.Write: Starting memory copy for "+smaller_buffer.length+" bytes");
                        // Get rid of this memory copy...
                        System.arraycopy(buffer,bytes_written,smaller_buffer,0,smaller_buffer.length);
                        Util.TraceTime("NFSFileSystem.Write: Done with memory copy");
                        args.data = smaller_buffer;
                    }
                    Util.TraceTime("NFSFileSystem.Write: Calling AsyncWrite...");
                    AsyncWrite(myNFSClient,args,path,threads);
                    Util.TraceTime("NFSFileSystem.Write: Done calling AsyncWrite");

                    bytes_written+= args.data.length;
                    bytes_to_write-= args.data.length;
                }
            } finally {
                // Make sure all writes complete, even if dispatching a later chunk failed.
                Util.TraceTime("NFSFileSystem.Write: Starting call to SyncAllWrites");
                SynchronizeAllWrites(threads);
                Util.TraceTime("NFSFileSystem.Write: Done with call to SyncAllWrites");
            }
        } catch (Exception ex) {
            Util.Warn("Write",path,ex);
        }
        return (bytes_written); // # of bytes handed off
    }


    /**
     * Start a thread that performs one WRITE.  The thread is added to
     * <code>threads</code> before it is started.
     *
     * @param client the NFS client the thread will use
     * @param args the write request
     * @param path the file being written (used for logging)
     * @param threads list that collects the started threads
     */
    protected void AsyncWrite(nfsClient client, writeargs args, String path, Vector threads) {
        WriterThread worker = new WriterThread(client, args, path);
        threads.add(worker);
        worker.start();
    }

    /**
     * Wait for every writer thread in the list to finish, then empty the list.
     *
     * If the calling thread is interrupted while waiting, the interruption is
     * logged, the remaining threads are still waited for, and the interrupt
     * status is restored before returning so the caller can observe it.
     *
     * @param threads the <code>WriterThread</code>s to wait for; cleared on return
     */
    protected void SynchronizeAllWrites(Vector threads) {
        boolean interrupted = false;
        for (int i=0;i<threads.size();i++) {
            WriterThread t = (WriterThread)threads.get(i);
            // wait for this thread to die
            try {
                t.join();
            } catch (InterruptedException ex) {
                // join() cleared the interrupt status; remember it and keep going
                interrupted = true;
                Driver.logger.warn("Thread "+t+" was interrupted while trying to write bytes to file "+t.myPath);
            }
        }
        threads.clear();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }



    /**
     * Wait for every reader thread in the list to finish, then empty the list.
     *
     * If the calling thread is interrupted while waiting, the interruption is
     * logged, the remaining threads are still waited for, and the interrupt
     * status is restored before returning so the caller can observe it.
     *
     * @param threads the <code>ReaderThread</code>s to wait for; cleared on return
     */
    protected void SynchronizeAllReads(Vector threads) {
        boolean interrupted = false;
        for (int i=0;i<threads.size();i++) {
            ReaderThread t = (ReaderThread)threads.get(i);
            // wait for this thread to die
            try {
                t.join();
            } catch (InterruptedException ex) {
                // join() cleared the interrupt status; remember it and keep going
                interrupted = true;
                Driver.logger.warn("Reader Thread "+t+" was interrupted while trying to read bytes from file "+t.myPath);
            }
        }
        threads.clear();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }




/**
 * Thread that performs one WRITE call and, on success, caches the
 * attributes the server returned for the file.
 */
class WriterThread extends java.lang.Thread {
    public nfsClient myNFSClient;
    public writeargs myArgs;
    public String myPath;

    /**
     * @param client the NFS client to issue the write on
     * @param args the write request
     * @param path the file being written (used for logging)
     */
    public WriterThread(nfsClient client, writeargs args, String path) {
        this.myNFSClient = client;
        this.myArgs = args;
        this.myPath = path;
    }

    /**
     * Issue the write.  Failures are logged only; nothing is retried.
     */
    public void run() {
        try {
            Util.TraceTime("WriterThread: Calling NFSPROC_WRITE_2");
            attrstat result = myNFSClient.NFSPROC_WRITE_2(myArgs);
            Util.TraceTime("WriterThread: Done calling NFSPROC_WRITE_2");
            if (result.status == nfsstat.NFS_OK) {
                myAttrCache.Put(myArgs.file,result.attributes);
            } else {
                // should we retry???
                Util.Warn("Write","Error writing to file "+myPath+":"+result.status);
            }
        } catch (Exception ex) {
            Util.Warn("NFSFileSystem.WriterThread.run",myPath,ex);
        }
        // now the thread should die...
    }
}



    ////////////////////////////////////////////////////////////////////////////////////
    // HELPER METHODS
    ////////////////////////////////////////////////////////////////////////////////////
    /**
     * Read the names of all entries in a directory, issuing READDIR calls
     * until the server reports end-of-directory or an error.
     *
     * @param dir handle of the directory to read
     * @return the entry names collected so far, or null if a call threw
     */
    protected String[]  _GetDirectoryEntries(nfs_fh dir) {
        String[] names = null;
        readdirargs request = new readdirargs();
        request.dir = dir;
        request.count = NFS_MAXDATA; // get up to this many bytes
        request.cookie = new nfscookie();
        request.cookie.value = new byte[] {0,0,0,0}; // this means give me the first directory entry
        Vector collected = new Vector();

        try {
            nfsClient client = GetNFSClientFromPool();
            while (true) {
                readdirres result = client.NFSPROC_READDIR_2(request);
                if (result.status != nfsstat.NFS_OK) {
                    break;
                }
                // continue the next call from the cookie of the last entry seen
                request.cookie = _AddDirEntries(result.reply.entries,collected);
                if (result.reply.eof) {
                    break;
                }
            }
            names = new String[collected.size()];
            collected.copyInto(names);
        } catch (Exception ex) {
            Util.Warn("_GetDirectoryEntries",Util.ConvertFileHandleToString(dir),ex);
        }
        return names;
    }



    /**
     * Walk a linked list of directory entries, appending each entry's name
     * (a String) to the vector.
     *
     * @param entries head of the entry list, may be null
     * @param vec receives the names
     * @return the cookie of the last entry visited, or null for an empty list
     */
    private nfscookie _AddDirEntries(entry entries, Vector vec) {
        nfscookie last_cookie = null;
        for (entry node = entries; node != null; node = node.nextentry) {
            vec.add(node.name.value);
            last_cookie = node.cookie;
        }
        return last_cookie;
    }

    /**
     * Execute a control command.  The only command recognized is "trace",
     * which calls <code>Util.PrintTrace()</code>; anything else, including
     * null, is ignored.
     *
     * @param s the command string, may be null
     */
    public void Command(String s) {
        // constant-first comparison so a null command is ignored instead of throwing
        if ("trace".equals(s)) {
            Util.PrintTrace();
        }
    }


} // end class

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
United States United States
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions