using System;
using System.Collections.Generic;
using System.Globalization;
using System.Runtime.ExceptionServices;
using System.Threading;

using Microsoft.Extensions.Logging;

using Renci.SshNet.Abstractions;
using Renci.SshNet.Common;

namespace Renci.SshNet.Sftp
{
    /// <summary>
    /// Reads a remote file sequentially in chunks of a fixed size, keeping a bounded number of
    /// read requests outstanding ahead of the consumer.
    /// </summary>
    /// <remarks>
    /// A read-ahead loop (started from the constructor) issues chunk requests and stores the completed
    /// chunks in a queue keyed by chunk index; <see cref="Read"/> hands them out in order. A
    /// <see cref="SemaphoreSlim"/> sized to the maximum number of pending reads limits how many chunks
    /// can be requested before <see cref="Read"/> consumes them.
    /// </remarks>
    internal sealed class SftpFileReader : ISftpFileReader
    {
        private const int ReadAheadWaitTimeoutInMilliseconds = 1000;

        private readonly byte[] _handle;
        private readonly ISftpSession _sftpSession;
        private readonly uint _chunkSize;
        private readonly SemaphoreSlim _semaphore;
        private readonly object _readLock;
        private readonly ManualResetEvent _disposingWaitHandle;
        private readonly ManualResetEvent _readAheadCompleted;
        private readonly Dictionary<int, BufferedRead> _queue;
        private readonly WaitHandle[] _waitHandles;
        private readonly ILogger _logger;

        /// <summary>
        /// Holds the size of the file, when available.
        /// </summary>
        private readonly long? _fileSize;

        // file offset up to which data has been returned to the caller of Read()
        private ulong _offset;

        // index and file offset of the next chunk the read-ahead loop will request
        private int _readAheadChunkIndex;
        private ulong _readAheadOffset;

        // index of the next chunk Read() will hand out
        private int _nextChunkIndex;

        /// <summary>
        /// Holds a value indicating whether EOF has already been signaled by the SSH server.
        /// </summary>
        private bool _endOfFileReceived;

        /// <summary>
        /// Holds a value indicating whether the client has read up to the end of the file.
        /// </summary>
        private bool _isEndOfFileRead;

        private bool _disposingOrDisposed;

        private Exception _exception;

        /// <summary>
        /// Initializes a new instance of the <see cref="SftpFileReader"/> class with the specified handle,
        /// <see cref="ISftpSession"/> and the maximum number of pending reads.
        /// </summary>
        /// <param name="handle">The file handle.</param>
        /// <param name="sftpSession">The SFTP session.</param>
        /// <param name="chunkSize">The size of an individual read-ahead chunk.</param>
        /// <param name="maxPendingReads">The maximum number of pending reads.</param>
        /// <param name="fileSize">The size of the file, if known; otherwise, <see langword="null"/>.</param>
        /// <remarks>
        /// The read-ahead loop is started before the constructor returns.
        /// </remarks>
        public SftpFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize)
        {
            _handle = handle;
            _sftpSession = sftpSession;
            _chunkSize = chunkSize;
            _fileSize = fileSize;
            _semaphore = new SemaphoreSlim(maxPendingReads);
            _queue = new Dictionary<int, BufferedRead>(maxPendingReads);
            _readLock = new object();
            _readAheadCompleted = new ManualResetEvent(initialState: false);
            _disposingWaitHandle = new ManualResetEvent(initialState: false);
            _waitHandles = _sftpSession.CreateWaitHandleArray(_disposingWaitHandle, _semaphore.AvailableWaitHandle);
            _logger = sftpSession.SessionLoggerFactory.CreateLogger<SftpFileReader>();

            StartReadAhead();
        }

