2
0
Эх сурвалжийг харах

Do not use reported file size to determine whether read-ahead should be stopped.
Only use file size for optimization.

Gert Driesen 8 жил өмнө
parent
commit
1f2c7f2547

+ 75 - 37
src/Renci.SshNet/Sftp/SftpFileReader.cs

@@ -6,13 +6,17 @@ using System.Threading;
 
 namespace Renci.SshNet.Sftp
 {
-    internal class SftpFileReader : IDisposable
+    internal class SftpFileReader : ISftpFileReader
     {
         private readonly byte[] _handle;
         private readonly ISftpSession _sftpSession;
-        private uint _chunkLength;
+        private readonly uint _chunkSize;
         private ulong _offset;
-        private ulong _fileSize;
+
+        /// <summary>
+        /// Holds the size of the file, when available.
+        /// </summary>
+        private long? _fileSize;
         private readonly IDictionary<int, BufferedRead> _queue;
 
         private int _readAheadChunkIndex;
@@ -20,7 +24,14 @@ namespace Renci.SshNet.Sftp
         private ManualResetEvent _readAheadCompleted;
         private int _nextChunkIndex;
 
-        private bool _isEndOfFile;
+        /// <summary>
+        /// Holds a value indicating whether EOF has already been signaled by the SSH server.
+        /// </summary>
+        private bool _endOfFileReceived;
+        /// <summary>
+        /// Holds a value indicating whether the client has read up to the end of the file.
+        /// </summary>
+        private bool _isEndOfFileRead;
         private SemaphoreLight _semaphore;
         private readonly object _readLock;
 
@@ -33,19 +44,20 @@ namespace Renci.SshNet.Sftp
         /// </summary>
         /// <param name="handle"></param>
         /// <param name="sftpSession"></param>
-        /// <param name="maxReadHead">The maximum number of pending reads.</param>
-        public SftpFileReader(byte[] handle, ISftpSession sftpSession, int maxReadHead)
+        /// <param name="chunkSize">The size of a individual read-ahead chunk.</param>
+        /// <param name="maxPendingReads">The maximum number of pending reads.</param>
+        /// <param name="fileSize">The size of the file, if known; otherwise, <c>null</c>.</param>
+        public SftpFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize)
         {
             _handle = handle;
             _sftpSession = sftpSession;
-            _chunkLength = 32 * 1024 - 13; // TODO !
-            _semaphore = new SemaphoreLight(maxReadHead);
-            _queue = new Dictionary<int, BufferedRead>(maxReadHead);
+            _fileSize = fileSize;
+            _chunkSize = chunkSize;
+            _semaphore = new SemaphoreLight(maxPendingReads);
+            _queue = new Dictionary<int, BufferedRead>(maxPendingReads);
             _readLock = new object();
             _readAheadCompleted = new ManualResetEvent(false);
 
-            _fileSize = (ulong)_sftpSession.RequestFStat(_handle).Size;
-
             StartReadAhead();
         }
 
@@ -53,7 +65,7 @@ namespace Renci.SshNet.Sftp
         {
             if (_exception != null || _disposed)
                 throw new ObjectDisposedException(GetType().FullName);
-            if (_isEndOfFile)
+            if (_isEndOfFileRead)
                 throw new SshException("Attempting to read beyond the end of the file.");
 
             lock (_readLock)
@@ -74,27 +86,32 @@ namespace Renci.SshNet.Sftp
                 if (nextChunk.Offset == _offset)
                 {
                     var data = nextChunk.Data;
-                    _offset += (ulong) data.Length;
 
-                    // remove processed chunk
-                    _queue.Remove(_nextChunkIndex);
-                    // move to next chunk
-                    _nextChunkIndex++;
                     // have we reached EOF?
                     if (data.Length == 0)
                     {
-                        _isEndOfFile = true;
+                        _isEndOfFileRead = true;
+                    }
+                    else
+                    {
+                        // remove processed chunk
+                        _queue.Remove(_nextChunkIndex);
+                        // update offset
+                        _offset += (ulong)data.Length;
+                        // move to next chunk
+                        _nextChunkIndex++;
                     }
                     // unblock wait in read-ahead
                     _semaphore.Release();
                     return data;
                 }
 
-                // when we received an EOF for the next chunk, then we only complete the current
-                // chunk if we haven't already read up to the file size
-                if (nextChunk.Data.Length == 0 && _offset == _fileSize)
+                // when we received an EOF for the next chunk and the size of the file is known, then
+                // we only complete the current chunk if we haven't already read up to the file size;
+                // this way we save an extra round-trip to the server
+                if (nextChunk.Data.Length == 0 && _fileSize.HasValue && _offset == (ulong)_fileSize.Value)
                 {
-                    _isEndOfFile = true;
+                    _isEndOfFileRead = true;
 
                     // unblock wait in read-ahead
                     _semaphore.Release();
@@ -104,6 +121,13 @@ namespace Renci.SshNet.Sftp
 
                 // when the server returned less bytes than requested (for the previous chunk)
                 // we'll synchronously request the remaining data
+                //
+                // due to the optimization above, we'll only get here in one of the following cases:
+                // - an EOF situation for files for which we were unable to obtain the file size
+                // - fewer bytes that requested were returned
+                // 
+                // according to the SSH specification, this last case should never happen for normal
+                // disk files (but can happen for device files).
 
                 var bytesToCatchUp = nextChunk.Offset - _offset;
 
@@ -111,6 +135,18 @@ namespace Renci.SshNet.Sftp
                 var read = _sftpSession.RequestRead(_handle, _offset, (uint) bytesToCatchUp);
                 if (read.Length == 0)
                 {
+                    // a zero-length (EOF) response is only valid for the read-back when EOF has
+                    // been signaled for the next read-ahead chunk
+                    if (nextChunk.Data.Length == 0)
+                    {
+                        _isEndOfFileRead = true;
+
+                        // unblock wait in read-ahead
+                        _semaphore.Release();
+                        // signal EOF to caller
+                        return read;
+                    }
+
                     // move reader to error state
                     _exception = new SshException("Unexpectedly reached end of file.");
                     // unblock wait in read-ahead
@@ -128,7 +164,6 @@ namespace Renci.SshNet.Sftp
         public void Dispose()
         {
             Dispose(true);
-            GC.SuppressFinalize(this);
         }
 
         protected void Dispose(bool disposing)
@@ -138,14 +173,16 @@ namespace Renci.SshNet.Sftp
                 var readAheadCompleted = _readAheadCompleted;
                 if (readAheadCompleted != null)
                 {
-                    _readAheadCompleted = null;
                     if (!readAheadCompleted.WaitOne(TimeSpan.FromSeconds(1)))
                     {
                         DiagnosticAbstraction.Log("Read-ahead thread did not complete within time-out.");
                     }
                     readAheadCompleted.Dispose();
+                    _readAheadCompleted = null;
                 }
 
+                _sftpSession.RequestClose(_handle);
+
                 _disposed = true;
             }
         }
@@ -154,6 +191,7 @@ namespace Renci.SshNet.Sftp
         {
             ThreadAbstraction.ExecuteThread(() =>
             {
+                // TODO: take dispose into account
                 while (_exception == null)
                 {
                     // TODO implement cancellation!?
@@ -161,15 +199,15 @@ namespace Renci.SshNet.Sftp
                     // TODO check if the BCL Semaphore unblocks wait on dispose (and mimick same behavior in our SemaphoreLight ?)
                     _semaphore.Wait();
 
-                    // don't bother reading any more chunks if we reached EOF, or an exception has occurred
+                    // don't bother reading any more chunks if we received EOF, or an exception has occurred
                     // while processing a chunk
-                    if (_isEndOfFile || _exception != null)
+                    if (_endOfFileReceived || _exception != null)
                         break;
 
                     // start reading next chunk
                     try
                     {
-                        _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkLength, ReadCompleted,
+                        _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkSize, ReadCompleted,
                                                new BufferedRead(_readAheadChunkIndex, _readAheadOffset));
                     }
                     catch (Exception ex)
@@ -178,15 +216,8 @@ namespace Renci.SshNet.Sftp
                         break;
                     }
 
-                    if (_readAheadOffset >= _fileSize)
-                    {
-                        // read one chunk beyond the chunk in which we read "file size" bytes
-                        // to get an EOF
-                        break;
-                    }
-
                     // advance read-ahead offset
-                    _readAheadOffset += _chunkLength;
+                    _readAheadOffset += _chunkSize;
 
                     _readAheadChunkIndex++;
                 }
@@ -201,7 +232,7 @@ namespace Renci.SshNet.Sftp
             if (readAsyncResult == null)
                 return;
 
-            byte[] data = null;
+            byte[] data;
 
             try
             {
@@ -219,8 +250,15 @@ namespace Renci.SshNet.Sftp
             bufferedRead.Complete(data);
             _queue.Add(bufferedRead.ChunkIndex, bufferedRead);
 
+            // check if server signaled EOF
+            if (data.Length == 0)
+            {
+                // set a flag to stop read-aheads
+                _endOfFileReceived = true;
+            }
+
             // signal that a chunk has been read or EOF has been reached;
-            // in both cases, we want to unblock the "read-ahead" thread
+            // in both cases, Read() will eventually also unblock the "read-ahead" thread
             lock (_readLock)
             {
                 Monitor.PulseAll(_readLock);