Browse Source

Added BeginRead/EndRead to ISftpSession and SftpSession.
Draft of read-ahead implementation for DownloadFile and BeginDownloadFile overloads of SftpClient.

Gert Driesen 8 years ago
parent
commit
fe27ec31e1

+ 7 - 1
src/Renci.SshNet.NET35/Renci.SshNet.NET35.csproj

@@ -861,6 +861,9 @@
     <Compile Include="..\Renci.SshNet\Sftp\SftpFileAttributes.cs">
       <Link>Sftp\SftpFileAttributes.cs</Link>
     </Compile>
+    <Compile Include="..\Renci.SshNet\Sftp\SftpFileReader.cs">
+      <Link>Sftp\SftpFileReader.cs</Link>
+    </Compile>
     <Compile Include="..\Renci.SshNet\Sftp\SftpFileStream.cs">
       <Link>Sftp\SftpFileStream.cs</Link>
     </Compile>
@@ -876,6 +879,9 @@
     <Compile Include="..\Renci.SshNet\Sftp\SftpMessageTypes.cs">
       <Link>Sftp\SftpMessageTypes.cs</Link>
     </Compile>
+    <Compile Include="..\Renci.SshNet\Sftp\SftpReadAsyncResult.cs">
+      <Link>Sftp\SftpReadAsyncResult.cs</Link>
+    </Compile>
     <Compile Include="..\Renci.SshNet\Sftp\SftpSession.cs">
       <Link>Sftp\SftpSession.cs</Link>
     </Compile>
@@ -921,7 +927,7 @@
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <ProjectExtensions>
     <VisualStudio>
-      <UserProperties ProjectLinkReference="2f5f8c90-0bd1-424f-997c-7bc6280919d1" ProjectLinkerExcludeFilter="\\?desktop(\\.*)?$;\\?silverlight(\\.*)?$;\.desktop;\.silverlight;\.xaml;^service references(\\.*)?$;\.clientconfig;^web references(\\.*)?$" />
+      <UserProperties ProjectLinkerExcludeFilter="\\?desktop(\\.*)?$;\\?silverlight(\\.*)?$;\.desktop;\.silverlight;\.xaml;^service references(\\.*)?$;\.clientconfig;^web references(\\.*)?$" ProjectLinkReference="2f5f8c90-0bd1-424f-997c-7bc6280919d1" />
     </VisualStudio>
   </ProjectExtensions>
   <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 

+ 2 - 0
src/Renci.SshNet/Renci.SshNet.csproj

@@ -416,6 +416,7 @@
     <Compile Include="Sftp\SftpDownloadAsyncResult.cs" />
     <Compile Include="Sftp\SftpFile.cs" />
     <Compile Include="Sftp\SftpFileAttributes.cs" />
+    <Compile Include="Sftp\SftpFileReader.cs" />
     <Compile Include="Sftp\SftpFileStream.cs" />
     <Compile Include="Sftp\SftpFileSystemInformation.cs">
       <SubType>Code</SubType>
@@ -425,6 +426,7 @@
     <Compile Include="Sftp\SftpMessageTypes.cs">
       <SubType>Code</SubType>
     </Compile>
+    <Compile Include="Sftp\SftpReadAsyncResult.cs" />
     <Compile Include="Sftp\SftpSession.cs">
       <SubType>Code</SubType>
     </Compile>

+ 27 - 0
src/Renci.SshNet/Sftp/ISftpSession.cs

@@ -95,6 +95,33 @@ namespace Renci.SshNet.Sftp
         /// <returns>data array; null if EOF</returns>
         byte[] RequestRead(byte[] handle, ulong offset, uint length);
 
