using System;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

using Renci.SshNet.Common;

namespace Renci.SshNet.Sftp
{
    /// <summary>
    /// Exposes a <see cref="Stream"/> around a remote SFTP file, supporting both synchronous and asynchronous read and write operations.
    /// </summary>
    /// <remarks>
    /// A single internal buffer position/length pair is shared between reading and writing;
    /// <c>_bufferOwnedByWrite</c> records which of the two currently owns it. Synchronous members
    /// serialize on an internal lock; the asynchronous members do not take that lock.
    /// </remarks>
#pragma warning disable IDE0079 // We intentionally want to suppress the below warning.
    [SuppressMessage("Performance", "CA1844: Provide memory-based overrides of async methods when subclassing 'Stream'", Justification = "TODO: This should be addressed in the future.")]
#pragma warning restore IDE0079
    public class SftpFileStream : Stream
    {
        private readonly Lock _lock = new Lock();
        private readonly int _readBufferSize;
        private readonly int _writeBufferSize;

        // Internal state.
        private byte[] _handle;
        private ISftpSession _session;

        // Buffer information.
        private byte[] _readBuffer;
        private byte[] _writeBuffer;
        private int _bufferPosition;
        private int _bufferLen;
        private long _position;
        private bool _bufferOwnedByWrite;
        private bool _canRead;
        private bool _canSeek;
        private bool _canWrite;
        private TimeSpan _timeout;

        /// <summary>
        /// Gets a value indicating whether the current stream supports reading.
        /// </summary>
        /// <value>
        /// <see langword="true"/> if the stream supports reading; otherwise, <see langword="false"/>.
        /// </value>
        public override bool CanRead
        {
            get { return _canRead; }
        }

        /// <summary>
        /// Gets a value indicating whether the current stream supports seeking.
        /// </summary>
        /// <value>
        /// <see langword="true"/> if the stream supports seeking; otherwise, <see langword="false"/>.
        /// </value>
        public override bool CanSeek
        {
            get { return _canSeek; }
        }

        /// <summary>
        /// Gets a value indicating whether the current stream supports writing.
        /// </summary>
        /// <value>
        /// <see langword="true"/> if the stream supports writing; otherwise, <see langword="false"/>.
        /// </value>
        public override bool CanWrite
        {
            get { return _canWrite; }
        }

        /// <summary>
        /// Gets a value indicating whether timeout properties are usable for <see cref="SftpFileStream"/>.
        /// </summary>
        /// <value>
        /// <see langword="true"/> in all cases.
        /// </value>
        public override bool CanTimeout
        {
            get { return true; }
        }

        /// <summary>
        /// Gets the length in bytes of the stream.
        /// </summary>
        /// <value>A long value representing the length of the stream in bytes.</value>
        /// <exception cref="NotSupportedException">The stream does not support seeking.</exception>
        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
        /// <exception cref="IOException">The file attributes could not be obtained from the server.</exception>
        public override long Length
        {
            get
            {
                // Lock down the file stream while we do this.
                lock (_lock)
                {
                    CheckSessionIsOpen();

                    if (!CanSeek)
                    {
                        throw new NotSupportedException("Seek operation is not supported.");
                    }

                    // Pending writes may extend the file, so push them out before asking for its size.
                    if (_bufferOwnedByWrite)
                    {
                        FlushWriteBuffer();
                    }

                    // nullOnError: a failed request yields null instead of an exception.
                    var attributes = _session.RequestFStat(_handle, nullOnError: true);
                    if (attributes != null)
                    {
                        return attributes.Size;
                    }

                    throw new IOException("Seek operation failed.");
                }
            }
        }

        /// <summary>
        /// Gets or sets the position within the current stream.
        /// </summary>
        /// <value>The current position within the stream.</value>
        /// <exception cref="IOException">An I/O error occurs.</exception>
        /// <exception cref="NotSupportedException">The stream does not support seeking.</exception>
        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
        public override long Position
        {
            get
            {
                CheckSessionIsOpen();

                if (!CanSeek)
                {
                    throw new NotSupportedException("Seek operation not supported.");
                }

                return _position;
            }
            set
            {
                _ = Seek(value, SeekOrigin.Begin);
            }
        }

        /// <summary>
        /// Gets the name of the path that was used to construct the current <see cref="SftpFileStream"/>.
        /// </summary>
        /// <value>
        /// The name of the path that was used to construct the current <see cref="SftpFileStream"/>.
        /// </value>
        public string Name { get; private set; }

        /// <summary>
        /// Gets the file handle for the file that the current <see cref="SftpFileStream"/> encapsulates.
        /// </summary>
        /// <value>
        /// The file handle for the file that the current <see cref="SftpFileStream"/> encapsulates.
        /// </value>
        /// <remarks>
        /// The stream is flushed (see <see cref="Flush"/>) before the handle is returned.
        /// </remarks>
        public virtual byte[] Handle
        {
            get
            {
                Flush();
                return _handle;
            }
        }

