Browse Source

Allow Dispose to interrupt blocking wait.
Avoid ObjectDisposedException in Read().

Gert Driesen 8 năm trước cách đây
mục cha
commit
703e24b629

+ 11 - 0
src/Renci.SshNet.Tests/Classes/Sftp/SftpFileReaderTestBase.cs

@@ -1,7 +1,9 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using Moq;
+using Renci.SshNet.Common;
 using Renci.SshNet.Sftp;
 using System;
+using System.Threading;
 
 namespace Renci.SshNet.Tests.Classes.Sftp
 {
@@ -45,5 +47,14 @@ namespace Renci.SshNet.Tests.Classes.Sftp
             random.NextBytes(chunk);
             return chunk;
         }
+
+        protected static int WaitAny(WaitHandle[] waitHandles, int millisecondsTimeout)
+        {
+            var result = WaitHandle.WaitAny(waitHandles, millisecondsTimeout);
+
+            if (result == WaitHandle.WaitTimeout)
+                throw new SshOperationTimeoutException();
+            return result;
+        }
     }
 }

+ 28 - 8
src/Renci.SshNet.Tests/Classes/Sftp/SftpFileReaderTest_DisposeShouldUnblockReadAndReadAhead.cs

@@ -17,7 +17,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         private MockSequence _seq;
         private byte[] _handle;
         private int _fileSize;
+        private WaitHandle[] _waitHandleArray;
         private int _operationTimeout;
+        private SftpCloseAsyncResult _closeAsyncResult;
         private SftpFileReader _reader;
         private ObjectDisposedException _actualException;
 
@@ -27,23 +29,36 @@ namespace Renci.SshNet.Tests.Classes.Sftp
 
             _handle = CreateByteArray(random, 5);
             _fileSize = 5000;
-            _operationTimeout = random.Next();
+            _waitHandleArray = new WaitHandle[2];
+            _operationTimeout = random.Next(10000, 20000);
+            _closeAsyncResult = new SftpCloseAsyncResult(null, null);
         }
 
         protected override void SetupMocks()
         {
             _seq = new MockSequence();
 
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.CreateWaitHandleArray(It.IsNotNull<WaitHandle>(), It.IsNotNull<WaitHandle>()))
+                           .Returns<WaitHandle, WaitHandle>((disposingWaitHandle, semaphoreAvailableWaitHandle) =>
+                           {
+                               _waitHandleArray[0] = disposingWaitHandle;
+                               _waitHandleArray[1] = semaphoreAvailableWaitHandle;
+                               return _waitHandleArray;
+                           });
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                            .Setup(p => p.BeginRead(_handle, 0, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                            .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
-            SftpSessionMock.InSequence(_seq).Setup(p => p.RequestClose(_handle));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
+            SftpSessionMock.InSequence(_seq).Setup(p => p.BeginClose(_handle, null, null)).Returns(_closeAsyncResult);
+            SftpSessionMock.InSequence(_seq).Setup(p => p.EndClose(_closeAsyncResult));
         }
 
         protected override void Arrange()
@@ -94,18 +109,23 @@ namespace Renci.SshNet.Tests.Classes.Sftp
             }
         }
 
-        [TestMethod]
-        public void DisposeShouldCloseHandleAndCompleteImmediately()
+        public void HandleShouldHaveBeenClosed()
         {
-            SftpSessionMock.InSequence(_seq).Setup(p => p.RequestClose(_handle));
+            SftpSessionMock.Verify(p => p.BeginClose(_handle, null, null), Times.Once);
+            SftpSessionMock.Verify(p => p.EndClose(_closeAsyncResult), Times.Once);
+        }
 
+        [TestMethod]
+        public void DisposeShouldCompleteImmediatelyAndNotAttemptToCloseHandleAgain()
+        {
             var stopwatch = Stopwatch.StartNew();
             _reader.Dispose();
             stopwatch.Stop();
 
             Assert.IsTrue(stopwatch.ElapsedMilliseconds < 200, "Dispose took too long to complete: " + stopwatch.ElapsedMilliseconds);
 
-            SftpSessionMock.Verify(p => p.RequestClose(_handle), Times.Once);
+            SftpSessionMock.Verify(p => p.BeginClose(_handle, null, null), Times.Once);
+            SftpSessionMock.Verify(p => p.EndClose(_closeAsyncResult), Times.Once);
         }
     }
 }

+ 23 - 6
src/Renci.SshNet.Tests/Classes/Sftp/SftpFileReaderTest_LastChunkBeforeEofIsComplete.cs

@@ -17,7 +17,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         private MockSequence _seq;
         private byte[] _handle;
         private int _fileSize;
+        private WaitHandle[] _waitHandleArray;
         private int _operationTimeout;
+        private SftpCloseAsyncResult _closeAsyncResult;
         private byte[] _chunk1;
         private byte[] _chunk2;
         private byte[] _chunk3;
@@ -36,16 +38,27 @@ namespace Renci.SshNet.Tests.Classes.Sftp
             _chunk2 = CreateByteArray(random, ChunkLength - 10);
             _chunk3 = new byte[0];
             _fileSize = _chunk1.Length + _chunk2.Length;
-            _operationTimeout = random.Next();
+            _waitHandleArray = new WaitHandle[2];
+            _operationTimeout = random.Next(10000, 20000);
+            _closeAsyncResult = new SftpCloseAsyncResult(null, null);
         }
 
         protected override void SetupMocks()
         {
             _seq = new MockSequence();
 
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.CreateWaitHandleArray(It.IsNotNull<WaitHandle>(), It.IsNotNull<WaitHandle>()))
+                           .Returns<WaitHandle, WaitHandle>((disposingWaitHandle, semaphoreAvailableWaitHandle) =>
+                           {
+                               _waitHandleArray[0] = disposingWaitHandle;
+                               _waitHandleArray[1] = semaphoreAvailableWaitHandle;
+                               return _waitHandleArray;
+                           });
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, 0, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -56,7 +69,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                             .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -67,7 +81,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                             .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, 2 * ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -131,7 +146,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         [TestMethod]
         public void DisposeShouldCloseHandleAndCompleteImmediately()
         {
-            SftpSessionMock.InSequence(_seq).Setup(p => p.RequestClose(_handle));
+            SftpSessionMock.InSequence(_seq).Setup(p => p.BeginClose(_handle, null, null)).Returns(_closeAsyncResult);
+            SftpSessionMock.InSequence(_seq).Setup(p => p.EndClose(_closeAsyncResult));
 
             var stopwatch = Stopwatch.StartNew();
             _reader.Dispose();
@@ -139,7 +155,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
 
             Assert.IsTrue(stopwatch.ElapsedMilliseconds < 200, "Dispose took too long to complete: " + stopwatch.ElapsedMilliseconds);
 
-            SftpSessionMock.Verify(p => p.RequestClose(_handle), Times.Once);
+            SftpSessionMock.Verify(p => p.BeginClose(_handle, null, null), Times.Once);
+            SftpSessionMock.Verify(p => p.EndClose(_closeAsyncResult), Times.Once);
         }
     }
 }

+ 23 - 6
src/Renci.SshNet.Tests/Classes/Sftp/SftpFileReaderTest_LastChunkBeforeEofIsPartial.cs

@@ -17,7 +17,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         private MockSequence _seq;
         private byte[] _handle;
         private int _fileSize;
+        private WaitHandle[] _waitHandleArray;
         private int _operationTimeout;
+        private SftpCloseAsyncResult _closeAsyncResult;
         private byte[] _chunk1;
         private byte[] _chunk2;
         private byte[] _chunk3;