+        /// <summary>
+        /// Begins an asynchronous read using a SSH_FXP_READ request.
+        /// </summary>
+        /// <param name="handle">The handle to the file to read from.</param>
+        /// <param name="offset">The offset in the file to start reading from.</param>
+        /// <param name="length">The number of bytes to read.</param>
+        /// <param name="callback">The <see cref="AsyncCallback"/> delegate that is executed when <see cref="BeginRead(byte[], ulong, uint, AsyncCallback, object)"/> completes.</param>
+        /// <param name="state">An object that contains any additional user-defined data.</param>
+        /// <returns>
+        /// A <see cref="SftpReadAsyncResult"/> that represents the asynchronous call.
+        /// </returns>
+        SftpReadAsyncResult BeginRead(byte[] handle, ulong offset, uint length, AsyncCallback callback, object state);
+
+        /// <summary>
+        /// Handles the end of an asynchronous read.
+        /// </summary>
+        /// <param name="asyncResult">An <see cref="SftpReadAsyncResult"/> that represents an asynchronous call.</param>
+        /// <returns>
+        /// A <see cref="byte"/> array representing the data read.
+        /// </returns>
+        /// <remarks>
+        /// If all available data has been read, the <see cref="EndRead(SftpReadAsyncResult)"/> method completes
+        /// immediately and returns zero bytes.
+        /// </remarks>
+        /// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is <c>null</c>.</exception>
+        byte[] EndRead(SftpReadAsyncResult asyncResult);
+
         /// <summary>
         /// Performs SSH_FXP_READDIR request
         /// </summary>

+ 163 - 0
src/Renci.SshNet/Sftp/SftpFileReader.cs

@@ -0,0 +1,163 @@
+using Renci.SshNet.Abstractions;
+using Renci.SshNet.Common;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Renci.SshNet.Sftp
+{
+    internal class SftpFileReader
+    {
+        private const int MaxReadAhead = 15;
+
+        private readonly byte[] _handle;
+        private readonly ISftpSession _sftpSession;
+        private SemaphoreLight _semaphore;
+        private bool _isCompleted;
+        private uint _chunkLength;
+        private int _readAheadChunkIndex;
+        private int _nextChunkIndex;
+        private ulong _readAheadOffset;
+        private ulong _offset;
+        private ulong _fileSize;
+        private readonly IDictionary<int, BufferedRead> _queue;
+        private readonly object _readLock;
+
+        public SftpFileReader(byte[] handle, ISftpSession sftpSession)
+        {
+            _handle = handle;
+            _sftpSession = sftpSession;
+            _chunkLength = 32 * 1024; // TODO !
+            _semaphore = new SemaphoreLight(MaxReadAhead);
+            _queue = new Dictionary<int, BufferedRead>(MaxReadAhead);
+            _readLock = new object();
+
+            _fileSize = (ulong)_sftpSession.RequestFStat(_handle).Size;
+
+            ThreadAbstraction.ExecuteThread(() =>
+            {
+                while (!_isCompleted)
+                {
+                    // we reach one chunk beyond the file size to get an EOF
+                    if (_readAheadOffset > _fileSize)
+                        break;
+
+                    // TODO implement cancellation!?
+                    _semaphore.Wait();
+
+                    // start reading next chunk
+                    _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkLength, ReadCompleted,
+                                           new BufferedRead(_readAheadChunkIndex, _readAheadOffset));
+
+                    // advance read-ahead offset
+                    _readAheadOffset += _chunkLength;
+
+                    _readAheadChunkIndex++;
+                }
+            });
+        }
+
+        public byte[] Read()
+        {
+            lock (_readLock)
+            {
+                BufferedRead nextChunk;
+
+                while (!_queue.TryGetValue(_nextChunkIndex, out nextChunk) && !_isCompleted)
+                    Monitor.Wait(_readLock);
+
+                if (_isCompleted)
+                    return new byte[0];
+
+                if (nextChunk.Offset == _offset)
+                {
+                    var data = nextChunk.Data;
+                    _offset += (ulong) data.Length;
+
+                    // remove processed chunk
+                    _queue.Remove(_nextChunkIndex);
+                    // move to next chunk
+                    _nextChunkIndex++;
+                    // allow read-ahead of a new chunk
+                    _semaphore.Release();
+                    return data;
+                }
+
+                // when the server returned less bytes than requested (for the previous chunk)
+                // we'll synchronously request the remaining data
+
+                var catchUp = new byte[nextChunk.Offset - _offset];
+                var bytesCaughtUp = 0L;
+
+                while (bytesCaughtUp < catchUp.Length)
+                {
+                    // TODO: break loop and interrupt blocking wait in case of exception
+                    var read = _sftpSession.RequestRead(_handle, _offset, (uint) catchUp.Length);
+                    if (read.Length == 0)
+                    {
+                        // break in loop in "read-ahead" thread (once a blocking wait is interrupted)
+                        _isCompleted = true;
+                        // interrupt blocking wait in "read-ahead" thread
+                        lock (_readLock)
+                            Monitor.PulseAll(_readLock);
+                        // signal failure
+                        throw new SshException("Unexpectedly reached end of file.");
+                    }
+
+                    bytesCaughtUp += read.Length;
+                    _offset += (ulong) bytesCaughtUp;
+                }
+
+                return catchUp;
+            }
+        }
+
+        private void ReadCompleted(IAsyncResult result)
+        {
+            var readAsyncResult = result as SftpReadAsyncResult;
+            if (readAsyncResult == null)
+                return;
+
+            var data = readAsyncResult.EndInvoke();
+            if (data.Length == 0)
+            {
+                Console.WriteLine("EOF");
+
+                _isCompleted = true;
+            }
+            else
+            {
+                var bufferedRead = (BufferedRead)readAsyncResult.AsyncState;
+                bufferedRead.Complete(data);
+                _queue.Add(bufferedRead.ChunkIndex, bufferedRead);
+            }
+
+            // signal that a chunk has been read or EOF has been reached;
+            // in both cases, we want to unblock the "read-ahead" thread
+            lock (_readLock)
+            {
+                Monitor.Pulse(_readLock);
+            }
+        }
+
+        private class BufferedRead
+        {
+            public int ChunkIndex { get; private set; }
+
+            public byte[] Data { get; private set; }
+
+            public ulong Offset { get; private set; }
+
+            public BufferedRead(int chunkIndex, ulong offset)
+            {
+                ChunkIndex = chunkIndex;
+                Offset = offset;
+            }
+
+            public void Complete(byte[] data)
+            {
+                Data = data;
+            }
+        }
+    }
+}

