Browse Source

Harden read-ahead by relying
Throw SshException when read has completed.
Identify more tasks that need to be completed before the implementation can be considered done.

Gert Driesen 8 years ago
parent
commit
5b3e64e368
1 changed files with 55 additions and 26 deletions
  1. 55 26
      src/Renci.SshNet/Sftp/SftpFileReader.cs

+ 55 - 26
src/Renci.SshNet/Sftp/SftpFileReader.cs

@@ -20,6 +20,7 @@ namespace Renci.SshNet.Sftp
         private ulong _readAheadOffset;
         private ulong _offset;
         private ulong _fileSize;
+        private Exception _exception;
         private readonly IDictionary<int, BufferedRead> _queue;
         private readonly object _readLock;
 
@@ -36,15 +37,23 @@ namespace Renci.SshNet.Sftp
 
             ThreadAbstraction.ExecuteThread(() =>
             {
-                while (!_isCompleted)
-                {
-                    // we read one chunk beyond the file size to get an EOF
-                    if (_readAheadOffset > _fileSize)
-                        break;
+                // read one chunk beyond the chunk in which we read "file size" bytes
+                // to get an EOF
 
+                while (_readAheadOffset <= (_fileSize + _chunkLength) && _exception == null)
+                {
                     // TODO implement cancellation!?
+                    // TODO implement IDisposable to cancel the Wait in case the client never completes reading to EOF
+                    // TODO check if the BCL Semaphore unblocks wait on dispose (and mimick same behavior in our SemaphoreLight ?)
                     _semaphore.Wait();
 
+                    // don't bother reading any more chunks if we reached EOF, or an exception has occurred
+                    // while processing a chunk
+                    if (_isCompleted || _exception != null)
+                        break;
+
+                    // TODO: catch exception, signal error to Read() and break loop
+
                     // start reading next chunk
                     _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkLength, ReadCompleted,
                                            new BufferedRead(_readAheadChunkIndex, _readAheadOffset));
@@ -54,20 +63,28 @@ namespace Renci.SshNet.Sftp
 
                     _readAheadChunkIndex++;
                 }
+
+                Console.WriteLine("Finished read-ahead");
             });
         }
 
         public byte[] Read()
         {
+            if (_isCompleted)
+                throw new SshException("Attempting to read beyond the end of the file.");
+
             lock (_readLock)
             {
                 BufferedRead nextChunk;
 
-                while (!_queue.TryGetValue(_nextChunkIndex, out nextChunk) && !_isCompleted)
+                // TODO: break when we've reached file size and still haven't received an EOF ?
+
+                // wait until either the next chunk is avalable or an exception has occurred
+                while (!_queue.TryGetValue(_nextChunkIndex, out nextChunk) && _exception == null)
                     Monitor.Wait(_readLock);
 
-                if (_isCompleted)
-                    return new byte[0];
+                if (_exception != null)
+                    throw _exception;
 
                 if (nextChunk.Offset == _offset)
                 {
@@ -78,11 +95,28 @@ namespace Renci.SshNet.Sftp
                     _queue.Remove(_nextChunkIndex);
                     // move to next chunk
                     _nextChunkIndex++;
-                    // allow read-ahead of a new chunk
+                    // have we reached EOF?
+                    if (data.Length == 0)
+                    {
+                        _isCompleted = true;
+                    }
+                    // unblock wait in read-ahead
                     _semaphore.Release();
                     return data;
                 }
 
+                // when we received an EOF for the next chunk, then we only complete the current
+                // chunk if we haven't already read up to the file size
+                if (nextChunk.Data.Length == 0 && _offset == _fileSize)
+                {
+                    _isCompleted = true;
+
+                    // unblock wait in read-ahead
+                    _semaphore.Release();
+                    // signal EOF to caller
+                    return nextChunk.Data;
+                }
+
                 // when the server returned less bytes than requested (for the previous chunk)
                 // we'll synchronously request the remaining data
 
@@ -95,13 +129,12 @@ namespace Renci.SshNet.Sftp
                     var read = _sftpSession.RequestRead(_handle, _offset, (uint) catchUp.Length);
                     if (read.Length == 0)
                     {
-                        // break in loop in "read-ahead" thread (once a blocking wait is interrupted)
-                        _isCompleted = true;
-                        // interrupt blocking wait in "read-ahead" thread
-                        lock (_readLock)
-                            Monitor.PulseAll(_readLock);
-                        // signal failure
-                        throw new SshException("Unexpectedly reached end of file.");
+                        // move reader to error state
+                        _exception = new SshException("Unexpectedly reached end of file.");
+                        // unblock wait in read-ahead
+                        _semaphore.Release();
+                        // notify caller of error
+                        throw _exception;
                     }
 
                     bytesCaughtUp += read.Length;
@@ -119,16 +152,12 @@ namespace Renci.SshNet.Sftp
                 return;
 
             var data = readAsyncResult.EndInvoke();
-            if (data.Length == 0)
-            {
-                _isCompleted = true;
-            }
-            else
-            {
-                var bufferedRead = (BufferedRead)readAsyncResult.AsyncState;
-                bufferedRead.Complete(data);
-                _queue.Add(bufferedRead.ChunkIndex, bufferedRead);
-            }
+
+            // a read that completes with a zero-byte result signals EOF
+            // but there may be pending reads before that read
+            var bufferedRead = (BufferedRead)readAsyncResult.AsyncState;
+            bufferedRead.Complete(data);
+            _queue.Add(bufferedRead.ChunkIndex, bufferedRead);
 
             // signal that a chunk has been read or EOF has been reached;
             // in both cases, we want to unblock the "read-ahead" thread