@@ -35,16 +37,27 @@ namespace Renci.SshNet.Tests.Classes.Sftp
             _chunk2 = CreateByteArray(random, ChunkLength);
             _chunk3 = new byte[0];
             _fileSize = _chunk1.Length + _chunk2.Length;
-            _operationTimeout = random.Next();
+            _waitHandleArray = new WaitHandle[2];
+            _operationTimeout = random.Next(10000, 20000);
+            _closeAsyncResult = new SftpCloseAsyncResult(null, null);
         }
 
         protected override void SetupMocks()
         {
             _seq = new MockSequence();
 
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.CreateWaitHandleArray(It.IsNotNull<WaitHandle>(), It.IsNotNull<WaitHandle>()))
+                           .Returns<WaitHandle, WaitHandle>((disposingWaitHandle, semaphoreAvailableWaitHandle) =>
+                           {
+                               _waitHandleArray[0] = disposingWaitHandle;
+                               _waitHandleArray[1] = semaphoreAvailableWaitHandle;
+                               return _waitHandleArray;
+                           });
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, 0, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -55,7 +68,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                             .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -66,7 +80,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                             .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, 2 * ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -130,7 +145,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         [TestMethod]
         public void DisposeShouldCloseHandleAndCompleteImmediately()
         {
-            SftpSessionMock.InSequence(_seq).Setup(p => p.RequestClose(_handle));
+            SftpSessionMock.InSequence(_seq).Setup(p => p.BeginClose(_handle, null, null)).Returns(_closeAsyncResult);
+            SftpSessionMock.InSequence(_seq).Setup(p => p.EndClose(_closeAsyncResult));
 
             var stopwatch = Stopwatch.StartNew();
             _reader.Dispose();
@@ -138,7 +154,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
 
             Assert.IsTrue(stopwatch.ElapsedMilliseconds < 200, "Dispose took too long to complete: " + stopwatch.ElapsedMilliseconds);
 
-            SftpSessionMock.Verify(p => p.RequestClose(_handle), Times.Once);
+            SftpSessionMock.Verify(p => p.BeginClose(_handle, null, null), Times.Once);
+            SftpSessionMock.Verify(p => p.EndClose(_closeAsyncResult), Times.Once);
         }
     }
 }

+ 36 - 11
src/Renci.SshNet.Tests/Classes/Sftp/SftpFileReaderTest_PreviousChunkIsIncompleteAndEofIsNotReached.cs

@@ -17,7 +17,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         private MockSequence _seq;
         private byte[] _handle;
         private int _fileSize;
+        private WaitHandle[] _waitHandleArray;
         private int _operationTimeout;
+        private SftpCloseAsyncResult _closeAsyncResult;
         private byte[] _chunk1;
         private byte[] _chunk2;
         private byte[] _chunk2CatchUp1;
@@ -34,6 +36,7 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         private ManualResetEvent _chunk3BeginRead;
         private ManualResetEvent _chunk4BeginRead;
         private ManualResetEvent _chunk5BeginRead;
+        private ManualResetEvent _waitBeforeChunk6;
         private ManualResetEvent _chunk6BeginRead;
         private byte[] _actualChunk4;
         private byte[] _actualChunk2CatchUp1;
@@ -60,18 +63,30 @@ namespace Renci.SshNet.Tests.Classes.Sftp
             _chunk3BeginRead = new ManualResetEvent(false);
             _chunk4BeginRead = new ManualResetEvent(false);
             _chunk5BeginRead = new ManualResetEvent(false);
+            _waitBeforeChunk6 = new ManualResetEvent(false);
             _chunk6BeginRead = new ManualResetEvent(false);
             _fileSize = _chunk1.Length + _chunk2.Length + _chunk2CatchUp1.Length + _chunk2CatchUp2.Length + _chunk3.Length + _chunk4.Length + _chunk5.Length;
-            _operationTimeout = random.Next();
+            _waitHandleArray = new WaitHandle[2];
+            _operationTimeout = random.Next(10000, 20000);
+            _closeAsyncResult = new SftpCloseAsyncResult(null, null);
         }
 
         protected override void SetupMocks()
         {
             _seq = new MockSequence();
 
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.CreateWaitHandleArray(It.IsNotNull<WaitHandle>(), It.IsNotNull<WaitHandle>()))
+                           .Returns<WaitHandle, WaitHandle>((disposingWaitHandle, semaphoreAvailableWaitHandle) =>
+                           {
+                               _waitHandleArray[0] = disposingWaitHandle;
+                               _waitHandleArray[1] = semaphoreAvailableWaitHandle;
+                               return _waitHandleArray;
+                           });
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, 0, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -83,7 +98,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                             .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -95,7 +111,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                             .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, 2 * ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -107,7 +124,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                             .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, 3 * ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -119,7 +137,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                             .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, 4 * ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -129,15 +148,17 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                                 asyncResult.SetAsCompleted(_chunk5, false);
                             })
                             .Returns((SftpReadAsyncResult)null);
+            SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Callback(() => _waitBeforeChunk6.Set())
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.RequestRead(_handle, 2 * ChunkLength - 17, 17))
                             .Returns(_chunk2CatchUp1);
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.RequestRead(_handle, 2 * ChunkLength - 7, 7))
                             .Returns(_chunk2CatchUp2);
-            SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
-            SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, 5 * ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -170,6 +191,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
             _actualChunk2 = _reader.Read();
             // consuming chunk2 allows chunk5 to be read-ahead
             Assert.IsTrue(_chunk5BeginRead.WaitOne(200));
+            // pauze until the read-ahead has started waiting a semaphore to become available
+            Assert.IsTrue(_waitBeforeChunk6.WaitOne(200));
             // consume remaining parts of chunk 2
             _actualChunk2CatchUp1 = _reader.Read();
             _actualChunk2CatchUp2 = _reader.Read();
@@ -261,7 +284,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         [TestMethod]
         public void DisposeShouldCloseHandleAndCompleteImmediately()
         {
-            SftpSessionMock.InSequence(_seq).Setup(p => p.RequestClose(_handle));
+            SftpSessionMock.InSequence(_seq).Setup(p => p.BeginClose(_handle, null, null)).Returns(_closeAsyncResult);
+            SftpSessionMock.InSequence(_seq).Setup(p => p.EndClose(_closeAsyncResult));
 
             var stopwatch = Stopwatch.StartNew();
             _reader.Dispose();
@@ -269,7 +293,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
 
             Assert.IsTrue(stopwatch.ElapsedMilliseconds < 200, "Dispose took too long to complete: " + stopwatch.ElapsedMilliseconds);
 
-            SftpSessionMock.Verify(p => p.RequestClose(_handle), Times.Once);
+            SftpSessionMock.Verify(p => p.BeginClose(_handle, null, null), Times.Once);
+            SftpSessionMock.Verify(p => p.EndClose(_closeAsyncResult), Times.Once);
         }
     }
 }

+ 23 - 6
src/Renci.SshNet.Tests/Classes/Sftp/SftpFileReaderTest_PreviousChunkIsIncompleteAndEofIsReached.cs

@@ -17,7 +17,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         private MockSequence _seq;
         private byte[] _handle;
         private int _fileSize;
+        private WaitHandle[] _waitHandleArray;
         private int _operationTimeout;
+        private SftpCloseAsyncResult _closeAsyncResult;
         private byte[] _chunk1;
         private byte[] _chunk2;
         private byte[] _chunk2CatchUp;
@@ -44,16 +46,27 @@ namespace Renci.SshNet.Tests.Classes.Sftp
             _chunk2BeginRead = new ManualResetEvent(false);
             _chunk3BeginRead = new ManualResetEvent(false);
             _fileSize = _chunk1.Length + _chunk2.Length + _chunk2CatchUp.Length + _chunk3.Length;