+ 12 - 0
src/Renci.SshNet/Sftp/SftpReadAsyncResult.cs

@@ -0,0 +1,12 @@
+using Renci.SshNet.Common;
+using System;
+
+namespace Renci.SshNet.Sftp
+{
+    internal class SftpReadAsyncResult : AsyncResult<byte[]>
+    {
+        public SftpReadAsyncResult(AsyncCallback asyncCallback, object state) : base(asyncCallback, state)
+        {
+        }
+    }
+}

+ 63 - 1
src/Renci.SshNet/Sftp/SftpSession.cs

@@ -397,6 +397,65 @@ namespace Renci.SshNet.Sftp
             }
         }
 
+        /// <summary>
+        /// Begins an asynchronous read using a SSH_FXP_READ request.
+        /// </summary>
+        /// <param name="handle">The handle to the file to read from.</param>
+        /// <param name="offset">The offset in the file to start reading from.</param>
+        /// <param name="length">The number of bytes to read.</param>
+        /// <param name="callback">The <see cref="AsyncCallback"/> delegate that is executed when <see cref="BeginRead(byte[], ulong, uint, AsyncCallback, object)"/> completes.</param>
+        /// <param name="state">An object that contains any additional user-defined data.</param>
+        /// <returns>
+        /// A <see cref="SftpReadAsyncResult"/> that represents the asynchronous call.
+        /// </returns>
+        public SftpReadAsyncResult BeginRead(byte[] handle, ulong offset, uint length, AsyncCallback callback, object state)
+        {
+            var asyncResult = new SftpReadAsyncResult(callback, state);
+
+            var request = new SftpReadRequest(ProtocolVersion, NextRequestId, handle, offset, length,
+                response =>
+                {
+                    asyncResult.SetAsCompleted(response.Data, false);
+                },
+                response =>
+                {
+                    if (response.StatusCode != StatusCodes.Eof)
+                    {
+                        asyncResult.SetAsCompleted(GetSftpException(response), false);
+                    }
+                    else
+                    {
+                        asyncResult.SetAsCompleted(Array<byte>.Empty, false);
+                    }
+                });
+            SendRequest(request);
+
+            return asyncResult;
+        }
+
+        /// <summary>
+        /// Handles the end of an asynchronous read.
+        /// </summary>
+        /// <param name="asyncResult">An <see cref="SftpReadAsyncResult"/> that represents an asynchronous call.</param>
+        /// <returns>
+        /// A <see cref="byte"/> array representing the data read.
+        /// </returns>
+        /// <remarks>
+        /// If all available data has been read, the <see cref="EndRead(SftpReadAsyncResult)"/> method completes
+        /// immediately and returns zero bytes.
+        /// </remarks>
+        /// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is <c>null</c>.</exception>
+        public byte[] EndRead(SftpReadAsyncResult asyncResult)
+        {
+            if (asyncResult == null)
+                throw new ArgumentNullException("asyncResult");
+
+            if (asyncResult.EndInvokeCalled)
+                throw new InvalidOperationException("EndRead has already been called.");
+
+            return asyncResult.EndInvoke();
+        }
+
         /// <summary>
         /// Performs SSH_FXP_READ request.
         /// </summary>