        /// <summary>
        /// Gets or sets the operation timeout.
        /// </summary>
        /// <value>
        /// The timeout.
        /// </value>
        public TimeSpan Timeout
        {
            get
            {
                return _timeout;
            }
            set
            {
                value.EnsureValidTimeout(nameof(Timeout));

                _timeout = value;
            }
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="SftpFileStream"/> class around an already opened remote file handle.
        /// </summary>
        /// <param name="session">The SFTP session the handle belongs to.</param>
        /// <param name="path">The remote path that was opened.</param>
        /// <param name="access">The access the handle was opened with.</param>
        /// <param name="bufferSize">The requested buffer size, used as input for the effective read and write buffer sizes.</param>
        /// <param name="handle">The open remote file handle.</param>
        /// <param name="position">The initial stream position.</param>
        private SftpFileStream(ISftpSession session, string path, FileAccess access, int bufferSize, byte[] handle, long position)
        {
            Timeout = TimeSpan.FromSeconds(30);
            Name = path;

            _session = session;
            _canRead = (access & FileAccess.Read) == FileAccess.Read;
            _canSeek = true;
            _canWrite = (access & FileAccess.Write) == FileAccess.Write;

            _handle = handle;

            /*
             * Instead of using the specified buffer size as is, we use it to calculate a buffer size
             * that ensures we always receive or send the max. number of bytes in a single SSH_FXP_READ
             * or SSH_FXP_WRITE message.
             */

            _readBufferSize = (int)session.CalculateOptimalReadLength((uint)bufferSize);
            _writeBufferSize = (int)session.CalculateOptimalWriteLength((uint)bufferSize, _handle);

            _position = position;
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="SftpFileStream"/> class by synchronously opening <paramref name="path"/> on the server.
        /// </summary>
        /// <param name="session">The SFTP session to use.</param>
        /// <param name="path">The remote path of the file to open.</param>
        /// <param name="mode">Specifies how the file should be opened or created.</param>
        /// <param name="access">Specifies whether the file is opened for reading, writing, or both.</param>
        /// <param name="bufferSize">The requested buffer size, used as input for the effective read and write buffer sizes.</param>
        /// <exception cref="SshConnectionException"><paramref name="session"/> is <see langword="null"/>.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="path"/> is <see langword="null"/>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than or equal to zero, or <paramref name="access"/> or <paramref name="mode"/> is not a supported value.</exception>
        /// <exception cref="ArgumentException">The combination of <paramref name="mode"/> and <paramref name="access"/> is invalid.</exception>
        internal SftpFileStream(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize)
        {
            if (session is null)
            {
                throw new SshConnectionException("Client not connected.");
            }

            ThrowHelper.ThrowIfNull(path);

            if (bufferSize <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(bufferSize), "Cannot be less than or equal to zero.");
            }

            Timeout = TimeSpan.FromSeconds(30);
            Name = path;

            // Initialize the object state.
            _session = session;
            _canRead = (access & FileAccess.Read) == FileAccess.Read;
            _canSeek = true;
            _canWrite = (access & FileAccess.Write) == FileAccess.Write;

            var flags = GetAccessFlags(access);

            ThrowIfInvalidModeForAccess(mode, access);

            switch (mode)
            {
                case FileMode.Append:
                    flags |= Flags.Append | Flags.CreateNewOrOpen;
                    break;
                case FileMode.Create:
                    // First try to open (and truncate) an existing file; when that fails,
                    // fall through to creating a new file with the request further below.
                    _handle = _session.RequestOpen(path, flags | Flags.Truncate, nullOnError: true);
                    if (_handle is null)
                    {
                        flags |= Flags.CreateNew;
                    }
                    else
                    {
                        flags |= Flags.Truncate;
                    }

                    break;
                case FileMode.CreateNew:
                    flags |= Flags.CreateNew;
                    break;
                case FileMode.Open:
                    break;
                case FileMode.OpenOrCreate:
                    flags |= Flags.CreateNewOrOpen;
                    break;
                case FileMode.Truncate:
                    flags |= Flags.Truncate;
                    break;
                default:
                    throw new ArgumentOutOfRangeException(nameof(mode));
            }

            _handle ??= _session.RequestOpen(path, flags);

            /*
             * Instead of using the specified buffer size as is, we use it to calculate a buffer size
             * that ensures we always receive or send the max. number of bytes in a single SSH_FXP_READ
             * or SSH_FXP_WRITE message.
             */

            _readBufferSize = (int)session.CalculateOptimalReadLength((uint)bufferSize);
            _writeBufferSize = (int)session.CalculateOptimalWriteLength((uint)bufferSize, _handle);

            if (mode == FileMode.Append)
            {
                try
                {
                    // Appending starts at the current end of the file.
                    var attributes = _session.RequestFStat(_handle, nullOnError: false);
                    _position = attributes.Size;
                }
                catch
                {
                    // The constructor is about to fail, so nobody will ever dispose this instance:
                    // release the remote handle here (mirrors OpenAsync).
                    try
                    {
                        _session.RequestClose(_handle);
                    }
                    catch
                    {
                        // The original exception is presumably more informative, so we just ignore this one.
                    }

                    throw;
                }
            }
        }

        /// <summary>
        /// Asynchronously opens <paramref name="path"/> on the server and wraps it in a <see cref="SftpFileStream"/>.
        /// </summary>
        /// <param name="session">The SFTP session to use.</param>
        /// <param name="path">The remote path of the file to open.</param>
        /// <param name="mode">Specifies how the file should be opened or created.</param>
        /// <param name="access">Specifies whether the file is opened for reading, writing, or both.</param>
        /// <param name="bufferSize">The requested buffer size, used as input for the effective read and write buffer sizes.</param>
        /// <param name="cancellationToken">The <see cref="CancellationToken"/> to observe.</param>
        /// <returns>
        /// A <see cref="Task{TResult}"/> that represents the asynchronous open operation and yields the opened stream.
        /// </returns>
        /// <exception cref="SshConnectionException"><paramref name="session"/> is <see langword="null"/>.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="path"/> is <see langword="null"/>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than or equal to zero, or <paramref name="access"/> or <paramref name="mode"/> is not a supported value.</exception>
        /// <exception cref="ArgumentException">The combination of <paramref name="mode"/> and <paramref name="access"/> is invalid.</exception>
        internal static async Task<SftpFileStream> OpenAsync(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize, CancellationToken cancellationToken)
        {
            if (session is null)
            {
                throw new SshConnectionException("Client not connected.");
            }

            ThrowHelper.ThrowIfNull(path);

            if (bufferSize <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(bufferSize), "Cannot be less than or equal to zero.");
            }

            var flags = GetAccessFlags(access);

            ThrowIfInvalidModeForAccess(mode, access);

            switch (mode)
            {
                case FileMode.Append:
                    flags |= Flags.Append | Flags.CreateNewOrOpen;
                    break;
                case FileMode.Create:
                    flags |= Flags.CreateNewOrOpen | Flags.Truncate;
                    break;
                case FileMode.CreateNew:
                    flags |= Flags.CreateNew;
                    break;
                case FileMode.Open:
                    break;
                case FileMode.OpenOrCreate:
                    flags |= Flags.CreateNewOrOpen;
                    break;
                case FileMode.Truncate:
                    flags |= Flags.Truncate;
                    break;
                default:
                    throw new ArgumentOutOfRangeException(nameof(mode));
            }

            var handle = await session.RequestOpenAsync(path, flags, cancellationToken).ConfigureAwait(false);

            long position = 0;
            if (mode == FileMode.Append)
            {
                try
                {
                    // Appending starts at the current end of the file.
                    var attributes = await session.RequestFStatAsync(handle, cancellationToken).ConfigureAwait(false);
                    position = attributes.Size;
                }
                catch
                {
                    // No stream will be handed to the caller, so release the remote handle here.
                    try
                    {
                        await session.RequestCloseAsync(handle, cancellationToken).ConfigureAwait(false);
                    }
                    catch
                    {
                        // The original exception is presumably more informative, so we just ignore this one.
                    }

                    throw;
                }
            }

            return new SftpFileStream(session, path, access, bufferSize, handle, position);
        }