-            _operationTimeout = random.Next();
+            _waitHandleArray = new WaitHandle[2];
+            _operationTimeout = random.Next(10000, 20000);
+            _closeAsyncResult = new SftpCloseAsyncResult(null, null);
         }
 
         protected override void SetupMocks()
         {
             _seq = new MockSequence();
 
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.CreateWaitHandleArray(It.IsNotNull<WaitHandle>(), It.IsNotNull<WaitHandle>()))
+                           .Returns<WaitHandle, WaitHandle>((disposingWaitHandle, semaphoreAvailableWaitHandle) =>
+                           {
+                               _waitHandleArray[0] = disposingWaitHandle;
+                               _waitHandleArray[1] = semaphoreAvailableWaitHandle;
+                               return _waitHandleArray;
+                           });
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, 0, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -65,7 +78,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                             .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -77,7 +91,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                             .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, 2 * ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -159,7 +174,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         [TestMethod]
         public void DisposeShouldCloseHandleAndCompleteImmediately()
         {
-            SftpSessionMock.InSequence(_seq).Setup(p => p.RequestClose(_handle));
+            SftpSessionMock.InSequence(_seq).Setup(p => p.BeginClose(_handle, null, null)).Returns(_closeAsyncResult);
+            SftpSessionMock.InSequence(_seq).Setup(p => p.EndClose(_closeAsyncResult));
 
             var stopwatch = Stopwatch.StartNew();
             _reader.Dispose();
@@ -167,7 +183,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
 
             Assert.IsTrue(stopwatch.ElapsedMilliseconds < 200, "Dispose took too long to complete: " + stopwatch.ElapsedMilliseconds);
 
-            SftpSessionMock.Verify(p => p.RequestClose(_handle), Times.Once);
+            SftpSessionMock.Verify(p => p.BeginClose(_handle, null, null), Times.Once);
+            SftpSessionMock.Verify(p => p.EndClose(_closeAsyncResult), Times.Once);
         }
     }
 }

+ 23 - 6
src/Renci.SshNet.Tests/Classes/Sftp/SftpFileReaderTest_ReadAheadEndInvokeException_DiscardsFurtherReadAheads.cs

@@ -18,7 +18,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         private MockSequence _seq;
         private byte[] _handle;
         private int _fileSize;
+        private WaitHandle[] _waitHandleArray;
         private int _operationTimeout;
+        private SftpCloseAsyncResult _closeAsyncResult;
         private byte[] _chunk1;
         private byte[] _chunk3;
         private ManualResetEvent _readAheadChunk3Completed;
@@ -34,7 +36,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
             _chunk1 = CreateByteArray(random, ChunkLength);
             _chunk3 = CreateByteArray(random, ChunkLength);
             _fileSize = 3 * ChunkLength;
-            _operationTimeout = random.Next();
+            _waitHandleArray = new WaitHandle[2];
+            _operationTimeout = random.Next(10000, 20000);
+            _closeAsyncResult = new SftpCloseAsyncResult(null, null);
 
             _readAheadChunk3Completed = new ManualResetEvent(false);
 
@@ -45,9 +49,18 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         {
             _seq = new MockSequence();
 
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.CreateWaitHandleArray(It.IsNotNull<WaitHandle>(), It.IsNotNull<WaitHandle>()))
+                           .Returns<WaitHandle, WaitHandle>((disposingWaitHandle, semaphoreAvailableWaitHandle) =>
+                           {
+                               _waitHandleArray[0] = disposingWaitHandle;
+                               _waitHandleArray[1] = semaphoreAvailableWaitHandle;
+                               return _waitHandleArray;
+                           });
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                            .Setup(p => p.BeginRead(_handle, 0, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                            .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -58,7 +71,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                            .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -76,7 +90,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                            .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, 2 * ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -142,7 +157,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         [TestMethod]
         public void DisposeShouldCloseHandleAndCompleteImmediately()
         {
-            SftpSessionMock.InSequence(_seq).Setup(p => p.RequestClose(_handle));
+            SftpSessionMock.InSequence(_seq).Setup(p => p.BeginClose(_handle, null, null)).Returns(_closeAsyncResult);
+            SftpSessionMock.InSequence(_seq).Setup(p => p.EndClose(_closeAsyncResult));
 
             var stopwatch = Stopwatch.StartNew();
             _reader.Dispose();
@@ -150,7 +166,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
 
             Assert.IsTrue(stopwatch.ElapsedMilliseconds < 200, "Dispose took too long to complete: " + stopwatch.ElapsedMilliseconds);
 
-            SftpSessionMock.Verify(p => p.RequestClose(_handle), Times.Once);
+            SftpSessionMock.Verify(p => p.BeginClose(_handle, null, null), Times.Once);
+            SftpSessionMock.Verify(p => p.EndClose(_closeAsyncResult), Times.Once);
         }
     }
 }

+ 23 - 6
src/Renci.SshNet.Tests/Classes/Sftp/SftpFileReaderTest_ReadAheadEndInvokeException_PreventsFurtherReadAheads.cs

@@ -18,7 +18,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         private MockSequence _seq;
         private byte[] _handle;
         private int _fileSize;
+        private WaitHandle[] _waitHandleArray;
         private int _operationTimeout;
+        private SftpCloseAsyncResult _closeAsyncResult;
         private byte[] _chunk1;
         private byte[] _chunk3;
         private SftpFileReader _reader;
@@ -35,7 +37,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
             _chunk1 = CreateByteArray(random, ChunkLength);
             _chunk3 = CreateByteArray(random, ChunkLength);
             _fileSize = 3 * _chunk1.Length;
-            _operationTimeout = random.Next();
+            _waitHandleArray = new WaitHandle[2];
+            _operationTimeout = random.Next(10000, 20000);
+            _closeAsyncResult = new SftpCloseAsyncResult(null, null);
 
             _readAheadChunk2 = new ManualResetEvent(false);
             _readChunk2 = new ManualResetEvent(false);
