浏览代码

Modify PipeStream to interrupt blocking read when stream is closed/disposed.
Added tests for PipeStream.

drieseng 9 年之前
父节点
当前提交
6778cce6ca

+ 50 - 0
src/Renci.SshNet.Tests/Common/PipeStream_Close_BlockingRead.cs

@@ -0,0 +1,50 @@
+using System;
+using System.Threading;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Renci.SshNet.Common;
+
+namespace Renci.SshNet.Tests.Common
+{
+    [TestClass]
+    public class PipeStream_Close_BlockingRead
+    {
+        private PipeStream _pipeStream;
+        private int _bytesRead;
+        private IAsyncResult _asyncReadResult;
+
+        [TestInitialize]
+        public void Init()
+        {
+            _pipeStream = new PipeStream();
+
+            _pipeStream.WriteByte(10);
+            _pipeStream.WriteByte(13);
+            _pipeStream.WriteByte(25);
+
+            _bytesRead = 123;
+
+            Action readAction = () => _bytesRead = _pipeStream.Read(new byte[4], 0, 4);
+            _asyncReadResult = readAction.BeginInvoke(null, null);
+            _asyncReadResult.AsyncWaitHandle.WaitOne(50);
+
+            Act();
+        }
+
+        protected void Act()
+        {
+            _pipeStream.Close();
+        }
+
+        [TestMethod]
+        public void AsyncReadShouldHaveFinished()
+        {
+            Assert.IsTrue(_asyncReadResult.IsCompleted);
+        }
+
+        [TestMethod]
+        public void ReadShouldHaveReturnedZero()
+        {
+            Assert.AreEqual(0, _bytesRead);
+        }
+    }
+}

+ 59 - 0
src/Renci.SshNet.Tests/Common/PipeStream_Close_BlockingWrite.cs

@@ -0,0 +1,59 @@
+using System;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Renci.SshNet.Common;
+
+namespace Renci.SshNet.Tests.Common
+{
+    [TestClass]
+    public class PipeStream_Close_BlockingWrite
+    {
+        private PipeStream _pipeStream;
+        private Exception _writeException;
+        private IAsyncResult _asyncWriteResult;
+
+        [TestInitialize]
+        public void Init()
+        {
+            _pipeStream = new PipeStream {MaxBufferLength = 3};
+
+            Action writeAction = () =>
+            {
+                _pipeStream.WriteByte(10);
+                _pipeStream.WriteByte(13);
+                _pipeStream.WriteByte(25);
+
+                try
+                {
+                    _pipeStream.WriteByte(35);
+                }
+                catch (Exception ex)
+                {
+                    _writeException = ex;
+                    throw;
+                }
+            };
+            _asyncWriteResult = writeAction.BeginInvoke(null, null);
+            _asyncWriteResult.AsyncWaitHandle.WaitOne(50);
+
+            Act();
+        }
+
+        protected void Act()
+        {
+            _pipeStream.Close();
+        }
+
+        [TestMethod]
+        public void AsyncWriteShouldHaveFinished()
+        {
+            Assert.IsTrue(_asyncWriteResult.IsCompleted);
+        }
+
+        [TestMethod]
+        public void WriteThatExceedsMaxBufferLengthShouldHaveThrownObjectDisposedException()
+        {
+            Assert.IsNotNull(_writeException);
+            Assert.AreEqual(typeof (ObjectDisposedException), _writeException.GetType());
+        }
+    }
+}

+ 114 - 0
src/Renci.SshNet.Tests/Common/PipeStream_Flush_BytesRemainingAfterRead.cs