        /// <summary>
        /// Clears all buffers for this stream and causes any buffered data to be written to the file.
        /// </summary>
        /// <exception cref="IOException">An I/O error occurs.</exception>
        /// <exception cref="ObjectDisposedException">Stream is closed.</exception>
        public override void Flush()
        {
            lock (_lock)
            {
                CheckSessionIsOpen();

                if (_bufferOwnedByWrite)
                {
                    FlushWriteBuffer();
                }
                else
                {
                    FlushReadBuffer();
                }
            }
        }

        /// <summary>
        /// Asynchronously clears all buffers for this stream and causes any buffered data to be written to the file.
        /// </summary>
        /// <param name="cancellationToken">The <see cref="CancellationToken"/> to observe.</param>
        /// <returns>A <see cref="Task"/> that represents the asynchronous flush operation.</returns>
        /// <exception cref="IOException">An I/O error occurs.</exception>
        /// <exception cref="ObjectDisposedException">Stream is closed.</exception>
        public override Task FlushAsync(CancellationToken cancellationToken)
        {
            CheckSessionIsOpen();

            if (_bufferOwnedByWrite)
            {
                return FlushWriteBufferAsync(cancellationToken);
            }

            // Discarding buffered read data needs no I/O, so this completes synchronously.
            FlushReadBuffer();

            return Task.CompletedTask;
        }