        /// <summary>
        /// Returns the next chunk of the file, blocking until it is available.
        /// </summary>
        /// <returns>
        /// The next chunk of data. A zero-length array signals that the end of the file has been reached.
        /// </returns>
        /// <exception cref="ObjectDisposedException">The current instance has been disposed.</exception>
        /// <exception cref="SshException">
        /// EOF was already returned by a previous call, or the server signaled EOF before the expected data was read.
        /// </exception>
        /// <remarks>
        /// An exception recorded by the read-ahead loop is rethrown with its original stack trace.
        /// </remarks>
        public byte[] Read()
        {
            ThrowHelper.ThrowObjectDisposedIf(_disposingOrDisposed, this);

            if (_exception is not null)
            {
                ExceptionDispatchInfo.Capture(_exception).Throw();
            }

            if (_isEndOfFileRead)
            {
                throw new SshException("Attempting to read beyond the end of the file.");
            }

            BufferedRead nextChunk;

            lock (_readLock)
            {
                // wait until either the next chunk is available, an exception has occurred or the current
                // instance is already disposed
                while (!_queue.TryGetValue(_nextChunkIndex, out nextChunk) && _exception is null)
                {
                    _ = Monitor.Wait(_readLock);
                }

                // throw when an exception occurred in read-ahead, or the current instance is already disposed
                if (_exception is not null)
                {
                    ExceptionDispatchInfo.Capture(_exception).Throw();
                }

                var data = nextChunk.Data;

                if (nextChunk.Offset == _offset)
                {
                    // have we reached EOF?
                    if (data.Length == 0)
                    {
                        // PERF: we do not bother updating all of the internal state when we've reached EOF
                        _isEndOfFileRead = true;
                    }
                    else
                    {
                        // remove processed chunk
                        _ = _queue.Remove(_nextChunkIndex);

                        // update offset
                        _offset += (ulong)data.Length;

                        // move to next chunk
                        _nextChunkIndex++;
                    }

                    // unblock wait in read-ahead
                    _ = _semaphore.Release();

                    return data;
                }

                // When we received an EOF for the next chunk and the size of the file is known, then
                // we only complete the current chunk if we haven't already read up to the file size.
                // This way we save an extra round-trip to the server.
                if (data.Length == 0 && _fileSize.HasValue && _offset == (ulong)_fileSize.Value)
                {
                    // avoid future reads
                    _isEndOfFileRead = true;

                    // unblock wait in read-ahead
                    _ = _semaphore.Release();

                    // signal EOF to caller
                    return nextChunk.Data;
                }
            }

            /*
             * When the server returned fewer bytes than requested (for the previous chunk)
             * we'll synchronously request the remaining data.
             *
             * Due to the optimization above, we'll only get here in one of the following cases:
             * - an EOF situation for files for which we were unable to obtain the file size
             * - fewer bytes than requested were returned
             *
             * According to the SSH specification, this last case should never happen for normal
             * disk files (but can happen for device files). In practice, OpenSSH - for example -
             * returns fewer bytes than requested when requesting more than 64 KB.
             *
             * Important:
             * To avoid a deadlock, this read must be done outside of the read lock.
             */

            var bytesToCatchUp = nextChunk.Offset - _offset;

            /*
             * TODO: break loop and interrupt blocking wait in case of exception
             */

            var read = _sftpSession.RequestRead(_handle, _offset, (uint)bytesToCatchUp);
            if (read.Length == 0)
            {
                // process data in read lock to avoid ObjectDisposedException while releasing semaphore
                lock (_readLock)
                {
                    // a zero-length (EOF) response is only valid for the read-back when EOF has
                    // been signaled for the next read-ahead chunk
                    if (nextChunk.Data.Length == 0)
                    {
                        _isEndOfFileRead = true;

                        // ensure we've not yet disposed the current instance
                        if (!_disposingOrDisposed)
                        {
                            // unblock wait in read-ahead
                            _ = _semaphore.Release();
                        }

                        // signal EOF to caller
                        return read;
                    }

                    // move reader to error state
                    _exception = new SshException("Unexpectedly reached end of file.");

                    // ensure we've not yet disposed the current instance
                    if (!_disposingOrDisposed)
                    {
                        // unblock wait in read-ahead
                        _ = _semaphore.Release();
                    }

                    // notify caller of error
                    throw _exception;
                }
            }

            _offset += (uint)read.Length;

            return read;
        }