@@ -47,9 +51,18 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         {
             _seq = new MockSequence();
 
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.CreateWaitHandleArray(It.IsNotNull<WaitHandle>(), It.IsNotNull<WaitHandle>()))
+                           .Returns<WaitHandle, WaitHandle>((disposingWaitHandle, semaphoreAvailableWaitHandle) =>
+                           {
+                               _waitHandleArray[0] = disposingWaitHandle;
+                               _waitHandleArray[1] = semaphoreAvailableWaitHandle;
+                               return _waitHandleArray;
+                           });
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                            .Setup(p => p.BeginRead(_handle, 0, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                            .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -60,7 +73,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                            .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -81,7 +95,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                            .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
         }
 
         protected override void Arrange()
@@ -137,7 +152,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         [TestMethod]
         public void DisposeShouldCloseHandleAndCompleteImmediately()
         {
-            SftpSessionMock.InSequence(_seq).Setup(p => p.RequestClose(_handle));
+            SftpSessionMock.InSequence(_seq).Setup(p => p.BeginClose(_handle, null, null)).Returns(_closeAsyncResult);
+            SftpSessionMock.InSequence(_seq).Setup(p => p.EndClose(_closeAsyncResult));
 
             var stopwatch = Stopwatch.StartNew();
             _reader.Dispose();
@@ -145,7 +161,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
 
             Assert.IsTrue(stopwatch.ElapsedMilliseconds < 200, "Dispose took too long to complete: " + stopwatch.ElapsedMilliseconds);
 
-            SftpSessionMock.Verify(p => p.RequestClose(_handle), Times.Once);
+            SftpSessionMock.Verify(p => p.BeginClose(_handle, null, null), Times.Once);
+            SftpSessionMock.Verify(p => p.EndClose(_closeAsyncResult), Times.Once);
         }
 
         [TestMethod]

+ 23 - 8
src/Renci.SshNet.Tests/Classes/Sftp/SftpFileReaderTest_Read_ReadAheadExceptionInWaitOnHandle_ChunkAvailable.cs

@@ -17,7 +17,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         private MockSequence _seq;
         private byte[] _handle;
         private int _fileSize;
+        private WaitHandle[] _waitHandleArray;
         private int _operationTimeout;
+        private SftpCloseAsyncResult _closeAsyncResult;
         private byte[] _chunk1;
         private byte[] _chunk2;
         private SftpFileReader _reader;
@@ -33,7 +35,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
             _chunk1 = CreateByteArray(random, ChunkLength);
             _chunk2 = CreateByteArray(random, ChunkLength);
             _fileSize = _chunk1.Length + _chunk2.Length + 1;
-            _operationTimeout = random.Next();
+            _waitHandleArray = new WaitHandle[2];
+            _operationTimeout = random.Next(10000, 20000);
+            _closeAsyncResult = new SftpCloseAsyncResult(null, null);
 
             _exception = new SshException();
             _exceptionSignaled = new ManualResetEvent(false);
@@ -43,9 +47,18 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         {
             _seq = new MockSequence();
 
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.CreateWaitHandleArray(It.IsNotNull<WaitHandle>(), It.IsNotNull<WaitHandle>()))
+                           .Returns<WaitHandle, WaitHandle>((disposingWaitHandle, semaphoreAvailableWaitHandle) =>
+                               {
+                                   _waitHandleArray[0] = disposingWaitHandle;
+                                   _waitHandleArray[1] = semaphoreAvailableWaitHandle;
+                                   return _waitHandleArray;
+                               });
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                            .Setup(p => p.BeginRead(_handle, 0, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -56,7 +69,7 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                            .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout))
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
                            .Callback(() => _exceptionSignaled.Set())
                            .Throws(_exception);
         }
@@ -70,7 +83,7 @@ namespace Renci.SshNet.Tests.Classes.Sftp
 
         protected override void Act()
         {
-            // wait for the exception to be signaled by the second call to WaitOnHandle
+            // wait for the exception to be signaled by the second call to WaitAny
             _exceptionSignaled.WaitOne(5000);
             // allow a little time to allow SftpFileReader to process exception
             Thread.Sleep(100);
@@ -86,14 +99,14 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         }
 
         [TestMethod]
-        public void ReadShouldHaveRethrownExceptionThrownByWaitOnHandle()
+        public void ReadShouldHaveRethrownExceptionThrownByWaitAny()
         {
             Assert.IsNotNull(_actualException);
             Assert.AreSame(_exception, _actualException);
         }
 
         [TestMethod]
-        public void ReadShouldRethrowExceptionThrownByWaitOnHandle()
+        public void ReadShouldRethrowExceptionThrownByWaitAny()
         {
             try
             {
@@ -109,7 +122,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         [TestMethod]
         public void DisposeShouldCloseHandleAndCompleteImmediately()
         {
-            SftpSessionMock.InSequence(_seq).Setup(p => p.RequestClose(_handle));
+            SftpSessionMock.InSequence(_seq).Setup(p => p.BeginClose(_handle, null, null)).Returns(_closeAsyncResult);
+            SftpSessionMock.InSequence(_seq).Setup(p => p.EndClose(_closeAsyncResult));
 
             var stopwatch = Stopwatch.StartNew();
             _reader.Dispose();
@@ -117,7 +131,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
 
             Assert.IsTrue(stopwatch.ElapsedMilliseconds < 200, "Dispose took too long to complete: " + stopwatch.ElapsedMilliseconds);
 
-            SftpSessionMock.Verify(p => p.RequestClose(_handle), Times.Once);
+            SftpSessionMock.Verify(p => p.BeginClose(_handle, null, null), Times.Once);
+            SftpSessionMock.Verify(p => p.EndClose(_closeAsyncResult), Times.Once);
         }
     }
 }

+ 20 - 5
src/Renci.SshNet.Tests/Classes/Sftp/SftpFileReaderTest_Read_ReadAheadExceptionInWaitOnHandle_NoChunkAvailable.cs

@@ -17,7 +17,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         private MockSequence _seq;
         private byte[] _handle;
         private int _fileSize;
+        private WaitHandle[] _waitHandleArray;
         private int _operationTimeout;
+        private SftpCloseAsyncResult _closeAsyncResult;
         private SftpFileReader _reader;
         private SshException _exception;
         private SshException _actualException;
@@ -28,7 +30,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
 
             _handle = CreateByteArray(random, 5);
             _fileSize = random.Next();
-            _operationTimeout = random.Next();
+            _waitHandleArray = new WaitHandle[2];
+            _operationTimeout = random.Next(10000, 20000);
+            _closeAsyncResult = new SftpCloseAsyncResult(null, null);
 
             _exception = new SshException();
         }
@@ -37,15 +41,24 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         {
             _seq = new MockSequence();
 
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.CreateWaitHandleArray(It.IsNotNull<WaitHandle>(), It.IsNotNull<WaitHandle>()))
+                           .Returns<WaitHandle, WaitHandle>((disposingWaitHandle, semaphoreAvailableWaitHandle) =>
+                           {
+                               _waitHandleArray[0] = disposingWaitHandle;
+                               _waitHandleArray[1] = semaphoreAvailableWaitHandle;
+                               return _waitHandleArray;
+                           });
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout));
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                            .Setup(p => p.BeginRead(_handle, 0, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                            .Returns((SftpReadAsyncResult)null);
             SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
             SftpSessionMock.InSequence(_seq)
-                           .Setup(p => p.WaitOnHandle(It.IsAny<WaitHandle>(), _operationTimeout))
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
                            .Throws(_exception);
         }
 
@@ -93,7 +106,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         [TestMethod]
         public void DisposeShouldCloseHandleAndCompleteImmediately()
         {
-            SftpSessionMock.InSequence(_seq).Setup(p => p.RequestClose(_handle));
+            SftpSessionMock.InSequence(_seq).Setup(p => p.BeginClose(_handle, null, null)).Returns(_closeAsyncResult);
+            SftpSessionMock.InSequence(_seq).Setup(p => p.EndClose(_closeAsyncResult));
 
             var stopwatch = Stopwatch.StartNew();
             _reader.Dispose();
@@ -101,7 +115,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
 
             Assert.IsTrue(stopwatch.ElapsedMilliseconds < 200, "Dispose took too long to complete: " + stopwatch.ElapsedMilliseconds);
 
-            SftpSessionMock.Verify(p => p.RequestClose(_handle), Times.Once);
+            SftpSessionMock.Verify(p => p.BeginClose(_handle, null, null), Times.Once);
+            SftpSessionMock.Verify(p => p.EndClose(_closeAsyncResult), Times.Once);
         }
     }
 }

+ 32 - 7
src/Renci.SshNet.Tests/Classes/Sftp/SftpFileReaderTest_Read_ReahAheadExceptionInBeginRead.cs

@@ -17,7 +17,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         private MockSequence _seq;
         private byte[] _handle;
         private int _fileSize;
+        private WaitHandle[] _waitHandleArray;
         private int _operationTimeout;
+        private SftpCloseAsyncResult _closeAsyncResult;
         private byte[] _chunk1;
         private byte[] _chunk2;
         private SftpFileReader _reader;
@@ -34,7 +36,9 @@ namespace Renci.SshNet.Tests.Classes.Sftp
             _chunk1 = CreateByteArray(random, ChunkLength);
             _chunk2 = CreateByteArray(random, ChunkLength);
             _fileSize = _chunk1.Length + _chunk2.Length + 1;