        /// <summary>
        /// Reads a sequence of bytes from the current stream and advances the position within the stream by the
        /// number of bytes read.
        /// </summary>
        /// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between <paramref name="offset"/> and (<paramref name="offset"/> + <paramref name="count"/> - 1) replaced by the bytes read from the current source.</param>
        /// <param name="offset">The zero-based byte offset in <paramref name="buffer"/> at which to begin storing the data read from the current stream.</param>
        /// <param name="count">The maximum number of bytes to be read from the current stream.</param>
        /// <returns>
        /// The total number of bytes read into the buffer. This can be less than the number of bytes requested
        /// if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.
        /// </returns>
        /// <exception cref="ArgumentException">The sum of <paramref name="offset"/> and <paramref name="count"/> is larger than the buffer length.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <see langword="null"/>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="IOException">An I/O error occurs.</exception>
        /// <exception cref="NotSupportedException">The stream does not support reading.</exception>
        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
        /// <remarks>
        /// <para>
        /// This method attempts to read up to <paramref name="count"/> bytes. This either from the buffer, from the
        /// server (using one or more <c>SSH_FXP_READ</c> requests) or using a combination of both.
        /// </para>
        /// <para>
        /// The read loop is interrupted when either <paramref name="count"/> bytes are read, the server returns zero
        /// bytes (EOF) or less bytes than the read buffer size.
        /// </para>
        /// <para>
        /// When a server returns less number of bytes than the read buffer size, this <c>may</c> indicate that EOF has
        /// been reached. A subsequent (<c>SSH_FXP_READ</c>) server request is necessary to make sure EOF has effectively
        /// been reached. Breaking out of the read loop avoids reading from the server twice to determine EOF: once in
        /// the read loop, and once upon the next <see cref="Read"/> or <see cref="ReadByte"/> invocation.
        /// </para>
        /// </remarks>
        public override int Read(byte[] buffer, int offset, int count)
        {
            var readLen = 0;

            ThrowIfInvalidBufferRange(buffer, offset, count);

            // Lock down the file stream while we do this.
            lock (_lock)
            {
                CheckSessionIsOpen();

                // Set up for the read operation.
                SetupRead();

                // Read data into the caller's buffer.
                while (count > 0)
                {
                    int copied;

                    if (_bufferLen - _bufferPosition <= 0)
                    {
                        // Nothing buffered: fetch the next chunk from the server.
                        var data = _session.RequestRead(_handle, (ulong)_position, (uint)_readBufferSize);

                        if (data.Length == 0)
                        {
                            // EOF
                            FlushReadBuffer();
                            break;
                        }

                        copied = CopyServerDataToCaller(data, buffer, offset, count);
                        readLen += copied;

                        // break out of the read loop when the server returned less than the request number of bytes
                        // as that *may* indicate that we've reached EOF
                        //
                        // doing this avoids reading from server twice to determine EOF: once in the read loop, and
                        // once upon the next Read or ReadByte invocation by the caller
                        if (data.Length < _readBufferSize)
                        {
                            break;
                        }
                    }
                    else
                    {
                        copied = CopyReadBufferToCaller(buffer, offset, count);
                        readLen += copied;
                    }

                    // advance offset to start writing bytes into caller-provided buffer
                    offset += copied;

                    // update number of bytes left to read into caller-provided buffer
                    count -= copied;
                }
            }

            // return the number of bytes that were read to the caller.
            return readLen;
        }

        /// <summary>
        /// Asynchronously reads a sequence of bytes from the current stream and advances the position within the stream by the
        /// number of bytes read.
        /// </summary>
        /// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between <paramref name="offset"/> and (<paramref name="offset"/> + <paramref name="count"/> - 1) replaced by the bytes read from the current source.</param>
        /// <param name="offset">The zero-based byte offset in <paramref name="buffer"/> at which to begin storing the data read from the current stream.</param>
        /// <param name="count">The maximum number of bytes to be read from the current stream.</param>
        /// <param name="cancellationToken">The <see cref="CancellationToken"/> to observe.</param>
        /// <returns>
        /// A <see cref="Task{TResult}"/> that represents the asynchronous read operation and yields the total
        /// number of bytes read into <paramref name="buffer"/>.
        /// </returns>
        /// <remarks>
        /// Follows the same read loop as <see cref="Read"/>, but does not take the internal lock.
        /// </remarks>
        public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            var readLen = 0;

            ThrowIfInvalidBufferRange(buffer, offset, count);

            CheckSessionIsOpen();

            // Set up for the read operation.
            SetupRead();

            // Read data into the caller's buffer.
            while (count > 0)
            {
                int copied;

                if (_bufferLen - _bufferPosition <= 0)
                {
                    // Nothing buffered: fetch the next chunk from the server.
                    var data = await _session.RequestReadAsync(_handle, (ulong)_position, (uint)_readBufferSize, cancellationToken).ConfigureAwait(false);

                    if (data.Length == 0)
                    {
                        // EOF
                        FlushReadBuffer();
                        break;
                    }

                    copied = CopyServerDataToCaller(data, buffer, offset, count);
                    readLen += copied;

                    // break out of the read loop when the server returned less than the request number of bytes
                    // as that *may* indicate that we've reached EOF
                    //
                    // doing this avoids reading from server twice to determine EOF: once in the read loop, and
                    // once upon the next Read or ReadByte invocation by the caller
                    if (data.Length < _readBufferSize)
                    {
                        break;
                    }
                }
                else
                {
                    copied = CopyReadBufferToCaller(buffer, offset, count);
                    readLen += copied;
                }

                // advance offset to start writing bytes into caller-provided buffer
                offset += copied;

                // update number of bytes left to read into caller-provided buffer
                count -= copied;
            }

            // return the number of bytes that were read to the caller.
            return readLen;
        }

        /// <summary>
        /// Reads a byte from the stream and advances the position within the stream by one byte, or returns -1 if at the end of the stream.
        /// </summary>
        /// <returns>
        /// The unsigned byte cast to an <see cref="int"/>, or -1 if at the end of the stream.
        /// </returns>
        /// <exception cref="NotSupportedException">The stream does not support reading.</exception>
        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
        /// <exception cref="IOException">Read operation failed.</exception>
        public override int ReadByte()
        {
            // Lock down the file stream while we do this.
            lock (_lock)
            {
                CheckSessionIsOpen();

                // Setup the object for reading.
                SetupRead();

                byte[] readBuffer;

                // Read more data into the internal buffer if necessary.
                if (_bufferPosition >= _bufferLen)
                {
                    var data = _session.RequestRead(_handle, (ulong)_position, (uint)_readBufferSize);
                    if (data.Length == 0)
                    {
                        // We've reached EOF.
                        return -1;
                    }

                    readBuffer = GetOrCreateReadBuffer();
                    Buffer.BlockCopy(data, 0, readBuffer, 0, data.Length);

                    _bufferPosition = 0;
                    _bufferLen = data.Length;
                }
                else
                {
                    readBuffer = GetOrCreateReadBuffer();
                }

                // Extract the next byte from the buffer.
                ++_position;

                return readBuffer[_bufferPosition++];
            }
        }

