SftpFileReader.cs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Globalization;
  4. using System.Runtime.ExceptionServices;
  5. using System.Threading;
  6. using Microsoft.Extensions.Logging;
  7. using Renci.SshNet.Abstractions;
  8. using Renci.SshNet.Common;
  9. namespace Renci.SshNet.Sftp
  10. {
  11. internal sealed class SftpFileReader : ISftpFileReader
  12. {
  /// <summary>
  /// The time, in milliseconds, that the read-ahead loop blocks on a single attempt to acquire
  /// the semaphore before it re-evaluates its loop conditions.
  /// </summary>
  private const int ReadAheadWaitTimeoutInMilliseconds = 1000;

  // Session, remote file handle and request size used for the reads issued by this reader.
  private readonly ISftpSession _sftpSession;
  private readonly byte[] _handle;
  private readonly uint _chunkSize;
  private readonly ILogger _logger;

  /// <summary>
  /// Holds the size of the file, when available.
  /// </summary>
  private readonly long? _fileSize;

  // Created with the maximum number of pending reads; acquired by the read-ahead loop for each
  // chunk it requests and released again by Read() and by failure handling.
  private readonly SemaphoreSlim _semaphore;

  // Monitor on which Read() waits for chunks; _queue is accessed while holding it.
  private readonly object _readLock;

  // Completed read-ahead chunks, keyed by chunk index.
  private readonly Dictionary<int, BufferedRead> _queue;

  // Set by Dispose to interrupt the wait in the read-ahead loop.
  private readonly ManualResetEvent _disposingWaitHandle;

  // Set by the read-ahead loop once it has exited.
  private readonly ManualResetEvent _readAheadCompleted;

  // Built by the session from the disposing handle and the semaphore's available handle.
  private readonly WaitHandle[] _waitHandles;

  // Position and chunk index that Read() serves next.
  private ulong _offset;
  private int _nextChunkIndex;

  // Position and chunk index that the read-ahead loop requests next.
  private ulong _readAheadOffset;
  private int _readAheadChunkIndex;

  /// <summary>
  /// Holds a value indicating whether EOF has already been signaled by the SSH server.
  /// </summary>
  private bool _endOfFileReceived;

  /// <summary>
  /// Holds a value indicating whether the client has read up to the end of the file.
  /// </summary>
  private bool _isEndOfFileRead;

  // True as soon as Dispose has started.
  private bool _disposingOrDisposed;

  // Failure that moved the reader into an error state; rethrown by Read().
  private Exception _exception;
  /// <summary>
  /// Initializes a new instance of the <see cref="SftpFileReader"/> class for the given file handle
  /// and <see cref="ISftpSession"/>, and immediately starts reading ahead.
  /// </summary>
  /// <param name="handle">The file handle.</param>
  /// <param name="sftpSession">The SFTP session.</param>
  /// <param name="chunkSize">The size of an individual read-ahead chunk.</param>
  /// <param name="maxPendingReads">The maximum number of pending reads.</param>
  /// <param name="fileSize">The size of the file, if known; otherwise, <see langword="null"/>.</param>
  public SftpFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize)
  {
      // plain state captured from the arguments
      _sftpSession = sftpSession;
      _handle = handle;
      _fileSize = fileSize;
      _chunkSize = chunkSize;

      // bookkeeping for pending and completed read-ahead chunks
      _semaphore = new SemaphoreSlim(maxPendingReads);
      _queue = new Dictionary<int, BufferedRead>(maxPendingReads);
      _readLock = new object();

      // both events start out non-signaled
      _readAheadCompleted = new ManualResetEvent(initialState: false);
      _disposingWaitHandle = new ManualResetEvent(initialState: false);

      // index 0 is the disposing handle and index 1 the semaphore, as ContinueReadAhead expects
      var semaphoreAvailable = _semaphore.AvailableWaitHandle;
      _waitHandles = _sftpSession.CreateWaitHandleArray(_disposingWaitHandle, semaphoreAvailable);

      _logger = sftpSession.SessionLoggerFactory.CreateLogger<SftpFileReader>();

      // everything is wired up; begin requesting chunks
      StartReadAhead();
  }
  /// <summary>
  /// Returns the next sequential block of data of the file.
  /// </summary>
  /// <returns>
  /// The data that was read; a zero-length array signals that the end of the file was reached.
  /// </returns>
  /// <exception cref="SshException">
  /// The end of the file was already read, or the server unexpectedly signaled end of file.
  /// </exception>
  /// <remarks>
  /// A failure recorded by the read-ahead or by <c>Dispose</c> is rethrown here.
  /// </remarks>
  public byte[] Read()
  {
      ThrowHelper.ThrowObjectDisposedIf(_disposingOrDisposed, this);

      if (_exception is not null)
      {
          ExceptionDispatchInfo.Capture(_exception).Throw();
      }

      if (_isEndOfFileRead)
      {
          throw new SshException("Attempting to read beyond the end of the file.");
      }

      BufferedRead nextChunk;

      lock (_readLock)
      {
          // wait until either the next chunk is available, an exception has occurred or the current
          // instance is already disposed
          while (!_queue.TryGetValue(_nextChunkIndex, out nextChunk) && _exception is null)
          {
              _ = Monitor.Wait(_readLock);
          }

          // throw when an exception occurred in read-ahead, or the current instance is already disposed
          if (_exception is not null)
          {
              ExceptionDispatchInfo.Capture(_exception).Throw();
          }

          var data = nextChunk.Data;

          if (nextChunk.Offset == _offset)
          {
              // have we reached EOF?
              if (data.Length == 0)
              {
                  // PERF: we do not bother updating all of the internal state when we've reached EOF
                  _isEndOfFileRead = true;
              }
              else
              {
                  // remove processed chunk
                  _ = _queue.Remove(_nextChunkIndex);

                  // update offset
                  _offset += (ulong)data.Length;

                  // move to next chunk
                  _nextChunkIndex++;
              }

              // unblock wait in read-ahead
              _ = _semaphore.Release();

              return data;
          }

          // When we received an EOF for the next chunk and the size of the file is known, then
          // we only complete the current chunk if we haven't already read up to the file size.
          // This way we save an extra round-trip to the server.
          if (data.Length == 0 && _fileSize.HasValue && _offset == (ulong)_fileSize.Value)
          {
              // avoid future reads
              _isEndOfFileRead = true;

              // unblock wait in read-ahead
              _ = _semaphore.Release();

              // signal EOF to caller
              return nextChunk.Data;
          }
      }

      /*
       * When the server returned fewer bytes than requested (for the previous chunk)
       * we'll synchronously request the remaining data.
       *
       * Due to the optimization above, we'll only get here in one of the following cases:
       * - an EOF situation for files for which we were unable to obtain the file size
       * - fewer bytes than requested were returned
       *
       * According to the SSH specification, this last case should never happen for normal
       * disk files (but can happen for device files). In practice, OpenSSH - for example -
       * returns fewer bytes than requested when requesting more than 64 KB.
       *
       * Important:
       * To avoid a deadlock, this read must be done outside of the read lock.
       */

      var bytesToCatchUp = nextChunk.Offset - _offset;

      /*
       * TODO: break loop and interrupt blocking wait in case of exception
       */

      var read = _sftpSession.RequestRead(_handle, _offset, (uint)bytesToCatchUp);
      if (read.Length == 0)
      {
          // process data in read lock to avoid ObjectDisposedException while releasing semaphore
          lock (_readLock)
          {
              // a zero-length (EOF) response is only valid for the read-back when EOF has
              // been signaled for the next read-ahead chunk
              if (nextChunk.Data.Length == 0)
              {
                  _isEndOfFileRead = true;

                  // ensure we've not yet disposed the current instance
                  if (!_disposingOrDisposed)
                  {
                      // unblock wait in read-ahead
                      _ = _semaphore.Release();
                  }

                  // signal EOF to caller
                  return read;
              }

              // Move reader to error state. The exception is kept in a local because Dispose
              // assigns _exception without holding the read lock; throwing the field could
              // surface a different exception than the one created here.
              var unexpectedEndOfFile = new SshException("Unexpectedly reached end of file.");
              _exception = unexpectedEndOfFile;

              // ensure we've not yet disposed the current instance
              if (!_disposingOrDisposed)
              {
                  // unblock wait in read-ahead
                  _ = _semaphore.Release();
              }

              // notify caller of error
              throw unexpectedEndOfFile;
          }
      }

      // same widening as used for read-ahead chunks above
      _offset += (ulong)read.Length;

      return read;
  }
  /// <summary>
  /// Releases the resources held by this reader.
  /// </summary>
  public void Dispose()
  {
      // run the full (managed) cleanup path
      Dispose(true);

      GC.SuppressFinalize(this);
  }
  /// <summary>
  /// Marks the reader as disposing and, when <paramref name="disposing"/> is <see langword="true"/>,
  /// stops the read-ahead, releases the synchronization primitives and closes the remote handle.
  /// </summary>
  /// <param name="disposing"><see langword="true"/> to release both managed and unmanaged resources; <see langword="false"/> to release only unmanaged resources.</param>
  private void Dispose(bool disposing)
  {
      // nothing to do when disposal already started
      if (_disposingOrDisposed)
      {
          return;
      }

      // transition to disposing state
      _disposingOrDisposed = true;

      if (!disposing)
      {
          return;
      }

      // record an exception so that any further Read() fails
      _exception = new ObjectDisposedException(GetType().FullName);

      // interrupt the wait in the read-ahead loop ...
      _ = _disposingWaitHandle.Set();

      // ... and block until that loop has signaled completion
      _ = _readAheadCompleted.WaitOne();

      lock (_readLock)
      {
          // the semaphore is disposed while holding the read lock so that Read() does not
          // run into an ObjectDisposedException when releasing it
          _semaphore.Dispose();

          // wake up a Read() that is waiting for a chunk
          Monitor.PulseAll(_readLock);
      }

      _readAheadCompleted.Dispose();
      _disposingWaitHandle.Dispose();

      // the remote handle can only be closed over an open session
      if (!_sftpSession.IsOpen)
      {
          return;
      }

      try
      {
          _sftpSession.EndClose(_sftpSession.BeginClose(_handle, callback: null, state: null));
      }
      catch (Exception ex)
      {
          // closing is best-effort; a failure is logged and not propagated
          _logger.LogInformation(ex, "Failure closing handle");
      }
  }
  /// <summary>
  /// Hands the read-ahead loop to <see cref="ThreadAbstraction"/>. The loop keeps requesting chunks
  /// until EOF is received, a failure is recorded or the reader is disposed, and signals
  /// <c>_readAheadCompleted</c> when it exits.
  /// </summary>
  private void StartReadAhead()
  {
      ThreadAbstraction.ExecuteThread(() =>
      {
          while (true)
          {
              // stop once the server signaled EOF or the reader is in an error state
              if (_endOfFileReceived || _exception is not null)
              {
                  break;
              }

              // Wait for a free slot or for disposal. EOF and failures are deliberately not
              // part of this check as they do not require Read() to be unblocked (or have
              // already done so).
              if (!ContinueReadAhead())
              {
                  // unblock the Read()
                  lock (_readLock)
                  {
                      Monitor.PulseAll(_readLock);
                  }

                  break;
              }

              // Acquiring a slot may time out when all of them are taken by pending read-aheads
              // (slow server, broken session); in that case start over and re-check the state.
              var slotAcquired = _semaphore.Wait(ReadAheadWaitTimeoutInMilliseconds);
              if (!slotAcquired)
              {
                  continue;
              }

              // the state may have changed while waiting: no further chunks are needed after
              // EOF, a failure or disposal
              if (_endOfFileReceived || _exception is not null)
              {
                  break;
              }

              var pendingChunk = new BufferedRead(_readAheadChunkIndex, _readAheadOffset);

              try
              {
                  // The reported file size is not trusted as the only EOF indicator: reading
                  // continues until the server returns zero bytes. Once the read-ahead offset
                  // is beyond the reported size, the next chunk is expected to be that final
                  // zero-byte one, so it is read synchronously to avoid multiple read-aheads
                  // beyond EOF.
                  var beyondReportedSize = _fileSize.HasValue && (long)_readAheadOffset > _fileSize.Value;

                  if (!beyondReportedSize)
                  {
                      _ = _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkSize, ReadCompleted, pendingChunk);
                  }
                  else
                  {
                      var pendingRead = _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkSize, callback: null, pendingChunk);
                      var chunkData = _sftpSession.EndRead(pendingRead);
                      ReadCompletedCore(pendingChunk, chunkData);
                  }
              }
              catch (Exception ex)
              {
                  HandleFailure(ex);
                  break;
              }

              // move on to the next chunk
              _readAheadChunkIndex++;
              _readAheadOffset += _chunkSize;
          }

          // let Dispose know that the loop has exited
          _ = _readAheadCompleted.Set();
      });
  }
  /// <summary>
  /// Waits on the reader's wait handles and reports whether the read-ahead loop should go on.
  /// </summary>
  /// <returns>
  /// <see langword="true"/> when the semaphore became available; <see langword="false"/> when the
  /// reader is being disposed or when the wait failed, in which case the failure is recorded
  /// unless an earlier one is already present.
  /// </returns>
  private bool ContinueReadAhead()
  {
      try
      {
          var signaledIndex = _sftpSession.WaitAny(_waitHandles, _sftpSession.OperationTimeout);

          return signaledIndex switch
          {
              // disposing
              0 => false,

              // semaphore available
              1 => true,

              _ => throw new NotImplementedException(string.Format(CultureInfo.InvariantCulture, "WaitAny return value '{0}' is not implemented.", signaledIndex)),
          };
      }
      catch (Exception ex)
      {
          // keep the first recorded failure
          _ = Interlocked.CompareExchange(ref _exception, ex, comparand: null);
          return false;
      }
  }
  /// <summary>
  /// Callback passed to <c>BeginRead</c> for read-ahead requests; collects the outcome of the
  /// read and hands it to <see cref="ReadCompletedCore"/>, or records the failure.
  /// </summary>
  /// <param name="result">The completed read; its state object is the chunk that was requested.</param>
  private void ReadCompleted(IAsyncResult result)
  {
      // while disposing, the outcome is ignored so that no disposed members are touched
      if (_disposingOrDisposed)
      {
          return;
      }

      var completedRead = (SftpReadAsyncResult)result;

      byte[] chunkData;
      try
      {
          chunkData = completedRead.EndInvoke();
      }
      catch (Exception ex)
      {
          HandleFailure(ex);
          return;
      }

      // A zero-byte result signals EOF, but reads for earlier chunks may still be pending.
      ReadCompletedCore((BufferedRead)completedRead.AsyncState, chunkData);
  }
  /// <summary>
  /// Stores the data on the chunk, publishes the chunk to the queue and wakes up waiting readers.
  /// A zero-length result additionally marks EOF as received.
  /// </summary>
  /// <param name="bufferedRead">The chunk that was requested.</param>
  /// <param name="data">The data returned for the chunk.</param>
  private void ReadCompletedCore(BufferedRead bufferedRead, byte[] data)
  {
      bufferedRead.Complete(data);

      lock (_readLock)
      {
          // publish the chunk under its index
          _queue.Add(bufferedRead.ChunkIndex, bufferedRead);

          // Wake up Read(): either a chunk is available or EOF has been reached. In both
          // cases Read() eventually unblocks the read-ahead again.
          Monitor.PulseAll(_readLock);
      }

      // only an empty result means the server signaled EOF
      if (data.Length != 0)
      {
          return;
      }

      // stop further read-aheads
      _endOfFileReceived = true;
  }
  /// <summary>
  /// Records <paramref name="cause"/> as the reader's failure, unless one was recorded before,
  /// and unblocks both the read-ahead and a waiting <see cref="Read"/>.
  /// </summary>
  /// <param name="cause">The exception that caused the read to fail.</param>
  private void HandleFailure(Exception cause)
  {
      // first failure wins
      _ = Interlocked.CompareExchange(ref _exception, cause, null);

      // give back the slot so the read-ahead is not left waiting
      _ = _semaphore.Release();

      lock (_readLock)
      {
          // wake up Read() so that it observes the failure
          Monitor.PulseAll(_readLock);
      }
  }
  /// <summary>
  /// A single read-ahead chunk: its index, the file offset it was requested at and, once
  /// completed, the data that was returned for it.
  /// </summary>
  internal sealed class BufferedRead
  {
      public BufferedRead(int chunkIndex, ulong offset)
      {
          Offset = offset;
          ChunkIndex = chunkIndex;
      }

      /// <summary>
      /// Gets the index of the chunk.
      /// </summary>
      public int ChunkIndex { get; }

      /// <summary>
      /// Gets the file offset of the chunk.
      /// </summary>
      public ulong Offset { get; }

      /// <summary>
      /// Gets the data of the chunk; not set until <see cref="Complete"/> has been called.
      /// </summary>
      public byte[] Data { get; private set; }

      /// <summary>
      /// Stores the data that was read for this chunk.
      /// </summary>
      /// <param name="data">The data that was read.</param>
      public void Complete(byte[] data) => Data = data;
  }
  394. }
  395. }