-            _operationTimeout = random.Next();
+            _waitHandleArray = new WaitHandle[2];
+            _operationTimeout = random.Next(10000, 20000);
+            _closeAsyncResult = new SftpCloseAsyncResult(null, null);
 
             _readAheadChunk3 = new ManualResetEvent(false);
             _readChunk3 = new ManualResetEvent(false);
@@ -46,6 +50,18 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         {
             _seq = new MockSequence();
 
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.CreateWaitHandleArray(It.IsNotNull<WaitHandle>(), It.IsNotNull<WaitHandle>()))
+                           .Returns<WaitHandle, WaitHandle>((disposingWaitHandle, semaphoreAvailableWaitHandle) =>
+                           {
+                               _waitHandleArray[0] = disposingWaitHandle;
+                               _waitHandleArray[1] = semaphoreAvailableWaitHandle;
+                               return _waitHandleArray;
+                           });
+            SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                            .Setup(p => p.BeginRead(_handle, 0, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                            .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -54,6 +70,10 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                                     asyncResult.SetAsCompleted(_chunk1, false);
                                 })
                            .Returns((SftpReadAsyncResult)null);
+            SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -62,6 +82,10 @@ namespace Renci.SshNet.Tests.Classes.Sftp
                                 asyncResult.SetAsCompleted(_chunk2, false);
                             })
                             .Returns((SftpReadAsyncResult)null);
+            SftpSessionMock.InSequence(_seq).Setup(p => p.OperationTimeout).Returns(_operationTimeout);
+            SftpSessionMock.InSequence(_seq)
+                           .Setup(p => p.WaitAny(_waitHandleArray, _operationTimeout))
+                           .Returns(() => WaitAny(_waitHandleArray, _operationTimeout));
             SftpSessionMock.InSequence(_seq)
                             .Setup(p => p.BeginRead(_handle, 2 * ChunkLength, ChunkLength, It.IsNotNull<AsyncCallback>(), It.IsAny<BufferedRead>()))
                             .Callback<byte[], ulong, uint, AsyncCallback, object>((handle, offset, length, callback, state) =>
@@ -110,24 +134,24 @@ namespace Renci.SshNet.Tests.Classes.Sftp
         }
 
         [TestMethod]
-        public void ReadAfterReadAheadExceptionShouldThrowObjectDisposedException()
+        public void ReadAfterReadAheadExceptionShouldRethrowExceptionThatOccurredInReadAhead()
         {
             try
             {
                 _reader.Read();
                 Assert.Fail();
             }
-            catch (ObjectDisposedException ex)
+            catch (SshException ex)
             {
-                Assert.IsNull(ex.InnerException);
-                Assert.AreEqual(typeof(SftpFileReader).FullName, ex.ObjectName);
+                Assert.AreSame(_exception, ex);
             }
         }
 
         [TestMethod]
         public void DisposeShouldCloseHandleAndCompleteImmediately()
         {
-            SftpSessionMock.InSequence(_seq).Setup(p => p.RequestClose(_handle));
+            SftpSessionMock.InSequence(_seq).Setup(p => p.BeginClose(_handle, null, null)).Returns(_closeAsyncResult);
+            SftpSessionMock.InSequence(_seq).Setup(p => p.EndClose(_closeAsyncResult));
 
             var stopwatch = Stopwatch.StartNew();
             _reader.Dispose();
@@ -135,7 +159,8 @@ namespace Renci.SshNet.Tests.Classes.Sftp
 
             Assert.IsTrue(stopwatch.ElapsedMilliseconds < 200, "Dispose took too long to complete: " + stopwatch.ElapsedMilliseconds);
 
-            SftpSessionMock.Verify(p => p.RequestClose(_handle), Times.Once);
+            SftpSessionMock.Verify(p => p.BeginClose(_handle, null, null), Times.Once);
+            SftpSessionMock.Verify(p => p.EndClose(_closeAsyncResult), Times.Once);
         }
     }
 }

+ 84 - 0
src/Renci.SshNet/ISubsystemSession.cs

@@ -46,5 +46,89 @@ namespace Renci.SshNet
         /// <exception cref="SshException">The channel was closed.</exception>
         /// <exception cref="SshOperationTimeoutException">The handle did not get signaled within the specified timeout.</exception>
         void WaitOnHandle(WaitHandle waitHandle, int millisecondsTimeout);
+
+        /// <summary>
+        /// Blocks the current thread until the specified <see cref="WaitHandle"/> gets signaled, using a
+        /// 32-bit signed integer to specify the time interval in milliseconds.
+        /// </summary>
+        /// <param name="waitHandle">The handle to wait for.</param>
+        /// <param name="millisecondsTimeout">To number of milliseconds to wait for <paramref name="waitHandle"/> to get signaled, or -1 to wait indefinitely.</param>
+        /// <returns>
+        /// <c>true</c> if <paramref name="waitHandle"/> received a signal within the specified timeout;
+        /// otherwise, <c>false</c>.
+        /// </returns>
+        /// <exception cref="SshException">The connection was closed by the server.</exception>
+        /// <exception cref="SshException">The channel was closed.</exception>
+        /// <remarks>
+        /// The blocking wait is also interrupted when either the established channel is closed, the current
+        /// session is disconnected or an unexpected <see cref="Exception"/> occurred while processing a channel
+        /// or session event.
+        /// </remarks>
+        bool WaitOne(WaitHandle waitHandle, int millisecondsTimeout);
+
+        /// <summary>
+        /// Blocks the current thread until the specified <see cref="WaitHandle"/> gets signaled, using a
+        /// 32-bit signed integer to specify the time interval in milliseconds.
+        /// </summary>
+        /// <param name="waitHandleA">The first handle to wait for.</param>
+        /// <param name="waitHandleB">The second handle to wait for.</param>
+        /// <param name="millisecondsTimeout">To number of milliseconds to wait for a <see cref="WaitHandle"/> to get signaled, or -1 to wait indefinitely.</param>
+        /// <returns>
+        /// <c>0</c> if <paramref name="waitHandleA"/> received a signal within the specified timeout and <c>1</c>
+        /// if <paramref name="waitHandleB"/> received a signal within the specified timeout, or <see cref="WaitHandle.WaitTimeout"/>
+        /// if no object satisfied the wait.
+        /// </returns>
+        /// <exception cref="SshException">The connection was closed by the server.</exception>
+        /// <exception cref="SshException">The channel was closed.</exception>
+        /// <remarks>
+        /// <para>
+        /// The blocking wait is also interrupted when either the established channel is closed, the current
+        /// session is disconnected or an unexpected <see cref="Exception"/> occurred while processing a channel
+        /// or session event.
+        /// </para>
+        /// <para>
+        /// When both <paramref name="waitHandleA"/> and <paramref name="waitHandleB"/> are signaled during the call,
+        /// then <c>0</c> is returned.
+        /// </para>
+        /// </remarks>
+        int WaitAny(WaitHandle waitHandleA, WaitHandle waitHandleB, int millisecondsTimeout);
+
+        /// <summary>
+        /// Waits for any of the elements in the specified array to receive a signal, using a 32-bit signed
+        /// integer to specify the time interval.
+        /// </summary>
+        /// <param name="waitHandles">A <see cref="WaitHandle"/> array - constructed using <see cref="CreateWaitHandleArray(WaitHandle[])"/> - containing the objects to wait for.</param>
+        /// <param name="millisecondsTimeout">To number of milliseconds to wait for a <see cref="WaitHandle"/> to get signaled, or -1 to wait indefinitely.</param>
+        /// <returns>
+        /// The array index of the first non-system object that satisfied the wait.
+        /// </returns>
+        /// <exception cref="SshException">The connection was closed by the server.</exception>
+        /// <exception cref="SshException">The channel was closed.</exception>
+        /// <exception cref="SshOperationTimeoutException">No object satified the wait and a time interval equivalent to <paramref name="millisecondsTimeout"/> has passed.</exception>
+        /// <remarks>
+        /// For the return value, the index of the first non-system object is considered to be zero.
+        /// </remarks>
+        int WaitAny(WaitHandle[] waitHandles, int millisecondsTimeout);
+
+        /// <summary>
+        /// Creates a <see cref="WaitHandle"/> array that is composed of system objects and the specified
+        /// elements.
+        /// </summary>
+        /// <param name="waitHandles">A <see cref="WaitHandle"/> array containing the objects to wait for.</param>
+        /// <returns>
+        /// A <see cref="WaitHandle"/> array that is composed of system objects and the specified elements.
+        /// </returns>
+        WaitHandle[] CreateWaitHandleArray(params WaitHandle[] waitHandles);
+
+        /// <summary>
+        /// Creates a <see cref="WaitHandle"/> array that is composed of system objects and the specified
+        /// elements.
+        /// </summary>
+        /// <param name="waitHandle1">The first <see cref="WaitHandle"/> to wait for.</param>
+        /// <param name="waitHandle2">The second <see cref="WaitHandle"/> to wait for.</param>
+        /// <returns>
+        /// A <see cref="WaitHandle"/> array that is composed of system objects and the specified elements.
+        /// </returns>
+        WaitHandle[] CreateWaitHandleArray(WaitHandle waitHandle1, WaitHandle waitHandle2);
     }
 }

