AsyncSocketListener.cs 14 KB


  1. #pragma warning disable IDE0005 // Using directive is unnecessary; IntegrationTests use implicit usings
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Threading;
  7. using Renci.SshNet.Common;
  8. #pragma warning restore IDE0005
  9. namespace Renci.SshNet.Tests.Common
  10. {
  /// <summary>
  /// A small TCP listener that accepts connections on a dedicated thread and reads from every
  /// accepted socket through the asynchronous Begin/End socket API, surfacing activity through
  /// the <see cref="Connected"/>, <see cref="BytesReceived"/> and <see cref="Disconnected"/> events.
  /// </summary>
  /// <remarks>
  /// The events are raised from within the accept/receive completion callbacks, not from the
  /// thread that called <see cref="Start"/>.
  /// </remarks>
  public class AsyncSocketListener : IDisposable
  {
      private readonly IPEndPoint _endPoint;

      // Signalled by AcceptCallback so the accept loop can queue the next BeginAccept.
      private readonly ManualResetEvent _acceptCallbackDone;

      // Accepted sockets that have not been closed yet; guarded by _syncLock.
      private readonly List<Socket> _connectedClients;
      private readonly Lock _syncLock;

      private Socket _listener;
      private Thread _receiveThread;

      // Written by Start/Stop and read by the accept thread and the async callbacks,
      // hence volatile so a Stop() is observed without relying on incidental barriers.
      private volatile bool _started;

      // Stack trace of the Start() call; appended to failure messages to identify the owner.
      private string _stackTrace;

      /// <summary>
      /// Represents the method that handles the <see cref="BytesReceived"/> event.
      /// </summary>
      /// <param name="bytesReceived">A copy of the bytes that were read from <paramref name="socket"/>.</param>
      /// <param name="socket">The accepted socket the bytes were read from.</param>
      public delegate void BytesReceivedHandler(byte[] bytesReceived, Socket socket);

      /// <summary>
      /// Represents the method that handles the <see cref="Connected"/> and <see cref="Disconnected"/> events.
      /// </summary>
      /// <param name="socket">The accepted socket the notification is about.</param>
      public delegate void ConnectedHandler(Socket socket);

      /// <summary>
      /// Occurs when a receive on an accepted socket completes with one or more bytes.
      /// </summary>
      public event BytesReceivedHandler BytesReceived;

      /// <summary>
      /// Occurs when a new connection has been accepted, before the first receive is queued.
      /// </summary>
      public event ConnectedHandler Connected;

      /// <summary>
      /// Occurs when a receive completes with zero bytes, or when the next receive cannot be
      /// queued because the socket has been disposed.
      /// </summary>
      public event ConnectedHandler Disconnected;

      /// <summary>
      /// Initializes a new instance of the <see cref="AsyncSocketListener"/> class.
      /// </summary>
      /// <param name="endPoint">The local endpoint to bind to when <see cref="Start"/> is invoked.</param>
      public AsyncSocketListener(IPEndPoint endPoint)
      {
          _endPoint = endPoint;
          _acceptCallbackDone = new ManualResetEvent(false);
          _connectedClients = new List<Socket>();
          _syncLock = new Lock();
          ShutdownRemoteCommunicationSocket = true;
      }

      /// <summary>
      /// Gets or sets a value indicating whether <see cref="Socket.Shutdown(SocketShutdown)"/> is invoked on the
      /// <see cref="Socket"/> that is used to handle the communication with the remote host, when the remote host
      /// has closed the connection.
      /// </summary>
      /// <value>
      /// <see langword="true"/> to invoke <see cref="Socket.Shutdown(SocketShutdown)"/> on the <see cref="Socket"/> that is used
      /// to handle the communication with the remote host, when the remote host has closed the connection; otherwise,
      /// <see langword="false"/>. The default is <see langword="true"/>.
      /// </value>
      public bool ShutdownRemoteCommunicationSocket { get; set; }

      /// <summary>
      /// Binds to the configured endpoint and starts accepting connections on a new thread.
      /// </summary>
      /// <exception cref="SocketException">The endpoint could not be bound or listened on.</exception>
      public void Start()
      {
          var listener = new Socket(_endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

          try
          {
              listener.Bind(_endPoint);
              listener.Listen(1);
          }
          catch
          {
              // Do not leak the half-initialised socket, and leave any listener that is
              // already running untouched so a later Stop() still tears down the right one.
              listener.Dispose();
              throw;
          }

          // Capture the diagnostics context before any callback can possibly need it.
          _stackTrace = Environment.StackTrace;
          _listener = listener;
          _started = true;

          _receiveThread = new Thread(StartListener);
          _receiveThread.Start(listener);
      }

      /// <summary>
      /// Stops accepting connections, shuts down and disposes every tracked client socket, and
      /// waits for the accept thread to exit.
      /// </summary>
      /// <remarks>
      /// Each client socket is drained after its send side is shut down, which blocks until the
      /// remote side closes the connection or the receive fails.
      /// </remarks>
      public void Stop()
      {
          _started = false;

          lock (_syncLock)
          {
              foreach (var connectedClient in _connectedClients)
              {
                  try
                  {
                      connectedClient.Shutdown(SocketShutdown.Send);
                  }
                  catch (Exception ex)
                  {
                      Console.Error.WriteLine("[{0}] Failure shutting down socket: {1}",
                                              typeof(AsyncSocketListener).FullName,
                                              ex);
                  }

                  try
                  {
                      DrainSocket(connectedClient);
                  }
                  finally
                  {
                      // Always release the socket, even if draining failed unexpectedly.
                      connectedClient.Dispose();
                  }
              }

              _connectedClients.Clear();
          }

          // Disposing the listening socket completes the pending accept, which in turn
          // releases the accept thread so the Join() below can return.
          _listener?.Dispose();
          _listener = null;

          if (_receiveThread != null)
          {
              _receiveThread.Join();
              _receiveThread = null;
          }
      }

      /// <summary>
      /// Stops the listener and releases the wait handle owned by this instance.
      /// </summary>
      /// <remarks>
      /// The instance cannot be started again after it has been disposed.
      /// </remarks>
      public void Dispose()
      {
          Stop();

          // Safe at this point: Stop() has joined the only thread that waits on the handle.
          _acceptCallbackDone.Dispose();

          GC.SuppressFinalize(this);
      }

      /// <summary>
      /// Accept loop executed on the dedicated thread: queues one asynchronous accept at a time
      /// and waits for <see cref="AcceptCallback(IAsyncResult)"/> to signal before queuing the next.
      /// </summary>
      /// <param name="state">The listening <see cref="Socket"/>.</param>
      private void StartListener(object state)
      {
          try
          {
              var listener = (Socket)state;

              while (_started)
              {
                  _ = _acceptCallbackDone.Reset();
                  _ = listener.BeginAccept(AcceptCallback, listener);
                  _ = _acceptCallbackDone.WaitOne();
              }
          }
          catch (Exception ex)
          {
              // On .NET Framework an exception escaping this thread did not disturb the unit
              // tests; on modern .NET it breaks the whole test session, so it is only logged.
              Console.Error.WriteLine("[{0}] Failure in StartListener: {1}",
                                      typeof(AsyncSocketListener).FullName,
                                      ex);
          }
      }

      /// <summary>
      /// Completes an accept: releases the accept loop, raises <see cref="Connected"/>, registers
      /// the accepted socket and queues the first receive on it.
      /// </summary>
      /// <param name="ar">The result of the asynchronous accept; its state is the listening socket.</param>
      private void AcceptCallback(IAsyncResult ar)
      {
          // Signal the accept loop to continue
          _ = _acceptCallbackDone.Set();

          // Get the socket that listens for inbound connections
          var listener = (Socket)ar.AsyncState;

          // Get the socket that handles the client request
          Socket handler;

          try
          {
              handler = listener.EndAccept(ar);
          }
          catch (Exception ex) when (IsSocketTeardownException(ex))
          {
              // Disposing the listener in Stop() makes EndAccept throw; that is expected while
              // stopping, so it is only reported when the listener is supposed to be running.
              LogIfStarted("[{0}] Failure accepting new connection: {1}", ex);
              return;
          }

          // Signal new connection
          SignalConnected(handler);

          lock (_syncLock)
          {
              // Register client socket
              _connectedClients.Add(handler);
          }

          var state = new SocketStateObject(handler);

          try
          {
              _ = handler.BeginReceive(state.Buffer, 0, state.Buffer.Length, 0, ReadCallback, state);
          }
          catch (Exception ex) when (IsSocketTeardownException(ex))
          {
              // Expected while stopping (socket already disposed); reported only when running.
              LogIfStarted("[{0}] Failure receiving new data: {1}", ex);
          }
      }

      /// <summary>
      /// Completes a receive: raises <see cref="BytesReceived"/> and queues the next receive, or
      /// treats a zero-byte read as the remote side closing the connection.
      /// </summary>
      /// <param name="ar">The result of the asynchronous receive; its state is a <see cref="SocketStateObject"/>.</param>
      /// <exception cref="InvalidOperationException">The next receive could not be queued because of a socket error.</exception>
      private void ReadCallback(IAsyncResult ar)
      {
          // Retrieve the state object and the handler socket
          // from the asynchronous state object
          var state = (SocketStateObject)ar.AsyncState;
          var handler = state.Socket;

          int bytesRead;

          try
          {
              // Read data from the client socket.
              bytesRead = handler.EndReceive(ar);
          }
          catch (Exception ex) when (IsSocketTeardownException(ex))
          {
              // Expected while stopping (socket already disposed); reported only when running.
              LogIfStarted("[{0}] Failure receiving new data: {1}", ex);
              return;
          }

          if (bytesRead <= 0)
          {
              HandleDisconnect(handler);
              return;
          }

          // Hand out a copy: the state buffer is reused by the next receive.
          var bytesReceived = new byte[bytesRead];
          Array.Copy(state.Buffer, bytesReceived, bytesRead);
          SignalBytesReceived(bytesReceived, handler);

          try
          {
              _ = handler.BeginReceive(state.Buffer, 0, state.Buffer.Length, 0, ReadCallback, state);
          }
          catch (ObjectDisposedException)
          {
              // TODO On .NET 7, sometimes we get ObjectDisposedException when _started but only in CI, locally it works
              HandleDisconnect(handler);
          }
          catch (SocketException ex)
          {
              if (!_started)
              {
                  throw new InvalidOperationException("BeginReceive while stopping!", ex);
              }

              throw new InvalidOperationException("BeginReceive while started!: " + ex.SocketErrorCode + " " + _stackTrace, ex);
          }
      }

      /// <summary>
      /// Raises <see cref="Disconnected"/> and, when <see cref="ShutdownRemoteCommunicationSocket"/>
      /// is set and the listener is running, shuts down, closes and unregisters the socket.
      /// </summary>
      /// <param name="handler">The accepted socket whose connection ended.</param>
      /// <exception cref="InvalidOperationException">Shutting down or closing the socket failed unexpectedly.</exception>
      private void HandleDisconnect(Socket handler)
      {
          SignalDisconnected(handler);

          if (!ShutdownRemoteCommunicationSocket)
          {
              return;
          }

          lock (_syncLock)
          {
              if (!_started)
              {
                  // Stop() is responsible for the sockets that are still registered.
                  return;
              }

              try
              {
                  if (handler.Connected)
                  {
                      handler.Shutdown(SocketShutdown.Send);
                  }

                  handler.Close();
              }
              catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset)
              {
                  // On .NET 7 we got SocketException with ConnectionReset from the Shutdown method
                  // when the socket is disposed. The socket is unregistered below, so it must
                  // still be closed here or nothing would ever release it.
                  handler.Close();
              }
              catch (SocketException ex)
              {
                  throw new InvalidOperationException("Exception in ReadCallback: " + ex.SocketErrorCode + " " + _stackTrace, ex);
              }
              catch (Exception ex)
              {
                  throw new InvalidOperationException("Exception in ReadCallback: " + _stackTrace, ex);
              }

              _ = _connectedClients.Remove(handler);
          }
      }

      /// <summary>
      /// Determines whether an exception is one of the two types the socket API throws when a
      /// socket is torn down underneath a pending operation.
      /// </summary>
      /// <param name="ex">The exception to inspect.</param>
      /// <returns>
      /// <see langword="true"/> for <see cref="SocketException"/> and <see cref="ObjectDisposedException"/>;
      /// otherwise, <see langword="false"/>.
      /// </returns>
      private static bool IsSocketTeardownException(Exception ex)
      {
          return ex is SocketException || ex is ObjectDisposedException;
      }

      /// <summary>
      /// Writes a failure to stderr, but only while the listener is considered to be running.
      /// </summary>
      /// <param name="format">Composite format string; <c>{0}</c> is the type name and <c>{1}</c> the exception.</param>
      /// <param name="ex">The exception to report.</param>
      private void LogIfStarted(string format, Exception ex)
      {
          if (_started)
          {
              Console.Error.WriteLine(format, typeof(AsyncSocketListener).FullName, ex);
          }
      }

      private void SignalBytesReceived(byte[] bytesReceived, Socket client)
      {
          BytesReceived?.Invoke(bytesReceived, client);
      }

      private void SignalConnected(Socket client)
      {
          Connected?.Invoke(client);
      }

      private void SignalDisconnected(Socket client)
      {
          Disconnected?.Invoke(client);
      }

      /// <summary>
      /// Reads and discards data from a socket until the remote side closes the connection,
      /// the socket is no longer connected, or the receive fails.
      /// </summary>
      /// <param name="socket">The socket to drain.</param>
      private static void DrainSocket(Socket socket)
      {
          var buffer = new byte[128];

          try
          {
              while (socket.Connected)
              {
                  // A zero-byte read means the remote side has closed the connection.
                  if (socket.Receive(buffer) == 0)
                  {
                      break;
                  }
              }
          }
          catch (SocketException ex)
          {
              Console.Error.WriteLine("[{0}] Failure draining socket ({1}): {2}",
                                      typeof(AsyncSocketListener).FullName,
                                      ex.SocketErrorCode.ToString("G"),
                                      ex);
          }
      }

      /// <summary>
      /// Per-connection state passed through the asynchronous receive calls.
      /// </summary>
      private sealed class SocketStateObject
      {
          public SocketStateObject(Socket handler)
          {
              Socket = handler;
          }

          /// <summary>
          /// Gets the accepted socket the receives are queued on.
          /// </summary>
          public Socket Socket { get; }

          /// <summary>
          /// Gets the receive buffer that is reused for every receive on <see cref="Socket"/>.
          /// </summary>
          public byte[] Buffer { get; } = new byte[2048];
      }
  }
  355. }