@@ -424,7 +483,10 @@ namespace Renci.SshNet.Sftp
                             {
                                 exception = GetSftpException(response);
                             }
-                            data = Array<byte>.Empty;
+                            else
+                            {
+                                data = Array<byte>.Empty;
+                            }
                             wait.Set();
                         });
 

+ 14 - 20
src/Renci.SshNet/SftpClient.cs

@@ -1996,37 +1996,31 @@ namespace Renci.SshNet
             var fullPath = _sftpSession.GetCanonicalPath(path);
 
             var handle = _sftpSession.RequestOpen(fullPath, Flags.Read);
+            
+            // TODO close handle in case of exception
+            // TODO decide whether to move opening (and closing) of handle to SftpFileReader
 
-            ulong offset = 0;
-
-            var optimalReadLength = _sftpSession.CalculateOptimalReadLength(_bufferSize);
-
-            var data = _sftpSession.RequestRead(handle, offset, optimalReadLength);
+            var fileReader = new SftpFileReader(handle, _sftpSession);
+            var totalBytesRead = 0UL;
 
-            //  Read data while available
-            while (data.Length > 0)
+            while (true)
             {
+                // TODO: cancel read ahead when download is canceled by user
+
                 //  Cancel download
                 if (asyncResult != null && asyncResult.IsDownloadCanceled)
                     break;
 
-                output.Write(data, 0, data.Length);
+                var data = fileReader.Read();
+                if (data.Length == 0)
+                    break;
 
-                output.Flush();
+                output.Write(data, 0, data.Length);
 
-                offset += (ulong)data.Length;
+                totalBytesRead += (ulong) data.Length;
 
-                //  Call callback to report number of bytes read
                 if (downloadCallback != null)
-                {
-                    // copy offset to ensure it's not modified between now and execution of callback
-                    var downloadOffset = offset;
-
-                    //  Execute callback on different thread
-                    ThreadAbstraction.ExecuteThread(() => { downloadCallback(downloadOffset); });
-                }
-
-                data = _sftpSession.RequestRead(handle, offset, optimalReadLength);
+                    downloadCallback(totalBytesRead);
             }
 
             _sftpSession.RequestClose(handle);