|  | @@ -0,0 +1,229 @@
 | 
	
		
			
				|  |  | +using Renci.SshNet.Abstractions;
 | 
	
		
			
				|  |  | +using Renci.SshNet.Common;
 | 
	
		
			
				|  |  | +using System;
 | 
	
		
			
				|  |  | +using System.Collections.Generic;
 | 
	
		
			
				|  |  | +using System.Threading;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +namespace Renci.SshNet.Sftp
 | 
	
		
			
				|  |  | +{
 | 
	
		
			
				|  |  | +    internal class SftpFileReader
 | 
	
		
			
				|  |  | +    {
 | 
	
		
			
				|  |  | +        private const int MaxReadAhead = 10;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        private readonly byte[] _handle;
 | 
	
		
			
				|  |  | +        private readonly ISftpSession _sftpSession;
 | 
	
		
			
				|  |  | +        private SemaphoreLight _semaphore;
 | 
	
		
			
				|  |  | +        private bool _isCompleted;
 | 
	
		
			
				|  |  | +        private uint _chunkLength;
 | 
	
		
			
				|  |  | +        private int _readAheadChunkIndex;
 | 
	
		
			
				|  |  | +        private int _nextChunkIndex;
 | 
	
		
			
				|  |  | +        private ulong _readAheadOffset;
 | 
	
		
			
				|  |  | +        private ulong _offset;
 | 
	
		
			
				|  |  | +        private ulong _fileSize;
 | 
	
		
			
				|  |  | +        private readonly IDictionary<int, BufferedRead> _queue;
 | 
	
		
			
				|  |  | +        private readonly object _readLock;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        public SftpFileReader(byte[] handle, ISftpSession sftpSession)
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            _handle = handle;
 | 
	
		
			
				|  |  | +            _sftpSession = sftpSession;
 | 
	
		
			
				|  |  | +            _chunkLength = 16 * 1024; // TODO !
 | 
	
		
			
				|  |  | +            _semaphore = new SemaphoreLight(MaxReadAhead);
 | 
	
		
			
				|  |  | +            _queue = new Dictionary<int, BufferedRead>(MaxReadAhead);
 | 
	
		
			
				|  |  | +            _readLock = new object();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            _fileSize = (ulong)_sftpSession.RequestFStat(_handle).Size;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            ThreadAbstraction.ExecuteThread(() =>
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                while (!_isCompleted)
 | 
	
		
			
				|  |  | +                {
 | 
	
		
			
				|  |  | +                    if (_readAheadOffset >= _fileSize)
 | 
	
		
			
				|  |  | +                        break;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    // TODO implement cancellation!?
 | 
	
		
			
				|  |  | +                    _semaphore.Wait();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    // start reading next chunk
 | 
	
		
			
				|  |  | +                    _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkLength, ReadCompleted,
 | 
	
		
			
				|  |  | +                                           new BufferedRead(_readAheadChunkIndex, _readAheadOffset));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    // advance read-ahead offset
 | 
	
		
			
				|  |  | +                    _readAheadOffset += _chunkLength;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    _readAheadChunkIndex++;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            });
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        public byte[] Read()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            lock (_readLock)
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                BufferedRead nextChunk;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                while (!_queue.TryGetValue(_nextChunkIndex, out nextChunk) && !_isCompleted)
 | 
	
		
			
				|  |  | +                    Monitor.Wait(_readLock);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                if (_isCompleted)
 | 
	
		
			
				|  |  | +                    return new byte[0];
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                if (nextChunk.Offset == _offset)
 | 
	
		
			
				|  |  | +                {
 | 
	
		
			
				|  |  | +                    var data = nextChunk.Data;
 | 
	
		
			
				|  |  | +                    _offset += (ulong) data.Length;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    // remove processed chunk
 | 
	
		
			
				|  |  | +                    _queue.Remove(_nextChunkIndex);
 | 
	
		
			
				|  |  | +                    // move to next chunk
 | 
	
		
			
				|  |  | +                    _nextChunkIndex++;
 | 
	
		
			
				|  |  | +                    // allow read ahead of a new chunk
 | 
	
		
			
				|  |  | +                    _semaphore.Release();
 | 
	
		
			
				|  |  | +                    return data;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                // when the server returned less bytes than requested (for the previous chunk)
 | 
	
		
			
				|  |  | +                // we'll synchronously request the remaining data
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                var catchUp = new byte[nextChunk.Offset - _offset];
 | 
	
		
			
				|  |  | +                var bytesCaughtUp = 0L;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                while (bytesCaughtUp < catchUp.Length)
 | 
	
		
			
				|  |  | +                {
 | 
	
		
			
				|  |  | +                    // TODO: break loop and interrupt blocking wait in case of exception
 | 
	
		
			
				|  |  | +                    var read = _sftpSession.RequestRead(_handle, _offset, (uint) catchUp.Length);
 | 
	
		
			
				|  |  | +                    if (read.Length == 0)
 | 
	
		
			
				|  |  | +                    {
 | 
	
		
			
				|  |  | +                        // break in loop in "read-ahead" thread (once a blocking wait is interrupted)
 | 
	
		
			
				|  |  | +                        _isCompleted = true;
 | 
	
		
			
				|  |  | +                        // interrupt blocking wait in "read-ahead" thread
 | 
	
		
			
				|  |  | +                        lock (_readLock)
 | 
	
		
			
				|  |  | +                            Monitor.PulseAll(_readLock);
 | 
	
		
			
				|  |  | +                        // signal failure
 | 
	
		
			
				|  |  | +                        throw new SshException("Unexpectedly reached end of file.");
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    bytesCaughtUp += read.Length;
 | 
	
		
			
				|  |  | +                    _offset += (ulong) bytesCaughtUp;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                return catchUp;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        private void ReadCompleted(IAsyncResult result)
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var readAsyncResult = result as SftpReadAsyncResult;
 | 
	
		
			
				|  |  | +            if (readAsyncResult == null)
 | 
	
		
			
				|  |  | +                return;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            var data = readAsyncResult.EndInvoke();
 | 
	
		
			
				|  |  | +            if (data.Length == 0)
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                _isCompleted = true;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            else
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                var bufferedRead = (BufferedRead)readAsyncResult.AsyncState;
 | 
	
		
			
				|  |  | +                bufferedRead.Complete(data);
 | 
	
		
			
				|  |  | +                _queue.Add(bufferedRead.ChunkIndex, bufferedRead);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // signal that a chunk has been read or EOF has been reached;
 | 
	
		
			
				|  |  | +            // in both cases, we want to unblock the "read-ahead" thread
 | 
	
		
			
				|  |  | +            lock (_readLock)
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                Monitor.Pulse(_readLock);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    //private class BufferedReadState
 | 
	
		
			
				|  |  | +    //{
 | 
	
		
			
				|  |  | +    //    private BlockingQueue<BufferedRead> _queue;
 | 
	
		
			
				|  |  | +    //    private long _offset;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    //    public BufferedReadState(long offset, SemaphoreLight semaphore, BlockingQueue<BufferedRead> queue)
 | 
	
		
			
				|  |  | +    //    {
 | 
	
		
			
				|  |  | +    //        _queue = queue;
 | 
	
		
			
				|  |  | +    //        _offset = offset;
 | 
	
		
			
				|  |  | +    //        _semaphore = semaphore;
 | 
	
		
			
				|  |  | +    //    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    //    public long Offset
 | 
	
		
			
				|  |  | +    //    {
 | 
	
		
			
				|  |  | +    //        get { return _offset; }
 | 
	
		
			
				|  |  | +    //    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    //    public BlockingQueue<BufferedRead> Queue
 | 
	
		
			
				|  |  | +    //    {
 | 
	
		
			
				|  |  | +    //        get { return _queue; }
 | 
	
		
			
				|  |  | +    //    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    //    public SemaphoreLight Semaphore
 | 
	
		
			
				|  |  | +    //    {
 | 
	
		
			
				|  |  | +    //        get { return _semaphore; }
 | 
	
		
			
				|  |  | +    //    }
 | 
	
		
			
				|  |  | +    //}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    //private class BlockingQueue<T>
 | 
	
		
			
				|  |  | +    //{
 | 
	
		
			
				|  |  | +    //    private readonly object _lock = new object();
 | 
	
		
			
				|  |  | +    //    private Queue<T> _queue;
 | 
	
		
			
				|  |  | +    //    private bool _isClosed;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    //    public BlockingQueue(int capacity)
 | 
	
		
			
				|  |  | +    //    {
 | 
	
		
			
				|  |  | +    //        _queue = new Queue<T>(capacity);
 | 
	
		
			
				|  |  | +    //    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    //    public T Dequeue()
 | 
	
		
			
				|  |  | +    //    {
 | 
	
		
			
				|  |  | +    //        lock (_lock)
 | 
	
		
			
				|  |  | +    //        {
 | 
	
		
			
				|  |  | +    //            while (!_isClosed && _queue.Count == 0)
 | 
	
		
			
				|  |  | +    //                Monitor.Wait(_lock);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    //            if (_queue.Count == 0)
 | 
	
		
			
				|  |  | +    //                return default(T);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    //            return _queue.Dequeue();
 | 
	
		
			
				|  |  | +    //        }
 | 
	
		
			
				|  |  | +    //    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    //    public void Enqueue(T item)
 | 
	
		
			
				|  |  | +    //    {
 | 
	
		
			
				|  |  | +    //        lock (_lock)
 | 
	
		
			
				|  |  | +    //        {
 | 
	
		
			
				|  |  | +    //            _queue.Enqueue(item);
 | 
	
		
			
				|  |  | +    //            Monitor.PulseAll(_lock);
 | 
	
		
			
				|  |  | +    //        }
 | 
	
		
			
				|  |  | +    //    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    //    public void Close()
 | 
	
		
			
				|  |  | +    //    {
 | 
	
		
			
				|  |  | +    //        _isClosed = true;
 | 
	
		
			
				|  |  | +    //        Monitor.PulseAll(_lock);
 | 
	
		
			
				|  |  | +    //    }
 | 
	
		
			
				|  |  | +    //}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    internal class BufferedRead
 | 
	
		
			
				|  |  | +    {
 | 
	
		
			
				|  |  | +        public int ChunkIndex { get; private set; }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        public byte[] Data { get; private set; }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        public ulong Offset { get; private set; }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        public BufferedRead(int chunkIndex, ulong offset)
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            ChunkIndex = chunkIndex;
 | 
	
		
			
				|  |  | +            Offset = offset;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        public void Complete(byte[] data)
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            Data = data;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +}
 |