+ 3 - 2
src/Renci.SshNet/ServiceFactory.cs

@@ -109,7 +109,6 @@ namespace Renci.SshNet
             int maxPendingReads;
 
             var chunkSize = sftpSession.CalculateOptimalReadLength(bufferSize);
-            var handle = sftpSession.EndOpen(openAsyncResult);
 
             // fallback to a default maximum of pending reads when remote server does not allow us to obtain
             // the attributes of the file
@@ -117,7 +116,7 @@ namespace Renci.SshNet
             {
                 var fileAttributes = sftpSession.EndLStat(statAsyncResult);
                 fileSize = fileAttributes.Size;
-                maxPendingReads = Math.Min(20, (int) Math.Ceiling((double) fileAttributes.Size / chunkSize) + 1);
+                maxPendingReads = Math.Min(10, (int) Math.Ceiling((double) fileAttributes.Size / chunkSize) + 1);
             }
             catch (SshException ex)
             {
@@ -127,6 +126,8 @@ namespace Renci.SshNet
                 DiagnosticAbstraction.Log(string.Format("Failed to obtain size of file. Allowing maximum {0} pending reads: {1}", maxPendingReads, ex));
             }
 
+            var handle = sftpSession.EndOpen(openAsyncResult);
+
             return sftpSession.CreateFileReader(handle, sftpSession, chunkSize, maxPendingReads, fileSize);
         }
     }

+ 94 - 60
src/Renci.SshNet/Sftp/SftpFileReader.cs

@@ -2,6 +2,7 @@
 using Renci.SshNet.Common;
 using System;
 using System.Collections.Generic;
+using System.Globalization;
 using System.Threading;
 
 namespace Renci.SshNet.Sftp
@@ -18,12 +19,14 @@ namespace Renci.SshNet.Sftp
         /// <summary>
         /// Holds the size of the file, when available.
         /// </summary>
-        private long? _fileSize;
+        private readonly long? _fileSize;
         private readonly IDictionary<int, BufferedRead> _queue;
+        private readonly WaitHandle[] _waitHandles;
 
         private int _readAheadChunkIndex;
         private ulong _readAheadOffset;
         private ManualResetEvent _readAheadCompleted;
+        private ManualResetEvent _disposingWaitHandle;
         private int _nextChunkIndex;
 
         /// <summary>
@@ -38,7 +41,6 @@ namespace Renci.SshNet.Sftp
         private readonly object _readLock;
 
         private Exception _exception;
-        private bool _disposed;
 
         /// <summary>
         /// Initializes a new <see cref="SftpFileReader"/> instance with the specified handle,
@@ -59,13 +61,15 @@ namespace Renci.SshNet.Sftp
             _queue = new Dictionary<int, BufferedRead>(maxPendingReads);
             _readLock = new object();
             _readAheadCompleted = new ManualResetEvent(false);
+            _disposingWaitHandle = new ManualResetEvent(false);
+            _waitHandles = _sftpSession.CreateWaitHandleArray(_disposingWaitHandle, _semaphore.AvailableWaitHandle);
 
             StartReadAhead();
         }
 
         public byte[] Read()
         {
-            if (_disposed)
+            if (_disposingWaitHandle == null)
                 throw new ObjectDisposedException(GetType().FullName);
             if (_exception != null)
                 throw _exception;
@@ -85,13 +89,15 @@ namespace Renci.SshNet.Sftp
                 if (_exception != null)
                     throw _exception;
 
+                var data = nextChunk.Data;
+
                 if (nextChunk.Offset == _offset)
                 {
-                    var data = nextChunk.Data;
-
                     // have we reached EOF?
                     if (data.Length == 0)
                     {
+                        // PERF: we do not bother updating internal state when we've EOF
+
                         _isEndOfFileRead = true;
                     }
                     else
@@ -105,16 +111,17 @@ namespace Renci.SshNet.Sftp
                     }
                     // unblock wait in read-ahead
                     _semaphore.Release();
+
                     return data;
                 }
 
                 // when we received an EOF for the next chunk and the size of the file is known, then
                 // we only complete the current chunk if we haven't already read up to the file size;
                 // this way we save an extra round-trip to the server
-                if (nextChunk.Data.Length == 0 && _fileSize.HasValue && _offset == (ulong)_fileSize.Value)
+                if (data.Length == 0 && _fileSize.HasValue && _offset == (ulong) _fileSize.Value)
                 {
+                    // avoid future reads
                     _isEndOfFileRead = true;
-
                     // unblock wait in read-ahead
                     _semaphore.Release();
                     // signal EOF to caller
@@ -122,7 +129,6 @@ namespace Renci.SshNet.Sftp
                 }
             }
 
-
             // when the server returned less bytes than requested (for the previous chunk)
             // we'll synchronously request the remaining data
             //
