AsyncSocketListener.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Net;
  4. using System.Net.Sockets;
  5. using System.Threading;
  6. namespace Renci.SshNet.Tests.Common
  7. {
  8. public class AsyncSocketListener : IDisposable
  9. {
  10. private readonly IPEndPoint _endPoint;
  11. private readonly ManualResetEvent _acceptCallbackDone;
  12. private List<Socket> _connectedClients;
  13. private Socket _listener;
  14. private Thread _receiveThread;
  15. private bool _started;
  16. private object _syncLock;
  17. private string _stackTrace;
  18. public delegate void BytesReceivedHandler(byte[] bytesReceived, Socket socket);
  19. public delegate void ConnectedHandler(Socket socket);
  20. public event BytesReceivedHandler BytesReceived;
  21. public event ConnectedHandler Connected;
  22. public event ConnectedHandler Disconnected;
  23. public AsyncSocketListener(IPEndPoint endPoint)
  24. {
  25. _endPoint = endPoint;
  26. _acceptCallbackDone = new ManualResetEvent(false);
  27. _connectedClients = new List<Socket>();
  28. _syncLock = new object();
  29. ShutdownRemoteCommunicationSocket = true;
  30. }
  31. /// <summary>
  32. /// Gets a value indicating whether the <see cref="Socket.Shutdown(SocketShutdown)"/> is invoked on the <see cref="Socket"/>
  33. /// that is used to handle the communication with the remote host, when the remote host has closed the connection.
  34. /// </summary>
  35. /// <value>
  36. /// <see langword="true"/> to invoke <see cref="Socket.Shutdown(SocketShutdown)"/> on the <see cref="Socket"/> that is used
  37. /// to handle the communication with the remote host, when the remote host has closed the connection; otherwise,
  38. /// <see langword="false""/>. The default is <see langword="true"/>.
  39. /// </value>
  40. public bool ShutdownRemoteCommunicationSocket { get; set; }
  41. public void Start()
  42. {
  43. _listener = new Socket(_endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
  44. _listener.Bind(_endPoint);
  45. _listener.Listen(1);
  46. _started = true;
  47. _receiveThread = new Thread(StartListener);
  48. _receiveThread.Start(_listener);
  49. _stackTrace = Environment.StackTrace;
  50. }
  51. public void Stop()
  52. {
  53. _started = false;
  54. lock (_syncLock)
  55. {
  56. foreach (var connectedClient in _connectedClients)
  57. {
  58. try
  59. {
  60. connectedClient.Shutdown(SocketShutdown.Send);
  61. }
  62. catch (Exception ex)
  63. {
  64. Console.Error.WriteLine("[{0}] Failure shutting down socket: {1}",
  65. typeof(AsyncSocketListener).FullName,
  66. ex);
  67. }
  68. DrainSocket(connectedClient);
  69. connectedClient.Dispose();
  70. }
  71. _connectedClients.Clear();
  72. }
  73. if (_listener != null)
  74. {
  75. _listener.Dispose();
  76. }
  77. if (_receiveThread != null)
  78. {
  79. _receiveThread.Join();
  80. _receiveThread = null;
  81. }
  82. }
  83. public void Dispose()
  84. {
  85. Stop();
  86. GC.SuppressFinalize(this);
  87. }
  88. private void StartListener(object state)
  89. {
  90. try
  91. {
  92. var listener = (Socket)state;
  93. while (_started)
  94. {
  95. _acceptCallbackDone.Reset();
  96. listener.BeginAccept(AcceptCallback, listener);
  97. _acceptCallbackDone.WaitOne();
  98. }
  99. }
  100. catch (Exception ex)
  101. {
  102. // On .NET framework when Thread throws an exception then unit tests
  103. // were executed without any problem.
  104. // On new .NET exceptions from Thread breaks unit tests session.
  105. Console.Error.WriteLine("[{0}] Failure in StartListener: {1}",
  106. typeof(AsyncSocketListener).FullName,
  107. ex);
  108. }
  109. }
  110. private void AcceptCallback(IAsyncResult ar)
  111. {
  112. // Signal the main thread to continue
  113. _acceptCallbackDone.Set();
  114. // Get the socket that listens for inbound connections
  115. var listener = (Socket)ar.AsyncState;
  116. // Get the socket that handles the client request
  117. Socket handler;
  118. try
  119. {
  120. handler = listener.EndAccept(ar);
  121. }
  122. catch (SocketException ex)
  123. {
  124. // The listener is stopped through a Dispose() call, which in turn causes
  125. // Socket.EndAccept(...) to throw a SocketException or
  126. // ObjectDisposedException
  127. //
  128. // Since we consider such an exception normal when the listener is being
  129. // stopped, we only write a message to stderr if the listener is considered
  130. // to be up and running
  131. if (_started)
  132. {
  133. Console.Error.WriteLine("[{0}] Failure accepting new connection: {1}",
  134. typeof(AsyncSocketListener).FullName,
  135. ex);
  136. }
  137. return;
  138. }
  139. catch (ObjectDisposedException ex)
  140. {
  141. // The listener is stopped through a Dispose() call, which in turn causes
  142. // Socket.EndAccept(IAsyncResult) to throw a SocketException or
  143. // ObjectDisposedException
  144. //
  145. // Since we consider such an exception normal when the listener is being
  146. // stopped, we only write a message to stderr if the listener is considered
  147. // to be up and running
  148. if (_started)
  149. {
  150. Console.Error.WriteLine("[{0}] Failure accepting new connection: {1}",
  151. typeof(AsyncSocketListener).FullName,
  152. ex);
  153. }
  154. return;
  155. }
  156. // Signal new connection
  157. SignalConnected(handler);
  158. lock (_syncLock)
  159. {
  160. // Register client socket
  161. _connectedClients.Add(handler);
  162. }
  163. var state = new SocketStateObject(handler);
  164. try
  165. {
  166. handler.BeginReceive(state.Buffer, 0, state.Buffer.Length, 0, ReadCallback, state);
  167. }
  168. catch (SocketException ex)
  169. {
  170. // The listener is stopped through a Dispose() call, which in turn causes
  171. // Socket.BeginReceive(...) to throw a SocketException or
  172. // ObjectDisposedException
  173. //
  174. // Since we consider such an exception normal when the listener is being
  175. // stopped, we only write a message to stderr if the listener is considered
  176. // to be up and running
  177. if (_started)
  178. {
  179. Console.Error.WriteLine("[{0}] Failure receiving new data: {1}",
  180. typeof(AsyncSocketListener).FullName,
  181. ex);
  182. }
  183. }
  184. catch (ObjectDisposedException ex)
  185. {
  186. // The listener is stopped through a Dispose() call, which in turn causes
  187. // Socket.BeginReceive(...) to throw a SocketException or
  188. // ObjectDisposedException
  189. //
  190. // Since we consider such an exception normal when the listener is being
  191. // stopped, we only write a message to stderr if the listener is considered
  192. // to be up and running
  193. if (_started)
  194. {
  195. Console.Error.WriteLine("[{0}] Failure receiving new data: {1}",
  196. typeof(AsyncSocketListener).FullName,
  197. ex);
  198. }
  199. }
  200. }
  201. private void ReadCallback(IAsyncResult ar)
  202. {
  203. // Retrieve the state object and the handler socket
  204. // from the asynchronous state object
  205. var state = (SocketStateObject)ar.AsyncState;
  206. var handler = state.Socket;
  207. int bytesRead;
  208. try
  209. {
  210. // Read data from the client socket.
  211. bytesRead = handler.EndReceive(ar, out var errorCode);
  212. if (errorCode != SocketError.Success)
  213. {
  214. bytesRead = 0;
  215. }
  216. }
  217. catch (SocketException ex)
  218. {
  219. // The listener is stopped through a Dispose() call, which in turn causes
  220. // Socket.EndReceive(...) to throw a SocketException or
  221. // ObjectDisposedException
  222. //
  223. // Since we consider such an exception normal when the listener is being
  224. // stopped, we only write a message to stderr if the listener is considered
  225. // to be up and running
  226. if (_started)
  227. {
  228. Console.Error.WriteLine("[{0}] Failure receiving new data: {1}",
  229. typeof(AsyncSocketListener).FullName,
  230. ex);
  231. }
  232. return;
  233. }
  234. catch (ObjectDisposedException ex)
  235. {
  236. // The listener is stopped through a Dispose() call, which in turn causes
  237. // Socket.EndReceive(...) to throw a SocketException or
  238. // ObjectDisposedException
  239. //
  240. // Since we consider such an exception normal when the listener is being
  241. // stopped, we only write a message to stderr if the listener is considered
  242. // to be up and running
  243. if (_started)
  244. {
  245. Console.Error.WriteLine("[{0}] Failure receiving new data: {1}",
  246. typeof(AsyncSocketListener).FullName,
  247. ex);
  248. }
  249. return;
  250. }
  251. void ConnectionDisconnected()
  252. {
  253. SignalDisconnected(handler);
  254. if (ShutdownRemoteCommunicationSocket)
  255. {
  256. lock (_syncLock)
  257. {
  258. if (!_started)
  259. {
  260. return;
  261. }
  262. try
  263. {
  264. handler.Shutdown(SocketShutdown.Send);
  265. handler.Close();
  266. }
  267. catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
  268. {
  269. // On .NET 7 we got Socker Exception with ConnectionReset from Shutdown method
  270. // when the socket is disposed
  271. }
  272. catch (SocketException ex)
  273. {
  274. throw new Exception("Exception in ReadCallback: " + ex.SocketErrorCode + " " + _stackTrace, ex);
  275. }
  276. catch (Exception ex)
  277. {
  278. throw new Exception("Exception in ReadCallback: " + _stackTrace, ex);
  279. }
  280. _connectedClients.Remove(handler);
  281. }
  282. }
  283. }
  284. if (bytesRead > 0)
  285. {
  286. var bytesReceived = new byte[bytesRead];
  287. Array.Copy(state.Buffer, bytesReceived, bytesRead);
  288. SignalBytesReceived(bytesReceived, handler);
  289. try
  290. {
  291. handler.BeginReceive(state.Buffer, 0, state.Buffer.Length, 0, ReadCallback, state);
  292. }
  293. catch (ObjectDisposedException)
  294. {
  295. // TODO On .NET 7, sometimes we get ObjectDisposedException when _started but only on appveyor, locally it works
  296. ConnectionDisconnected();
  297. }
  298. catch (SocketException ex)
  299. {
  300. if (!_started)
  301. {
  302. throw new Exception("BeginReceive while stopping!", ex);
  303. }
  304. throw new Exception("BeginReceive while started!: " + ex.SocketErrorCode + " " + _stackTrace, ex);
  305. }
  306. }
  307. else
  308. {
  309. ConnectionDisconnected();
  310. }
  311. }
  312. private void SignalBytesReceived(byte[] bytesReceived, Socket client)
  313. {
  314. var subscribers = BytesReceived;
  315. if (subscribers != null)
  316. subscribers(bytesReceived, client);
  317. }
  318. private void SignalConnected(Socket client)
  319. {
  320. var subscribers = Connected;
  321. if (subscribers != null)
  322. subscribers(client);
  323. }
  324. private void SignalDisconnected(Socket client)
  325. {
  326. var subscribers = Disconnected;
  327. if (subscribers != null)
  328. subscribers(client);
  329. }
  330. private static void DrainSocket(Socket socket)
  331. {
  332. var buffer = new byte[128];
  333. try
  334. {
  335. while (true && socket.Connected)
  336. {
  337. var bytesRead = socket.Receive(buffer);
  338. if (bytesRead == 0)
  339. {
  340. break;
  341. }
  342. }
  343. }
  344. catch (SocketException ex)
  345. {
  346. Console.Error.WriteLine("[{0}] Failure draining socket ({1}): {2}",
  347. typeof(AsyncSocketListener).FullName,
  348. ex.SocketErrorCode.ToString("G"),
  349. ex);
  350. }
  351. }
  352. private class SocketStateObject
  353. {
  354. public Socket Socket { get; private set; }
  355. public readonly byte[] Buffer = new byte[1024];
  356. public SocketStateObject(Socket handler)
  357. {
  358. Socket = handler;
  359. }
  360. }
  361. }
  362. }