        /// <summary>
        /// Stops the read-ahead loop, releases the synchronization primitives and, when the session
        /// is still open, closes the remote file handle.
        /// </summary>
        public void Dispose()
        {
            Dispose(disposing: true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases unmanaged and - optionally - managed resources.
        /// </summary>
        /// <param name="disposing"><see langword="true"/> to release both managed and unmanaged resources; <see langword="false"/> to release only unmanaged resources.</param>
        /// <remarks>
        /// Failures while closing the remote handle are logged and not rethrown.
        /// </remarks>
        private void Dispose(bool disposing)
        {
            if (_disposingOrDisposed)
            {
                return;
            }

            // transition to disposing state
            _disposingOrDisposed = true;

            if (disposing)
            {
                // record exception to break the wait in Read() and to fail future Read() calls
                _exception = new ObjectDisposedException(GetType().FullName);

                // signal that we're disposing to interrupt wait in read-ahead
                _ = _disposingWaitHandle.Set();

                // wait until the read-ahead thread has completed
                _ = _readAheadCompleted.WaitOne();

                // unblock the Read()
                lock (_readLock)
                {
                    // dispose semaphore in read lock to ensure we don't run into an ObjectDisposedException
                    // in Read()
                    _semaphore.Dispose();

                    // awake Read
                    Monitor.PulseAll(_readLock);
                }

                _readAheadCompleted.Dispose();
                _disposingWaitHandle.Dispose();

                if (_sftpSession.IsOpen)
                {
                    try
                    {
                        var closeAsyncResult = _sftpSession.BeginClose(_handle, callback: null, state: null);
                        _sftpSession.EndClose(closeAsyncResult);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogInformation(ex, "Failure closing handle");
                    }
                }
            }
        }

        /// <summary>
        /// Starts the read-ahead loop through <see cref="ThreadAbstraction.ExecuteThread"/>.
        /// </summary>
        /// <remarks>
        /// The loop acquires one semaphore slot per chunk request and keeps requesting chunks until EOF is
        /// received, an exception is recorded, or disposal is signaled. It sets <c>_readAheadCompleted</c>
        /// on exit so that <see cref="Dispose(bool)"/> can wait for it.
        /// </remarks>
        private void StartReadAhead()
        {
            ThreadAbstraction.ExecuteThread(() =>
            {
                while (!_endOfFileReceived && _exception is null)
                {
                    // check if we should continue with the read-ahead loop
                    // note that the EOF and exception check are not included
                    // in this check as they do not require Read() to be
                    // unblocked (or have already done this)
                    if (!ContinueReadAhead())
                    {
                        // unblock the Read()
                        lock (_readLock)
                        {
                            Monitor.PulseAll(_readLock);
                        }

                        // break the read-ahead loop
                        break;
                    }

                    // attempt to obtain the semaphore; this may time out when all semaphore slots are
                    // in use due to pending read-aheads (which in turn can happen when the server
                    // is slow to respond or when the session is broken)
                    if (!_semaphore.Wait(ReadAheadWaitTimeoutInMilliseconds))
                    {
                        // re-evaluate whether an exception occurred, and - if not - wait again
                        continue;
                    }

                    // don't bother reading any more chunks if we received EOF, an exception has occurred
                    // or the current instance is disposed
                    if (_endOfFileReceived || _exception is not null)
                    {
                        break;
                    }

                    // start reading next chunk
                    var bufferedRead = new BufferedRead(_readAheadChunkIndex, _readAheadOffset);

                    try
                    {
                        // even if we know the size of the file and have read up to EOF, we still want
                        // to keep reading (ahead) until we receive zero bytes from the remote host as
                        // we do not want to rely purely on the reported file size
                        //
                        // if the offset of the read-ahead chunk is greater than that file size, then
                        // we can expect to be reading the last (zero-byte) chunk and switch to synchronous
                        // mode to avoid having multiple read-aheads that read beyond EOF
                        if (_fileSize.HasValue && (long)_readAheadOffset > _fileSize.Value)
                        {
                            var asyncResult = _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkSize, callback: null, bufferedRead);
                            var data = _sftpSession.EndRead(asyncResult);
                            ReadCompletedCore(bufferedRead, data);
                        }
                        else
                        {
                            _ = _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkSize, ReadCompleted, bufferedRead);
                        }
                    }
                    catch (Exception ex)
                    {
                        HandleFailure(ex);
                        break;
                    }

                    // advance read-ahead offset
                    _readAheadOffset += _chunkSize;

                    // increment index of read-ahead chunk
                    _readAheadChunkIndex++;
                }

                _ = _readAheadCompleted.Set();
            });
        }

        /// <summary>
        /// Returns a value indicating whether the read-ahead loop should be continued.
        /// </summary>
        /// <returns>
        /// <see langword="true"/> if the read-ahead loop should be continued; otherwise, <see langword="false"/>.
        /// </returns>
        /// <remarks>
        /// Waits, bounded by the session's operation timeout, until either disposal is signaled or a
        /// semaphore slot becomes available. Any exception thrown while waiting is recorded as the reader's
        /// failure (unless one was already recorded) and results in <see langword="false"/>.
        /// </remarks>
        private bool ContinueReadAhead()
        {
            try
            {
                var waitResult = _sftpSession.WaitAny(_waitHandles, _sftpSession.OperationTimeout);
                switch (waitResult)
                {
                    case 0: // disposing
                        return false;
                    case 1: // semaphore available
                        return true;
                    default:
                        throw new NotImplementedException(string.Format(CultureInfo.InvariantCulture, "WaitAny return value '{0}' is not implemented.", waitResult));
                }
            }
            catch (Exception ex)
            {
                _ = Interlocked.CompareExchange(ref _exception, ex, comparand: null);
                return false;
            }
        }