@@ -143,6 +149,7 @@ namespace Renci.SshNet.Sftp
             var read = _sftpSession.RequestRead(_handle, _offset, (uint) bytesToCatchUp);
             if (read.Length == 0)
             {
+                // process data in read lock to avoid ObjectDisposedException while releasing semaphore
                 lock (_readLock)
                 {
                     // a zero-length (EOF) response is only valid for the read-back when EOF has
@@ -150,9 +157,12 @@ namespace Renci.SshNet.Sftp
                     if (nextChunk.Data.Length == 0)
                     {
                         _isEndOfFileRead = true;
-
-                        // unblock wait in read-ahead
-                        _semaphore.Release();
+                        // ensure we've not yet disposed the current instance
+                        if (_semaphore != null)
+                        {
+                            // unblock wait in read-ahead
+                            _semaphore.Release();
+                        }
                         // signal EOF to caller
                         return read;
                     }
@@ -175,52 +185,57 @@ namespace Renci.SshNet.Sftp
             return read;
         }
 
+        ~SftpFileReader()
+        {
+            Dispose(false);
+        }
+
         public void Dispose()
         {
             Dispose(true);
+            GC.SuppressFinalize(this);
         }
 
         /// <summary>
         /// Releases unmanaged and - optionally - managed resources
         /// </summary>
         /// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
-        /// <remarks>
-        /// Note that we will break the read-ahead loop, but will not interrupt the blocking wait
-        /// in that loop.
-        /// </remarks>
         protected void Dispose(bool disposing)
         {
-            if (_disposed)
+            if (_disposingWaitHandle == null)
                 return;
 
             if (disposing)
             {
-                // use this to break the read-ahead loop
+                // record exception to break prevent future Read()
                 _exception = new ObjectDisposedException(GetType().FullName);
 
-                var readAheadCompleted = _readAheadCompleted;
-                if (readAheadCompleted != null)
-                {
-                    readAheadCompleted.WaitOne();
-                    readAheadCompleted.Dispose();
-                    _readAheadCompleted = null;
-                }
+                // signal that we're disposing to interrupt wait in read-ahead
+                _disposingWaitHandle.Set();
 
-                // dispose semaphore in read lock to ensure we don't run into an ObjectDisposedException
-                // in Read()
+                // wait until the read-ahead thread has completed
+                _readAheadCompleted.WaitOne();
+
+                // unblock the Read()
                 lock (_readLock)
                 {
-                    if (_semaphore != null)
-                    {
-                        _semaphore.Dispose();
-                        _semaphore = null;
-                    }
+                    // dispose semaphore in read lock to ensure we don't run into an ObjectDisposedException
+                    // in Read()
+                    _semaphore.Dispose();
+                    // awake Read
+                    Monitor.Pulse(_readLock);
                 }
 
-                _sftpSession.RequestClose(_handle);
-            }
+                var closeAsyncResult = _sftpSession.BeginClose(_handle, null, null);
 
-            _disposed = true;
+                _readAheadCompleted.Dispose();
+                _disposingWaitHandle.Dispose();
+
+                _sftpSession.EndClose(closeAsyncResult);
+
+                // dereference the disposing waithandle to indicate that the current instance is disposed
+                _disposingWaitHandle = null;
+            }
         }
 
         private void StartReadAhead()
@@ -229,36 +244,26 @@ namespace Renci.SshNet.Sftp
             {
                 while (!_endOfFileReceived && _exception == null)
                 {
-                    // wait on either a release on the semaphore, or signaling of a broken session/connection
-                    try
-                    {
-                        _sftpSession.WaitOnHandle(_semaphore.AvailableWaitHandle, _sftpSession.OperationTimeout);
-                    }
-                    catch (Exception ex)
+                    // check if we should continue with the read-ahead loop
+                    // note that the EOF and exception check are not included
+                    // in this check as they do not require Read() to be
+                    // unblocked (or have already done this)
+                    if (!ContinueReadAhead())
                     {
-                        _exception = ex;
-
                         // unblock the Read()
                         lock (_readLock)
                         {
                             Monitor.Pulse(_readLock);
                         }
-
                         // break the read-ahead loop
                         break;
                     }
 
                     // attempt to obtain the semaphore; this may time out when all semaphores are
-                    // in use due to pending read-aheads
-                    //
-                    // in general this only happens when the session is broken
+                    // in use due to pending read-aheads (which in turn can happen when the server
+                    // is slow to respond or when the session is broken)
                     if (!_semaphore.Wait(ReadAheadWaitTimeoutInMilliseconds))
                     {
-                        // unblock the Read()
-                        lock (_readLock)
-                        {
-                            Monitor.Pulse(_readLock);
-                        }
                         // re-evaluate whether an exception occurred, and - if not - wait again
                         continue;
                     }
@@ -290,6 +295,34 @@ namespace Renci.SshNet.Sftp
             });
         }
 
+        /// <summary>
+        /// Returns a value indicating whether the read-ahead loop should be continued.
+        /// </summary>
+        /// <returns>
+        /// <c>true</c> if the read-ahead loop should be continued; otherwise, <c>false</c>.
+        /// </returns>
+        private bool ContinueReadAhead()
+        {
+            try
+            {
+                var waitResult = _sftpSession.WaitAny(_waitHandles, _sftpSession.OperationTimeout);
+                switch (waitResult)
+                {
+                    case 0: // disposing
+                        return false;
+                    case 1: // semaphore available
+                        return true;
+                    default:
+                        throw new NotImplementedException(string.Format(CultureInfo.InvariantCulture, "WaitAny return value '{0}' is not implemented.", waitResult));
+                }
+            }
+            catch (Exception ex)
+            {
+                _exception = ex;
+                return false;
+            }
+        }
+
         private void ReadCompleted(IAsyncResult result)
         {
             var readAsyncResult = result as SftpReadAsyncResult;
@@ -312,7 +345,15 @@ namespace Renci.SshNet.Sftp
             // but there may be pending reads before that read
             var bufferedRead = (BufferedRead)readAsyncResult.AsyncState;
             bufferedRead.Complete(data);
-            _queue.Add(bufferedRead.ChunkIndex, bufferedRead);
+
+            lock (_readLock)
+            {
+                // add item to queue
+                _queue.Add(bufferedRead.ChunkIndex, bufferedRead);
+                // signal that a chunk has been read or EOF has been reached;
+                // in both cases, Read() will eventually also unblock the "read-ahead" thread
+                Monitor.Pulse(_readLock);
+            }
 
             // check if server signaled EOF
             if (data.Length == 0)
@@ -320,13 +361,6 @@ namespace Renci.SshNet.Sftp
                 // set a flag to stop read-aheads
                 _endOfFileReceived = true;
             }
-
-            // signal that a chunk has been read or EOF has been reached;
-            // in both cases, Read() will eventually also unblock the "read-ahead" thread
-            lock (_readLock)
-            {
-                Monitor.PulseAll(_readLock);
-            }
         }
 
         private void HandleFailure(Exception cause)
@@ -339,7 +373,7 @@ namespace Renci.SshNet.Sftp
             // unblock Read()
             lock (_readLock)
             {
-                Monitor.PulseAll(_readLock);
+                Monitor.Pulse(_readLock);
             }
         }
 

+ 191 - 2
src/Renci.SshNet/SubsystemSession.cs

@@ -13,6 +13,12 @@ namespace Renci.SshNet
     /// </summary>
     internal abstract class SubsystemSession : ISubsystemSession
     {
+        /// <summary>
+        /// Holds the number of system wait handles that are returned as the leading entries in the array returned
+        /// in <see cref="CreateWaitHandleArray(WaitHandle[])"/>.
+        /// </summary>
+        private const int SystemWaitHandleCount = 3;
+
         private ISession _session;
         private readonly string _subsystemName;
         private IChannelSession _channel;
@@ -225,7 +231,107 @@ namespace Renci.SshNet
                     waitHandle
                 };
 