@@ -0,0 +1,114 @@
+using System;
+using System.Threading;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Renci.SshNet.Common;
+
+namespace Renci.SshNet.Tests.Common
+{
+    [TestClass]
+    public class PipeStream_Flush_BytesRemainingAfterRead
+    {
+        private PipeStream _pipeStream;
+        private byte[] _readBuffer;
+        private int _bytesRead;
+        private IAsyncResult _asyncReadResult;
+
+        [TestInitialize]
+        public void Init()
+        {
+            _pipeStream = new PipeStream();
+            _pipeStream.WriteByte(10);
+            _pipeStream.WriteByte(13);
+            _pipeStream.WriteByte(15);
+            _pipeStream.WriteByte(18);
+            _pipeStream.WriteByte(23);
+            _pipeStream.WriteByte(28);
+
+            _bytesRead = 0;
+            _readBuffer = new byte[4];
+
+            Action readAction = () => _bytesRead = _pipeStream.Read(_readBuffer, 0, _readBuffer.Length);
+            _asyncReadResult = readAction.BeginInvoke(null, null);
+            _asyncReadResult.AsyncWaitHandle.WaitOne(50);
+
+            Act();
+        }
+
+        protected void Act()
+        {
+            _pipeStream.Flush();
+        }
+
+        [TestMethod]
+        public void AsyncReadShouldHaveFinished()
+        {
+            Assert.IsTrue(_asyncReadResult.IsCompleted);
+        }
+
+        [TestMethod]
+        public void ReadShouldReturnNumberOfBytesAvailableThatAreWrittenToBuffer()
+        {
+            Assert.AreEqual(4, _bytesRead);
+        }
+
+        [TestMethod]
+        public void BytesAvailableInStreamShouldHaveBeenWrittenToBuffer()
+        {
+            Assert.AreEqual(10, _readBuffer[0]);
+            Assert.AreEqual(13, _readBuffer[1]);
+            Assert.AreEqual(15, _readBuffer[2]);
+            Assert.AreEqual(18, _readBuffer[3]);
+        }
+
+        [TestMethod]
+        public void RemainingBytesCanBeRead()
+        {
+            var buffer = new byte[3];
+
+            var bytesRead = _pipeStream.Read(buffer, 0, 2);
+
+            Assert.AreEqual(2, bytesRead);
+            Assert.AreEqual(23, buffer[0]);
+            Assert.AreEqual(28, buffer[1]);
+            Assert.AreEqual(0, buffer[2]);
+        }
+
+        [TestMethod]
+        public void ReadingMoreBytesThanAvailableDoesNotBlock()
+        {
+            var buffer = new byte[4];
+
+            var bytesRead = _pipeStream.Read(buffer, 0, buffer.Length);
+
+            Assert.AreEqual(2, bytesRead);
+            Assert.AreEqual(23, buffer[0]);
+            Assert.AreEqual(28, buffer[1]);
+            Assert.AreEqual(0, buffer[2]);
+            Assert.AreEqual(0, buffer[3]);
+        }
+
+        [TestMethod]
+        public void WriteCausesSubsequentReadToBlockUntilRequestedNumberOfBytesAreAvailable()
+        {
+            _pipeStream.WriteByte(32);
+
+            var buffer = new byte[4];
+            int bytesRead = int.MaxValue;
+
+            Thread readThread = new Thread(() =>
+            {
+                bytesRead = _pipeStream.Read(buffer, 0, buffer.Length);
+            });
+            readThread.Start();
+
+            Assert.IsFalse(readThread.Join(500));
+            readThread.Abort();
+
+            Assert.AreEqual(int.MaxValue, bytesRead);
+            Assert.AreEqual(0, buffer[0]);
+            Assert.AreEqual(0, buffer[1]);
+            Assert.AreEqual(0, buffer[2]);
+            Assert.AreEqual(0, buffer[3]);
+        }
+    }
+}

+ 58 - 0
src/Renci.SshNet.Tests/Common/PipeStream_Flush_NoBytesRemainingAfterRead.cs

@@ -0,0 +1,58 @@
+using System;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Renci.SshNet.Common;
+
+namespace Renci.SshNet.Tests.Common
+{
+    [TestClass]
+    public class PipeStream_Flush_NoBytesRemainingAfterRead
+    {
+        private PipeStream _pipeStream;
+        private byte[] _readBuffer;
+        private int _bytesRead;
+        private IAsyncResult _asyncReadResult;
+
+        [TestInitialize]
+        public void Init()
+        {
+            _pipeStream = new PipeStream();
+            _pipeStream.WriteByte(10);
+            _pipeStream.WriteByte(13);
+
+            _bytesRead = 0;
+            _readBuffer = new byte[4];
+
+            Action readAction = () => _bytesRead = _pipeStream.Read(_readBuffer, 0, _readBuffer.Length);
+            _asyncReadResult = readAction.BeginInvoke(null, null);
+            _asyncReadResult.AsyncWaitHandle.WaitOne(50);
+
+            Act();
+        }
+
+        protected void Act()
+        {
+            _pipeStream.Flush();
+        }
+
+        [TestMethod]
+        public void AsyncReadShouldHaveFinished()
+        {
+            Assert.IsTrue(_asyncReadResult.IsCompleted);
+        }
+
+        [TestMethod]
+        public void ReadShouldReturnNumberOfBytesAvailableThatAreWrittenToBuffer()
+        {
+            Assert.AreEqual(2, _bytesRead);
+        }
+
+        [TestMethod]
+        public void BytesAvailableInStreamShouldHaveBeenWrittenToBuffer()
+        {
+            Assert.AreEqual(10, _readBuffer[0]);
+            Assert.AreEqual(13, _readBuffer[1]);
+            Assert.AreEqual(0, _readBuffer[2]);
+            Assert.AreEqual(0, _readBuffer[3]);
+        }
+    }
+}