        /// <summary>
        /// Sets the position within the current stream.
        /// </summary>
        /// <param name="offset">A byte offset relative to the <paramref name="origin"/> parameter.</param>
        /// <param name="origin">A value of type <see cref="SeekOrigin"/> indicating the reference point used to obtain the new position.</param>
        /// <returns>
        /// The new position within the current stream.
        /// </returns>
        /// <exception cref="IOException">An I/O error occurs.</exception>
        /// <exception cref="EndOfStreamException">The resulting position is negative.</exception>
        /// <exception cref="ArgumentException"><paramref name="origin"/> is not a valid <see cref="SeekOrigin"/>.</exception>
        /// <exception cref="NotSupportedException">The stream does not support seeking.</exception>
        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
        public override long Seek(long offset, SeekOrigin origin)
        {
            long newPosn;

            // Lock down the file stream while we do this.
            lock (_lock)
            {
                CheckSessionIsOpen();

                if (!CanSeek)
                {
                    throw new NotSupportedException("Seek is not supported.");
                }

                // Don't do anything if the position won't be moving.
                if (origin == SeekOrigin.Begin && offset == _position)
                {
                    return offset;
                }

                if (origin == SeekOrigin.Current && offset == 0)
                {
                    return _position;
                }

                // The behaviour depends upon the read/write mode.
                if (_bufferOwnedByWrite)
                {
                    // Flush the write buffer and then seek.
                    FlushWriteBuffer();
                }
                else
                {
                    // A seek that lands inside the data currently held in the read buffer
                    // only needs to move the buffer cursor.
                    if (origin is SeekOrigin.Begin or SeekOrigin.Current)
                    {
                        var target = origin == SeekOrigin.Begin ? offset : _position + offset;

                        // Stream position that corresponds to index 0 of the read buffer.
                        var bufferStart = _position - _bufferPosition;

                        if (target >= bufferStart && target < (bufferStart + _bufferLen))
                        {
                            _bufferPosition = (int)(target - bufferStart);
                            _position = target;
                            return _position;
                        }
                    }

                    // Abandon the read buffer.
                    FlushReadBuffer();
                }

                // Seek to the new position.
                switch (origin)
                {
                    case SeekOrigin.Begin:
                        newPosn = offset;
                        break;
                    case SeekOrigin.Current:
                        newPosn = _position + offset;
                        break;
                    case SeekOrigin.End:
                        var attributes = _session.RequestFStat(_handle, nullOnError: false);
                        newPosn = attributes.Size + offset;
                        break;
                    default:
                        throw new ArgumentException("Invalid seek origin.", nameof(origin));
                }

                if (newPosn < 0)
                {
                    throw new EndOfStreamException();
                }

                _position = newPosn;
                return _position;
            }
        }

        /// <summary>
        /// Sets the length of the current stream.
        /// </summary>
        /// <param name="value">The desired length of the current stream in bytes.</param>
        /// <exception cref="IOException">An I/O error occurs.</exception>
        /// <exception cref="NotSupportedException">The stream does not support both writing and seeking.</exception>
        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is negative.</exception>
        /// <remarks>
        /// <para>
        /// Buffers are first flushed.
        /// </para>
        /// <para>
        /// If the specified value is less than the current length of the stream, the stream is truncated and - if the
        /// current position is greater than the new length - the current position is moved to the new end of the stream.
        /// </para>
        /// <para>
        /// If the given value is greater than the current length of the stream, the stream is expanded and the current
        /// position remains the same.
        /// </para>
        /// </remarks>
        public override void SetLength(long value)
        {
#if NET8_0_OR_GREATER
            ArgumentOutOfRangeException.ThrowIfNegative(value);
#else
            if (value < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(value));
            }
#endif