-            switch (WaitHandle.WaitAny(waitHandles, millisecondsTimeout))
+            var result = WaitHandle.WaitAny(waitHandles, millisecondsTimeout);
+            switch (result)
+            {
+                case 0:
+                    throw _exception;
+                case 1:
+                    throw new SshException("Connection was closed by the server.");
+                case 2:
+                    throw new SshException("Channel was closed.");
+                case 3:
+                    break;
+                case WaitHandle.WaitTimeout:
+                    throw new SshOperationTimeoutException("Operation has timed out.");
+                default:
+                    throw new NotImplementedException(string.Format(CultureInfo.InvariantCulture, "WaitAny return value '{0}' is not implemented.", result));
+            }
+        }
+
+        /// <summary>
+        /// Blocks the current thread until the specified <see cref="WaitHandle"/> gets signaled, using a
+        /// 32-bit signed integer to specify the time interval in milliseconds.
+        /// </summary>
+        /// <param name="waitHandle">The handle to wait for.</param>
+        /// <param name="millisecondsTimeout">To number of milliseconds to wait for <paramref name="waitHandle"/> to get signaled, or -1 to wait indefinitely.</param>
+        /// <returns>
+        /// <c>true</c> if <paramref name="waitHandle"/> received a signal within the specified timeout;
+        /// otherwise, <c>false</c>.
+        /// </returns>
+        /// <exception cref="SshException">The connection was closed by the server.</exception>
+        /// <exception cref="SshException">The channel was closed.</exception>
+        /// <remarks>
+        /// The blocking wait is also interrupted when either the established channel is closed, the current
+        /// session is disconnected or an unexpected <see cref="Exception"/> occurred while processing a channel
+        /// or session event.
+        /// </remarks>
+        public bool WaitOne(WaitHandle waitHandle, int millisecondsTimeout)
+        {
+            var waitHandles = new[]
+                {
+                    _errorOccuredWaitHandle,
+                    _sessionDisconnectedWaitHandle,
+                    _channelClosedWaitHandle,
+                    waitHandle
+                };
+
+            var result = WaitHandle.WaitAny(waitHandles, millisecondsTimeout);
+            switch (result)
+            {
+                case 0:
+                    throw _exception;
+                case 1:
+                    throw new SshException("Connection was closed by the server.");
+                case 2:
+                    throw new SshException("Channel was closed.");
+                case 3:
+                    return true;
+                case WaitHandle.WaitTimeout:
+                    return false;
+                default:
+                    throw new NotImplementedException(string.Format(CultureInfo.InvariantCulture, "WaitAny return value '{0}' is not implemented.", result));
+            }
+        }
+
+        /// <summary>
+        /// Blocks the current thread until the specified <see cref="WaitHandle"/> gets signaled, using a
+        /// 32-bit signed integer to specify the time interval in milliseconds.
+        /// </summary>
+        /// <param name="waitHandle1">The first handle to wait for.</param>
+        /// <param name="waitHandle2">The second handle to wait for.</param>
+        /// <param name="millisecondsTimeout">To number of milliseconds to wait for a <see cref="WaitHandle"/> to get signaled, or -1 to wait indefinitely.</param>
+        /// <returns>
+        /// <c>0</c> if <paramref name="waitHandle1"/> received a signal within the specified timeout, and <c>1</c>
+        /// if <paramref name="waitHandle2"/> received a signal within the specified timeout.
+        /// </returns>
+        /// <exception cref="SshException">The connection was closed by the server.</exception>
+        /// <exception cref="SshException">The channel was closed.</exception>
+        /// <exception cref="SshOperationTimeoutException">The handle did not get signaled within the specified timeout.</exception>
+        /// <remarks>
+        /// <para>
+        /// The blocking wait is also interrupted when either the established channel is closed, the current
+        /// session is disconnected or an unexpected <see cref="Exception"/> occurred while processing a channel
+        /// or session event.
+        /// </para>
+        /// <para>
+        /// When both <paramref name="waitHandle1"/> and <paramref name="waitHandle2"/> are signaled during the call,
+        /// then <c>0</c> is returned.
+        /// </para>
+        /// </remarks>
+        public int WaitAny(WaitHandle waitHandle1, WaitHandle waitHandle2, int millisecondsTimeout)
+        {
+            var waitHandles = new[]
+                {
+                    _errorOccuredWaitHandle,
+                    _sessionDisconnectedWaitHandle,
+                    _channelClosedWaitHandle,
+                    waitHandle1,
+                    waitHandle2
+                };
+
+            var result = WaitHandle.WaitAny(waitHandles, millisecondsTimeout);
+            switch (result)
             {
                 case 0:
                     throw _exception;
@@ -233,11 +339,94 @@ namespace Renci.SshNet
                     throw new SshException("Connection was closed by the server.");
                 case 2:
                     throw new SshException("Channel was closed.");
+                case 3:
+                    return 0;
+                case 4:
+                    return 1;
                 case WaitHandle.WaitTimeout:
-                    throw new SshOperationTimeoutException(string.Format(CultureInfo.CurrentCulture, "Operation has timed out."));
+                    throw new SshOperationTimeoutException("Operation has timed out.");
+                default:
+                    throw new NotImplementedException(string.Format(CultureInfo.InvariantCulture, "WaitAny return value '{0}' is not implemented.", result));
             }
         }
 
+        /// <summary>
+        /// Waits for any of the elements in the specified array to receive a signal, using a 32-bit signed
+        /// integer to specify the time interval.
+        /// </summary>
+        /// <param name="waitHandles">A <see cref="WaitHandle"/> array - constructed using <see cref="CreateWaitHandleArray(WaitHandle[])"/> - containing the objects to wait for.</param>
+        /// <param name="millisecondsTimeout">To number of milliseconds to wait for a <see cref="WaitHandle"/> to get signaled, or -1 to wait indefinitely.</param>
+        /// <returns>
+        /// The array index of the first non-system object that satisfied the wait.
+        /// </returns>
+        /// <exception cref="SshException">The connection was closed by the server.</exception>
+        /// <exception cref="SshException">The channel was closed.</exception>
+        /// <exception cref="SshOperationTimeoutException">No object satified the wait and a time interval equivalent to <paramref name="millisecondsTimeout"/> has passed.</exception>
+        /// <remarks>
+        /// For the return value, the index of the first non-system object is considered to be zero.
+        /// </remarks>
+        public int WaitAny(WaitHandle[] waitHandles, int millisecondsTimeout)
+        {
+            var result = WaitHandle.WaitAny(waitHandles, millisecondsTimeout);
+            switch (result)
+            {
+                case 0:
+                    throw _exception;
+                case 1:
+                    throw new SshException("Connection was closed by the server.");
+                case 2:
+                    throw new SshException("Channel was closed.");
+                case WaitHandle.WaitTimeout:
+                    throw new SshOperationTimeoutException("Operation has timed out.");
+                default:
+                    return result - SystemWaitHandleCount;
+            }
+        }
+
+        /// <summary>
+        /// Creates a <see cref="WaitHandle"/> array that is composed of system objects and the specified
+        /// elements.
+        /// </summary>
+        /// <param name="waitHandle1">The first <see cref="WaitHandle"/> to wait for.</param>
+        /// <param name="waitHandle2">The second <see cref="WaitHandle"/> to wait for.</param>
+        /// <returns>
+        /// A <see cref="WaitHandle"/> array that is composed of system objects and the specified elements.
+        /// </returns>
+        public WaitHandle[] CreateWaitHandleArray(WaitHandle waitHandle1, WaitHandle waitHandle2)
+        {
+            return new WaitHandle[]
+                {
+                    _errorOccuredWaitHandle,
+                    _sessionDisconnectedWaitHandle,
+                    _channelClosedWaitHandle,
+                    waitHandle1,
+                    waitHandle2
+                };
+        }
+
+        /// <summary>
+        /// Creates a <see cref="WaitHandle"/> array that is composed of system objects and the specified
+        /// elements.
+        /// </summary>
+        /// <param name="waitHandles">A <see cref="WaitHandle"/> array containing the objects to wait for.</param>
+        /// <returns>
+        /// A <see cref="WaitHandle"/> array that is composed of system objects and the specified elements.
+        /// </returns>
+        public WaitHandle[] CreateWaitHandleArray(params WaitHandle[] waitHandles)
+        {
+            var array = new WaitHandle[waitHandles.Length + SystemWaitHandleCount];
+            array[0] = _errorOccuredWaitHandle;
+            array[1] = _sessionDisconnectedWaitHandle;
+            array[2] = _channelClosedWaitHandle;
+
+            for (var i = 0; i < waitHandles.Length; i++)
+            {
+                array[i + SystemWaitHandleCount] = waitHandles[i];
+            }
+
+            return array;
+        }
+
         private void Session_Disconnected(object sender, EventArgs e)
         {
             var sessionDisconnectedWaitHandle = _sessionDisconnectedWaitHandle;