+ 4 - 0
src/Renci.SshNet.Tests/Renci.SshNet.Tests.csproj

@@ -441,6 +441,10 @@
     <Compile Include="Common\AsyncSocketListener.cs" />
     <Compile Include="Common\HttpProxyStub.cs" />
     <Compile Include="Common\HttpRequest.cs" />
+    <Compile Include="Common\PipeStream_Close_BlockingRead.cs" />
+    <Compile Include="Common\PipeStream_Close_BlockingWrite.cs" />
+    <Compile Include="Common\PipeStream_Flush_BytesRemainingAfterRead.cs" />
+    <Compile Include="Common\PipeStream_Flush_NoBytesRemainingAfterRead.cs" />
     <Compile Include="Common\TestBase.cs" />
     <Compile Include="Classes\Compression\CompressorTest.cs" />
     <Compile Include="Classes\Common\DerDataTest.cs" />

+ 29 - 3
src/Renci.SshNet/Common/PipeStream.cs

@@ -122,6 +122,10 @@
         /// </summary>
         /// <exception cref="IOException">An I/O error occurs.</exception>
         /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
+        /// <remarks>
+        /// Once flushed, any subsequent read operations no longer block until requested bytes are available. Any write operation reactivates blocking
+        /// reads.
+        /// </remarks>
         public override void Flush()
         {
             if (_isDisposed)
@@ -129,7 +133,10 @@
 
             _isFlushed = true;
             lock (_buffer)
+            {
+                // unblock read hereby allowing buffer to be partially filled
                 Monitor.Pulse(_buffer);
+            }
         }
 
         /// <summary>
@@ -160,7 +167,7 @@
         ///When overridden in a derived class, reads a sequence of bytes from the current stream and advances the position within the stream by the number of bytes read.
         ///</summary>
         ///<returns>
-        ///The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero if the end of the stream has been reached.
+        ///The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero if the stream is closed or end of the stream has been reached.
         ///</returns>
         ///<param name="offset">The zero-based byte offset in buffer at which to begin storing the data read from the current stream.</param>
         ///<param name="count">The maximum number of bytes to be read from the current stream.</param>
@@ -192,8 +199,16 @@
 
             lock (_buffer)
             {
-                while (!ReadAvailable(count))
+                while (!_isDisposed && !ReadAvailable(count))
+                {
                     Monitor.Wait(_buffer);
+                }
+
+                // return zero when the read is interrupted by a close/dispose of the stream
+                if (_isDisposed)
+                {
+                    return 0;
+                }
 
                 // fill the read buffer
                 for (; readLength < count && Length > 0 && _buffer.Count > 0; readLength++)
@@ -203,6 +218,7 @@
 
                 Monitor.Pulse(_buffer);
             }
+
             return readLength;
         }
 
@@ -264,11 +280,21 @@
         /// Releases the unmanaged resources used by the Stream and optionally releases the managed resources.
         /// </summary>
         /// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
+        /// <remarks>
+        /// Disposing a <see cref="PipeStream"/> will interrupt blocking read and write operations.
+        /// </remarks>
         protected override void Dispose(bool disposing)
         {
             base.Dispose(disposing);
 
-            _isDisposed = true;
+            if (!_isDisposed)
+            {
+                lock (_buffer)
+                {
+                    _isDisposed = true;
+                    Monitor.Pulse(_buffer);
+                }
+            }
         }
 
         ///<summary>