            // Lock down the file stream while we do this.
            lock (_lock)
            {
                CheckSessionIsOpen();

                if (!CanSeek)
                {
                    throw new NotSupportedException("Seek is not supported.");
                }

                if (_bufferOwnedByWrite)
                {
                    FlushWriteBuffer();
                }
                else
                {
                    // Verifies the stream is writable and discards buffered read data.
                    SetupWrite();
                }

                // Read-modify-write of the attributes: only the size is changed.
                var attributes = _session.RequestFStat(_handle, nullOnError: false);
                attributes.Size = value;
                _session.RequestFSetStat(_handle, attributes);

                if (_position > value)
                {
                    _position = value;
                }
            }
        }

        /// <summary>
        /// Writes a sequence of bytes to the current stream and advances the current position within this stream by the number of bytes written.
        /// </summary>
        /// <param name="buffer">An array of bytes. This method copies <paramref name="count"/> bytes from <paramref name="buffer"/> to the current stream.</param>
        /// <param name="offset">The zero-based byte offset in <paramref name="buffer"/> at which to begin copying bytes to the current stream.</param>
        /// <param name="count">The number of bytes to be written to the current stream.</param>
        /// <exception cref="ArgumentException">The sum of <paramref name="offset"/> and <paramref name="count"/> is greater than the buffer length.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <see langword="null"/>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="IOException">An I/O error occurs.</exception>
        /// <exception cref="NotSupportedException">The stream does not support writing.</exception>
        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            ThrowIfInvalidBufferRange(buffer, offset, count);

            // Lock down the file stream while we do this.
            lock (_lock)
            {
                CheckSessionIsOpen();

                // Setup this object for writing.
                SetupWrite();

                // Write data to the file stream.
                while (count > 0)
                {
                    // Determine how many bytes we can write to the buffer.
                    var chunkLength = _writeBufferSize - _bufferPosition;
                    if (chunkLength <= 0)
                    {
                        // flush write buffer, and mark it empty
                        FlushWriteBuffer();

                        // we can now write or buffer the full buffer size
                        chunkLength = _writeBufferSize;
                    }

                    // limit the number of bytes to write to the actual number of bytes requested
                    if (chunkLength > count)
                    {
                        chunkLength = count;
                    }

                    // Can we short-cut the internal buffer?
                    if (_bufferPosition == 0 && chunkLength == _writeBufferSize)
                    {
                        // Yes: a full-sized chunk with nothing buffered goes straight from the caller's array.
                        using (var wait = new AutoResetEvent(initialState: false))
                        {
                            _session.RequestWrite(_handle, (ulong)_position, buffer, offset, chunkLength, wait);
                        }
                    }
                    else
                    {
                        // No: copy the data to the write buffer first.
                        Buffer.BlockCopy(buffer, offset, GetOrCreateWriteBuffer(), _bufferPosition, chunkLength);
                        _bufferPosition += chunkLength;
                    }

                    // Advance the buffer and stream positions.
                    _position += chunkLength;
                    offset += chunkLength;
                    count -= chunkLength;
                }

                // If the buffer is full, then do a speculative flush now,
                // rather than waiting for the next call to this method.
                if (_bufferPosition >= _writeBufferSize)
                {
                    FlushWriteBuffer();
                }
            }
        }

        /// <summary>
        /// Asynchronously writes a sequence of bytes to the current stream and advances the current position within this stream by the number of bytes written.
        /// </summary>
        /// <param name="buffer">An array of bytes. This method copies <paramref name="count"/> bytes from <paramref name="buffer"/> to the current stream.</param>
        /// <param name="offset">The zero-based byte offset in <paramref name="buffer"/> at which to begin copying bytes to the current stream.</param>
        /// <param name="count">The number of bytes to be written to the current stream.</param>
        /// <param name="cancellationToken">The <see cref="CancellationToken"/> to observe.</param>
        /// <returns>A <see cref="Task"/> that represents the asynchronous write operation.</returns>
        /// <exception cref="ArgumentException">The sum of <paramref name="offset"/> and <paramref name="count"/> is greater than the buffer length.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <see langword="null"/>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="IOException">An I/O error occurs.</exception>
        /// <exception cref="NotSupportedException">The stream does not support writing.</exception>
        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
        public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            ThrowIfInvalidBufferRange(buffer, offset, count);

            CheckSessionIsOpen();

            // Setup this object for writing.
            SetupWrite();

            // Write data to the file stream.
            while (count > 0)
            {
                // Determine how many bytes we can write to the buffer.
                var chunkLength = _writeBufferSize - _bufferPosition;
                if (chunkLength <= 0)
                {
                    // flush write buffer, and mark it empty
                    await FlushWriteBufferAsync(cancellationToken).ConfigureAwait(false);

                    // we can now write or buffer the full buffer size
                    chunkLength = _writeBufferSize;
                }

                // limit the number of bytes to write to the actual number of bytes requested
                if (chunkLength > count)
                {
                    chunkLength = count;
                }

                // Can we short-cut the internal buffer?
                if (_bufferPosition == 0 && chunkLength == _writeBufferSize)
                {
                    // Yes: a full-sized chunk with nothing buffered goes straight from the caller's array.
                    await _session.RequestWriteAsync(_handle, (ulong)_position, buffer, offset, chunkLength, cancellationToken).ConfigureAwait(false);
                }
                else
                {
                    // No: copy the data to the write buffer first.
                    Buffer.BlockCopy(buffer, offset, GetOrCreateWriteBuffer(), _bufferPosition, chunkLength);
                    _bufferPosition += chunkLength;
                }

                // Advance the buffer and stream positions.
                _position += chunkLength;
                offset += chunkLength;
                count -= chunkLength;
            }

            // If the buffer is full, then do a speculative flush now,
            // rather than waiting for the next call to this method.
            if (_bufferPosition >= _writeBufferSize)
            {
                await FlushWriteBufferAsync(cancellationToken).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Writes a byte to the current position in the stream and advances the position within the stream by one byte.
        /// </summary>
        /// <param name="value">The byte to write to the stream.</param>
        /// <exception cref="IOException">An I/O error occurs.</exception>
        /// <exception cref="NotSupportedException">The stream does not support writing.</exception>
        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
        public override void WriteByte(byte value)
        {
            // Lock down the file stream while we do this.
            lock (_lock)
            {
                CheckSessionIsOpen();

                // Setup the object for writing.
                SetupWrite();

                var writeBuffer = GetOrCreateWriteBuffer();

                // Flush the current buffer if it is full.
                if (_bufferPosition >= _writeBufferSize)
                {
                    FlushWriteBuffer();
                }

                // Write the byte into the buffer and advance the posn.
                writeBuffer[_bufferPosition++] = value;
                ++_position;
            }
        }

        /// <summary>
        /// Releases the unmanaged resources used by the <see cref="Stream"/> and optionally releases the managed resources.
        /// </summary>
        /// <param name="disposing"><see langword="true"/> to release both managed and unmanaged resources; <see langword="false"/> to release only unmanaged resources.</param>
        /// <remarks>
        /// When disposing and the session is still open, buffered writes are flushed and the remote handle is closed.
        /// </remarks>
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);

            if (!disposing || _session is null)
            {
                return;
            }

            lock (_lock)
            {
                // Re-check under the lock: another thread may have completed the dispose in the meantime.
                if (_session is null)
                {
                    return;
                }

                _canRead = false;
                _canSeek = false;
                _canWrite = false;

                if (_handle != null)
                {
                    if (_session.IsOpen)
                    {
                        if (_bufferOwnedByWrite)
                        {
                            FlushWriteBuffer();
                        }

                        _session.RequestClose(_handle);
                    }

                    _handle = null;
                }

                _session = null;
            }
        }

        /// <summary>
        /// Validates the <c>buffer</c>/<c>offset</c>/<c>count</c> triple shared by the read and write methods.
        /// </summary>
        /// <param name="buffer">The caller-provided array.</param>
        /// <param name="offset">The zero-based start index within <paramref name="buffer"/>.</param>
        /// <param name="count">The number of bytes to transfer.</param>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <see langword="null"/>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="ArgumentException">The sum of <paramref name="offset"/> and <paramref name="count"/> is larger than the buffer length.</exception>
        private static void ThrowIfInvalidBufferRange(byte[] buffer, int offset, int count)
        {
            ThrowHelper.ThrowIfNull(buffer);

#if NET8_0_OR_GREATER
            ArgumentOutOfRangeException.ThrowIfNegative(offset);
            ArgumentOutOfRangeException.ThrowIfNegative(count);
#else
            if (offset < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(offset));
            }

            if (count < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(count));
            }