        /// <summary>
        /// Completion callback for asynchronous read-ahead requests.
        /// </summary>
        /// <param name="result">The <see cref="SftpReadAsyncResult"/> of the completed request.</param>
        private void ReadCompleted(IAsyncResult result)
        {
            if (_disposingOrDisposed)
            {
                // skip further processing if we're disposing the current instance
                // to avoid accessing disposed members
                return;
            }

            var readAsyncResult = (SftpReadAsyncResult)result;

            byte[] data;

            try
            {
                data = readAsyncResult.EndInvoke();
            }
            catch (Exception ex)
            {
                HandleFailure(ex);
                return;
            }

            // a read that completes with a zero-byte result signals EOF
            // but there may be pending reads before that read
            var bufferedRead = (BufferedRead)readAsyncResult.AsyncState;
            ReadCompletedCore(bufferedRead, data);
        }

        /// <summary>
        /// Stores a completed chunk in the queue, wakes up <see cref="Read"/>, and stops further
        /// read-aheads when the chunk is empty (EOF).
        /// </summary>
        /// <param name="bufferedRead">The chunk descriptor the request was issued for.</param>
        /// <param name="data">The data returned by the server; zero length signals EOF.</param>
        private void ReadCompletedCore(BufferedRead bufferedRead, byte[] data)
        {
            bufferedRead.Complete(data);

            lock (_readLock)
            {
                // add item to queue
                _queue.Add(bufferedRead.ChunkIndex, bufferedRead);

                // Signal that a chunk has been read or EOF has been reached.
                // In both cases, Read() will eventually also unblock the "read-ahead" thread.
                Monitor.PulseAll(_readLock);
            }

            // check if server signaled EOF
            if (data.Length == 0)
            {
                // set a flag to stop read-aheads
                _endOfFileReceived = true;
            }
        }

        /// <summary>
        /// Records <paramref name="cause"/> as the reader's failure (unless a failure was already recorded)
        /// and wakes up both the read-ahead loop and any blocked <see cref="Read"/> call.
        /// </summary>
        /// <param name="cause">The exception that caused the read to fail.</param>
        /// <remarks>
        /// This is also reached from <see cref="ReadCompleted"/>, which is the completion callback for
        /// <c>BeginRead</c> and can therefore run while <see cref="Dispose(bool)"/> is in progress.
        /// <see cref="Dispose(bool)"/> disposes the semaphore while holding the read lock, so the semaphore
        /// is only released here inside that lock and while the instance is not being disposed (the same
        /// guard <see cref="Read"/> uses); otherwise <see cref="SemaphoreSlim.Release()"/> would throw an
        /// <see cref="ObjectDisposedException"/> on the callback thread.
        /// </remarks>
        private void HandleFailure(Exception cause)
        {
            _ = Interlocked.CompareExchange(ref _exception, cause, comparand: null);

            lock (_readLock)
            {
                // unblock read-ahead; once disposal has started the read-ahead loop is stopped through
                // the disposing wait handle instead, and the semaphore may already be disposed
                if (!_disposingOrDisposed)
                {
                    _ = _semaphore.Release();
                }

                // unblock Read()
                Monitor.PulseAll(_readLock);
            }
        }

        /// <summary>
        /// Describes a single read-ahead chunk: its position in the sequence, its file offset and,
        /// once completed, the data returned by the server.
        /// </summary>
        internal sealed class BufferedRead
        {
            /// <summary>
            /// Gets the zero-based index of the chunk in read-ahead order.
            /// </summary>
            public int ChunkIndex { get; }

            /// <summary>
            /// Gets the data returned by the server, or <see langword="null"/> until <see cref="Complete"/> is called.
            /// </summary>
            public byte[] Data { get; private set; }

            /// <summary>
            /// Gets the file offset at which the chunk was requested.
            /// </summary>
            public ulong Offset { get; }

            public BufferedRead(int chunkIndex, ulong offset)
            {
                ChunkIndex = chunkIndex;
                Offset = offset;
            }

            /// <summary>
            /// Stores the data returned by the server for this chunk.
            /// </summary>
            /// <param name="data">The data; zero length signals EOF.</param>
            public void Complete(byte[] data)
            {
                Data = data;
            }
        }
    }
}