Browse Source

Added ReadContinuous method.

drieseng 9 năm trước cách đây
mục cha
commit
21e81d0c78
1 tập tin đã thay đổi với 123 bổ sung0 xóa
  1. 123 0
      src/Renci.SshNet/Abstractions/SocketAbstraction.cs

+ 123 - 0
src/Renci.SshNet/Abstractions/SocketAbstraction.cs

@@ -153,6 +153,68 @@ namespace Renci.SshNet.Abstractions
 #endif
         }
 
+        public static void ReadContinuous(Socket socket, byte[] buffer, int offset, int size, Action<byte[], int, int> processReceivedBytesAction)
+        {
+#if FEATURE_SOCKET_SYNC
+            // do not time-out receive
+            socket.ReceiveTimeout = 0;
+
+            while (socket.Connected)
+            {
+                try
+                {
+                    var bytesRead = socket.Receive(buffer, offset, size, SocketFlags.None);
+                    if (bytesRead == 0)
+                        break;
+
+                    processReceivedBytesAction(buffer, offset, bytesRead);
+                }
+                catch (SocketException ex)
+                {
+                    if (IsErrorResumable(ex.SocketErrorCode))
+                        continue;
+
+                    switch (ex.SocketErrorCode)
+                    {
+                        case SocketError.ConnectionAborted:
+                        case SocketError.ConnectionReset:
+                            // connection was closed
+                            return;
+                        case SocketError.Interrupted:
+                            // connection was closed because FIN/ACK was not received in time after
+                            // shutting down the (send part of the) socket
+                            return;
+                        default:
+                            throw; // throw any other error
+                    }
+                }
+            }
+#elif FEATURE_SOCKET_EAP
+            var completionWaitHandle = new ManualResetEvent(false);
+            var readToken = new ContinuousReceiveToken(socket, processReceivedBytesAction, completionWaitHandle);
+            var args = new SocketAsyncEventArgs
+            {
+                RemoteEndPoint = socket.RemoteEndPoint,
+                UserToken = readToken
+            };
+            args.Completed += ReceiveCompleted;
+            args.SetBuffer(buffer, offset, size);
+
+            if (!socket.ReceiveAsync(args))
+            {
+                ReceiveCompleted(null, args);
+            }
+
+            completionWaitHandle.WaitOne();
+            completionWaitHandle.Dispose();
+
+            if (readToken.Exception != null)
+                throw readToken.Exception;
+#else
+#error Receiving data from a Socket is not implemented.
+#endif
+        }
+
         /// <summary>
         /// Reads a byte from the specified <see cref="Socket"/>.
         /// </summary>
@@ -502,6 +564,67 @@ namespace Renci.SshNet.Abstractions
             private readonly EventWaitHandle _completionWaitHandle;
             private readonly Socket _socket;
         }
+
+        private class ContinuousReceiveToken : Token
+        {
+            public ContinuousReceiveToken(Socket socket, Action<byte[], int, int> processReceivedBytesAction, EventWaitHandle completionWaitHandle)
+            {
+                _socket = socket;
+                _processReceivedBytesAction = processReceivedBytesAction;
+                _completionWaitHandle = completionWaitHandle;
+            }
+
+            public Exception Exception { get; private set; }
+
+            public void Process(SocketAsyncEventArgs args)
+            {
+                if (args.SocketError == SocketError.Success)
+                {
+                    if (args.BytesTransferred == 0)
+                    {
+                        // remote socket was closed
+                        _completionWaitHandle.Set();
+                        return;
+                    }
+
+                    _processReceivedBytesAction(args.Buffer, args.Offset, args.BytesTransferred);
+                    ResumeOperation(args);
+                    return;
+                }
+
+                if (IsErrorResumable(args.SocketError))
+                {
+                    ThreadAbstraction.Sleep(30);
+                    ResumeOperation(args);
+                    return;
+                }
+
+                if (args.SocketError != SocketError.OperationAborted)
+                {
+                    Exception = new SocketException((int) args.SocketError);
+                }
+
+                // we're dealing with a (fatal) error
+                _completionWaitHandle.Set();
+            }
+
+            private void ResumeOperation(SocketAsyncEventArgs args)
+            {
+                switch (args.LastOperation)
+                {
+                    case SocketAsyncOperation.Receive:
+                        _socket.ReceiveAsync(args);
+                        break;
+                    case SocketAsyncOperation.Send:
+                        _socket.SendAsync(args);
+                        break;
+                }
+            }
+
+            private readonly EventWaitHandle _completionWaitHandle;
+            private readonly Socket _socket;
+            private readonly Action<byte[], int, int> _processReceivedBytesAction;
+        }
 #endif // FEATURE_SOCKET_EAP && !FEATURE_SOCKET_SYNC
     }
 }