#endif

            if ((buffer.Length - offset) < count)
            {
                throw new ArgumentException("Invalid array range.");
            }
        }

        /// <summary>
        /// Maps a <see cref="FileAccess"/> value onto the corresponding SFTP open flags.
        /// </summary>
        /// <param name="access">The requested access.</param>
        /// <returns>The read and/or write flags for <paramref name="access"/>.</returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="access"/> is not a supported value.</exception>
        private static Flags GetAccessFlags(FileAccess access)
        {
            var flags = Flags.None;

            switch (access)
            {
                case FileAccess.Read:
                    flags |= Flags.Read;
                    break;
                case FileAccess.Write:
                    flags |= Flags.Write;
                    break;
                case FileAccess.ReadWrite:
                    flags |= Flags.Read;
                    flags |= Flags.Write;
                    break;
                default:
                    throw new ArgumentOutOfRangeException(nameof(access));
            }

            return flags;
        }

        /// <summary>
        /// Rejects combinations of <paramref name="mode"/> and <paramref name="access"/> that cannot be honoured.
        /// </summary>
        /// <param name="mode">The requested file mode.</param>
        /// <param name="access">The requested file access.</param>
        /// <exception cref="ArgumentException">
        /// <see cref="FileMode.Append"/> is combined with read access, or a mode that modifies the file is
        /// combined with an access that does not include writing.
        /// </exception>
        private static void ThrowIfInvalidModeForAccess(FileMode mode, FileAccess access)
        {
            if ((access & FileAccess.Read) == FileAccess.Read && mode == FileMode.Append)
            {
                throw new ArgumentException(string.Format(CultureInfo.InvariantCulture,
                                                          "{0} mode can be requested only when combined with write-only access.",
                                                          mode.ToString("G")),
                                            nameof(mode));
            }

            if ((access & FileAccess.Write) != FileAccess.Write)
            {
                if (mode is FileMode.Create or FileMode.CreateNew or FileMode.Truncate or FileMode.Append)
                {
                    throw new ArgumentException(string.Format(CultureInfo.InvariantCulture,
                                                              "Combining {0}: {1} with {2}: {3} is invalid.",
                                                              nameof(FileMode),
                                                              mode,
                                                              nameof(FileAccess),
                                                              access),
                                                nameof(mode));
                }
            }
        }

        /// <summary>
        /// Hands freshly received server data to the caller and parks whatever does not fit in the read buffer.
        /// </summary>
        /// <param name="data">The non-empty data returned by the server for the current position.</param>
        /// <param name="buffer">The caller-provided array.</param>
        /// <param name="offset">The index in <paramref name="buffer"/> at which to start storing data.</param>
        /// <param name="count">The number of bytes the caller still wants.</param>
        /// <returns>The number of bytes copied into <paramref name="buffer"/>.</returns>
        private int CopyServerDataToCaller(byte[] data, byte[] buffer, int offset, int count)
        {
            var bytesToCaller = count >= data.Length ? data.Length : count;
            var bytesToReadBuffer = data.Length - bytesToCaller;

            if (bytesToReadBuffer > 0)
            {
                // The caller wants less than the server returned: keep the tail for later reads.
                Buffer.BlockCopy(data, bytesToCaller, GetOrCreateReadBuffer(), 0, bytesToReadBuffer);
            }

            // Either the buffer now holds the tail (starting at index 0), or it is empty.
            _bufferPosition = 0;
            _bufferLen = bytesToReadBuffer;

            // write bytes to caller-provided buffer
            Buffer.BlockCopy(data, 0, buffer, offset, bytesToCaller);

            // update stream position
            _position += bytesToCaller;

            return bytesToCaller;
        }

        /// <summary>
        /// Serves the caller from data already held in the read buffer.
        /// </summary>
        /// <param name="buffer">The caller-provided array.</param>
        /// <param name="offset">The index in <paramref name="buffer"/> at which to start storing data.</param>
        /// <param name="count">The number of bytes the caller still wants.</param>
        /// <returns>The number of bytes copied into <paramref name="buffer"/>.</returns>
        private int CopyReadBufferToCaller(byte[] buffer, int offset, int count)
        {
            var bytesAvailableInBuffer = _bufferLen - _bufferPosition;

            // limit the number of bytes to use from read buffer to the caller-request number of bytes
            if (bytesAvailableInBuffer > count)
            {
                bytesAvailableInBuffer = count;
            }

            // copy data from read buffer to the caller-provided buffer
            Buffer.BlockCopy(GetOrCreateReadBuffer(), _bufferPosition, buffer, offset, bytesAvailableInBuffer);

            // update position in read buffer
            _bufferPosition += bytesAvailableInBuffer;

            // update stream position
            _position += bytesAvailableInBuffer;

            return bytesAvailableInBuffer;
        }

        /// <summary>
        /// Returns the read buffer, allocating it on first use.
        /// </summary>
        private byte[] GetOrCreateReadBuffer()
        {
            _readBuffer ??= new byte[_readBufferSize];
            return _readBuffer;
        }

        /// <summary>
        /// Returns the write buffer, allocating it on first use.
        /// </summary>
        private byte[] GetOrCreateWriteBuffer()
        {
            _writeBuffer ??= new byte[_writeBufferSize];
            return _writeBuffer;
        }

        /// <summary>
        /// Flushes the read data from the buffer.
        /// </summary>
        private void FlushReadBuffer()
        {
            _bufferPosition = 0;
            _bufferLen = 0;
        }

        /// <summary>
        /// Flush any buffered write data to the file.
        /// </summary>
        private void FlushWriteBuffer()
        {
            if (_bufferPosition > 0)
            {
                // The buffered bytes belong at the stream offset where buffering started.
                using (var wait = new AutoResetEvent(initialState: false))
                {
                    _session.RequestWrite(_handle, (ulong)(_position - _bufferPosition), _writeBuffer, 0, _bufferPosition, wait);
                }

                _bufferPosition = 0;
            }
        }

        /// <summary>
        /// Asynchronously flushes any buffered write data to the file.
        /// </summary>
        /// <param name="cancellationToken">The <see cref="CancellationToken"/> to observe.</param>
        /// <returns>A <see cref="Task"/> that represents the asynchronous flush operation.</returns>
        private async Task FlushWriteBufferAsync(CancellationToken cancellationToken)
        {
            if (_bufferPosition > 0)
            {
                // The buffered bytes belong at the stream offset where buffering started.
                await _session.RequestWriteAsync(_handle, (ulong)(_position - _bufferPosition), _writeBuffer, 0, _bufferPosition, cancellationToken).ConfigureAwait(false);
                _bufferPosition = 0;
            }
        }

        /// <summary>
        /// Prepares the stream for reading: pending writes are flushed and the buffer is handed over to reading.
        /// </summary>
        /// <exception cref="NotSupportedException">The stream does not support reading.</exception>
        private void SetupRead()
        {
            if (!CanRead)
            {
                throw new NotSupportedException("Read not supported.");
            }

            if (_bufferOwnedByWrite)
            {
                FlushWriteBuffer();
                _bufferOwnedByWrite = false;
            }
        }

        /// <summary>
        /// Prepares the stream for writing: buffered read data is discarded and the buffer is handed over to writing.
        /// </summary>
        /// <exception cref="NotSupportedException">The stream does not support writing.</exception>
        private void SetupWrite()
        {
            if (!CanWrite)
            {
                throw new NotSupportedException("Write not supported.");
            }

            if (!_bufferOwnedByWrite)
            {
                FlushReadBuffer();
                _bufferOwnedByWrite = true;
            }
        }

        /// <summary>
        /// Verifies that the stream has not been disposed and that the underlying session is still open.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The stream was disposed, or the session is no longer open.</exception>
        private void CheckSessionIsOpen()
        {
            ThrowHelper.ThrowObjectDisposedIf(_session is null, this);

            if (!_session.IsOpen)
            {
                throw new ObjectDisposedException(GetType().FullName, "Cannot access a closed SFTP session.");
            }
        }
    }
}