using System;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Renci.SshNet.Common;
namespace Renci.SshNet.Sftp
{
///
/// Exposes a around a remote SFTP file, supporting both synchronous and asynchronous read and write operations.
///
///
#pragma warning disable IDE0079 // We intentionally want to suppress the below warning.
[SuppressMessage("Performance", "CA1844: Provide memory-based overrides of async methods when subclassing 'Stream'", Justification = "TODO: This should be addressed in the future.")]
#pragma warning restore IDE0079
public class SftpFileStream : Stream
{
private readonly Lock _lock = new Lock();
private readonly int _readBufferSize;
private readonly int _writeBufferSize;
// Internal state.
private byte[] _handle;
private ISftpSession _session;
// Buffer information.
private byte[] _readBuffer;
private byte[] _writeBuffer;
private int _bufferPosition;
private int _bufferLen;
private long _position;
private bool _bufferOwnedByWrite;
private bool _canRead;
private bool _canSeek;
private bool _canWrite;
private TimeSpan _timeout;
///
/// Gets a value indicating whether the current stream supports reading.
///
///
/// if the stream supports reading; otherwise, .
///
public override bool CanRead
{
get { return _canRead; }
}
///
/// Gets a value indicating whether the current stream supports seeking.
///
///
/// if the stream supports seeking; otherwise, .
///
public override bool CanSeek
{
get { return _canSeek; }
}
///
/// Gets a value indicating whether the current stream supports writing.
///
///
/// if the stream supports writing; otherwise, .
///
public override bool CanWrite
{
get { return _canWrite; }
}
///
/// Gets a value indicating whether timeout properties are usable for .
///
///
/// in all cases.
///
public override bool CanTimeout
{
get { return true; }
}
///
/// Gets the length in bytes of the stream.
///
/// A long value representing the length of the stream in bytes.
/// A class derived from Stream does not support seeking.
/// Methods were called after the stream was closed.
/// IO operation failed.
public override long Length
{
get
{
// Lock down the file stream while we do this.
lock (_lock)
{
CheckSessionIsOpen();
if (!CanSeek)
{
throw new NotSupportedException("Seek operation is not supported.");
}
// Flush the write buffer, because it may
// affect the length of the stream.
if (_bufferOwnedByWrite)
{
FlushWriteBuffer();
}
// obtain file attributes
var attributes = _session.RequestFStat(_handle, nullOnError: true);
if (attributes != null)
{
return attributes.Size;
}
throw new IOException("Seek operation failed.");
}
}
}
///
/// Gets or sets the position within the current stream.
///
/// The current position within the stream.
/// An I/O error occurs.
/// The stream does not support seeking.
/// Methods were called after the stream was closed.
public override long Position
{
get
{
CheckSessionIsOpen();
if (!CanSeek)
{
throw new NotSupportedException("Seek operation not supported.");
}
return _position;
}
set
{
_ = Seek(value, SeekOrigin.Begin);
}
}
///
/// Gets the name of the path that was used to construct the current .
///
///
/// The name of the path that was used to construct the current .
///
public string Name { get; private set; }
///
/// Gets the operating system file handle for the file that the current encapsulates.
///
///
/// The operating system file handle for the file that the current encapsulates.
///
public virtual byte[] Handle
{
get
{
Flush();
return _handle;
}
}
///
/// Gets or sets the operation timeout.
///
///
/// The timeout.
///
public TimeSpan Timeout
{
get
{
return _timeout;
}
set
{
value.EnsureValidTimeout(nameof(Timeout));
_timeout = value;
}
}
private SftpFileStream(ISftpSession session, string path, FileAccess access, int bufferSize, byte[] handle, long position)
{
Timeout = TimeSpan.FromSeconds(30);
Name = path;
_session = session;
_canRead = (access & FileAccess.Read) == FileAccess.Read;
_canSeek = true;
_canWrite = (access & FileAccess.Write) == FileAccess.Write;
_handle = handle;
/*
* Instead of using the specified buffer size as is, we use it to calculate a buffer size
* that ensures we always receive or send the max. number of bytes in a single SSH_FXP_READ
* or SSH_FXP_WRITE message.
*/
_readBufferSize = (int)session.CalculateOptimalReadLength((uint)bufferSize);
_writeBufferSize = (int)session.CalculateOptimalWriteLength((uint)bufferSize, _handle);
_position = position;
}
internal SftpFileStream(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize)
{
if (session is null)
{
throw new SshConnectionException("Client not connected.");
}
ThrowHelper.ThrowIfNull(path);
if (bufferSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(bufferSize), "Cannot be less than or equal to zero.");
}
Timeout = TimeSpan.FromSeconds(30);
Name = path;
// Initialize the object state.
_session = session;
_canRead = (access & FileAccess.Read) == FileAccess.Read;
_canSeek = true;
_canWrite = (access & FileAccess.Write) == FileAccess.Write;
var flags = Flags.None;
switch (access)
{
case FileAccess.Read:
flags |= Flags.Read;
break;
case FileAccess.Write:
flags |= Flags.Write;
break;
case FileAccess.ReadWrite:
flags |= Flags.Read;
flags |= Flags.Write;
break;
default:
throw new ArgumentOutOfRangeException(nameof(access));
}
if ((access & FileAccess.Read) == FileAccess.Read && mode == FileMode.Append)
{
throw new ArgumentException(string.Format(CultureInfo.InvariantCulture,
"{0} mode can be requested only when combined with write-only access.",
mode.ToString("G")),
nameof(mode));
}
if ((access & FileAccess.Write) != FileAccess.Write)
{
if (mode is FileMode.Create or FileMode.CreateNew or FileMode.Truncate or FileMode.Append)
{
throw new ArgumentException(string.Format(CultureInfo.InvariantCulture,
"Combining {0}: {1} with {2}: {3} is invalid.",
nameof(FileMode),
mode,
nameof(FileAccess),
access),
nameof(mode));
}
}
switch (mode)
{
case FileMode.Append:
flags |= Flags.Append | Flags.CreateNewOrOpen;
break;
case FileMode.Create:
_handle = _session.RequestOpen(path, flags | Flags.Truncate, nullOnError: true);
if (_handle is null)
{
flags |= Flags.CreateNew;
}
else
{
flags |= Flags.Truncate;
}
break;
case FileMode.CreateNew:
flags |= Flags.CreateNew;
break;
case FileMode.Open:
break;
case FileMode.OpenOrCreate:
flags |= Flags.CreateNewOrOpen;
break;
case FileMode.Truncate:
flags |= Flags.Truncate;
break;
default:
throw new ArgumentOutOfRangeException(nameof(mode));
}
_handle ??= _session.RequestOpen(path, flags);
/*
* Instead of using the specified buffer size as is, we use it to calculate a buffer size
* that ensures we always receive or send the max. number of bytes in a single SSH_FXP_READ
* or SSH_FXP_WRITE message.
*/
_readBufferSize = (int)session.CalculateOptimalReadLength((uint)bufferSize);
_writeBufferSize = (int)session.CalculateOptimalWriteLength((uint)bufferSize, _handle);
if (mode == FileMode.Append)
{
var attributes = _session.RequestFStat(_handle, nullOnError: false);
_position = attributes.Size;
}
}
internal static async Task OpenAsync(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize, CancellationToken cancellationToken)
{
if (session is null)
{
throw new SshConnectionException("Client not connected.");
}
ThrowHelper.ThrowIfNull(path);
if (bufferSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(bufferSize), "Cannot be less than or equal to zero.");
}
var flags = Flags.None;
switch (access)
{
case FileAccess.Read:
flags |= Flags.Read;
break;
case FileAccess.Write:
flags |= Flags.Write;
break;
case FileAccess.ReadWrite:
flags |= Flags.Read;
flags |= Flags.Write;
break;
default:
throw new ArgumentOutOfRangeException(nameof(access));
}
if ((access & FileAccess.Read) == FileAccess.Read && mode == FileMode.Append)
{
throw new ArgumentException(string.Format(CultureInfo.InvariantCulture,
"{0} mode can be requested only when combined with write-only access.",
mode.ToString("G")),
nameof(mode));
}
if ((access & FileAccess.Write) != FileAccess.Write)
{
if (mode is FileMode.Create or FileMode.CreateNew or FileMode.Truncate or FileMode.Append)
{
throw new ArgumentException(string.Format(CultureInfo.InvariantCulture,
"Combining {0}: {1} with {2}: {3} is invalid.",
nameof(FileMode),
mode,
nameof(FileAccess),
access),
nameof(mode));
}
}
switch (mode)
{
case FileMode.Append:
flags |= Flags.Append | Flags.CreateNewOrOpen;
break;
case FileMode.Create:
flags |= Flags.CreateNewOrOpen | Flags.Truncate;
break;
case FileMode.CreateNew:
flags |= Flags.CreateNew;
break;
case FileMode.Open:
break;
case FileMode.OpenOrCreate:
flags |= Flags.CreateNewOrOpen;
break;
case FileMode.Truncate:
flags |= Flags.Truncate;
break;
default:
throw new ArgumentOutOfRangeException(nameof(mode));
}
var handle = await session.RequestOpenAsync(path, flags, cancellationToken).ConfigureAwait(false);
long position = 0;
if (mode == FileMode.Append)
{
try
{
var attributes = await session.RequestFStatAsync(handle, cancellationToken).ConfigureAwait(false);
position = attributes.Size;
}
catch
{
try
{
await session.RequestCloseAsync(handle, cancellationToken).ConfigureAwait(false);
}
catch
{
// The original exception is presumably more informative, so we just ignore this one.
}
throw;
}
}
return new SftpFileStream(session, path, access, bufferSize, handle, position);
}
///
/// Clears all buffers for this stream and causes any buffered data to be written to the file.
///
/// An I/O error occurs.
/// Stream is closed.
public override void Flush()
{
lock (_lock)
{
CheckSessionIsOpen();
if (_bufferOwnedByWrite)
{
FlushWriteBuffer();
}
else
{
FlushReadBuffer();
}
}
}
///
/// Asynchronously clears all buffers for this stream and causes any buffered data to be written to the file.
///
/// The to observe.
/// A that represents the asynchronous flush operation.
/// An I/O error occurs.
/// Stream is closed.
public override Task FlushAsync(CancellationToken cancellationToken)
{
CheckSessionIsOpen();
if (_bufferOwnedByWrite)
{
return FlushWriteBufferAsync(cancellationToken);
}
FlushReadBuffer();
return Task.CompletedTask;
}
///
/// Reads a sequence of bytes from the current stream and advances the position within the stream by the
/// number of bytes read.
///
/// An array of bytes. When this method returns, the buffer contains the specified byte array with the values between and ( + - 1) replaced by the bytes read from the current source.
/// The zero-based byte offset in at which to begin storing the data read from the current stream.
/// The maximum number of bytes to be read from the current stream.
///
/// The total number of bytes read into the buffer. This can be less than the number of bytes requested
/// if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.
///
/// The sum of and is larger than the buffer length.
/// is .
/// or is negative.
/// An I/O error occurs.
/// The stream does not support reading.
/// Methods were called after the stream was closed.
///
///
/// This method attempts to read up to bytes. This either from the buffer, from the
/// server (using one or more SSH_FXP_READ requests) or using a combination of both.
///
///
/// The read loop is interrupted when either bytes are read, the server returns zero
/// bytes (EOF) or less bytes than the read buffer size.
///
///
/// When a server returns less number of bytes than the read buffer size, this may indicate that EOF has
/// been reached. A subsequent (SSH_FXP_READ) server request is necessary to make sure EOF has effectively
/// been reached. Breaking out of the read loop avoids reading from the server twice to determine EOF: once in
/// the read loop, and once upon the next or invocation.
///
///
public override int Read(byte[] buffer, int offset, int count)
{
var readLen = 0;
ThrowHelper.ThrowIfNull(buffer);
#if NET8_0_OR_GREATER
ArgumentOutOfRangeException.ThrowIfNegative(offset);
ArgumentOutOfRangeException.ThrowIfNegative(count);
#else
if (offset < 0)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}
if (count < 0)
{
throw new ArgumentOutOfRangeException(nameof(count));
}
#endif
if ((buffer.Length - offset) < count)
{
throw new ArgumentException("Invalid array range.");
}
// Lock down the file stream while we do this.
lock (_lock)
{
CheckSessionIsOpen();
// Set up for the read operation.
SetupRead();
// Read data into the caller's buffer.
while (count > 0)
{
// How much data do we have available in the buffer?
var bytesAvailableInBuffer = _bufferLen - _bufferPosition;
if (bytesAvailableInBuffer <= 0)
{
var data = _session.RequestRead(_handle, (ulong)_position, (uint)_readBufferSize);
if (data.Length == 0)
{
_bufferPosition = 0;
_bufferLen = 0;
break;
}
var bytesToWriteToCallerBuffer = count;
if (bytesToWriteToCallerBuffer >= data.Length)
{
// write all data read to caller-provided buffer
bytesToWriteToCallerBuffer = data.Length;
// reset buffer since we will skip buffering
_bufferPosition = 0;
_bufferLen = 0;
}
else
{
// determine number of bytes that we should write into read buffer
var bytesToWriteToReadBuffer = data.Length - bytesToWriteToCallerBuffer;
// write remaining bytes to read buffer
Buffer.BlockCopy(data, count, GetOrCreateReadBuffer(), 0, bytesToWriteToReadBuffer);
// update position in read buffer
_bufferPosition = 0;
// update number of bytes in read buffer
_bufferLen = bytesToWriteToReadBuffer;
}
// write bytes to caller-provided buffer
Buffer.BlockCopy(data, 0, buffer, offset, bytesToWriteToCallerBuffer);
// update stream position
_position += bytesToWriteToCallerBuffer;
// record total number of bytes read into caller-provided buffer
readLen += bytesToWriteToCallerBuffer;
// break out of the read loop when the server returned less than the request number of bytes
// as that *may* indicate that we've reached EOF
//
// doing this avoids reading from server twice to determine EOF: once in the read loop, and
// once upon the next Read or ReadByte invocation by the caller
if (data.Length < _readBufferSize)
{
break;
}
// advance offset to start writing bytes into caller-provided buffer
offset += bytesToWriteToCallerBuffer;
// update number of bytes left to read into caller-provided buffer
count -= bytesToWriteToCallerBuffer;
}
else
{
// limit the number of bytes to use from read buffer to the caller-request number of bytes
if (bytesAvailableInBuffer > count)
{
bytesAvailableInBuffer = count;
}
// copy data from read buffer to the caller-provided buffer
Buffer.BlockCopy(GetOrCreateReadBuffer(), _bufferPosition, buffer, offset, bytesAvailableInBuffer);
// update position in read buffer
_bufferPosition += bytesAvailableInBuffer;
// update stream position
_position += bytesAvailableInBuffer;
// record total number of bytes read into caller-provided buffer
readLen += bytesAvailableInBuffer;
// advance offset to start writing bytes into caller-provided buffer
offset += bytesAvailableInBuffer;
// update number of bytes left to read
count -= bytesAvailableInBuffer;
}
}
}
// return the number of bytes that were read to the caller.
return readLen;
}
///
/// Asynchronously reads a sequence of bytes from the current stream and advances the position within the stream by the
/// number of bytes read.
///
/// An array of bytes. When this method returns, the buffer contains the specified byte array with the values between and ( + - 1) replaced by the bytes read from the current source.
/// The zero-based byte offset in at which to begin storing the data read from the current stream.
/// The maximum number of bytes to be read from the current stream.
/// The to observe.
/// A that represents the asynchronous read operation.
public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var readLen = 0;
ThrowHelper.ThrowIfNull(buffer);
#if NET8_0_OR_GREATER
ArgumentOutOfRangeException.ThrowIfNegative(offset);
ArgumentOutOfRangeException.ThrowIfNegative(count);
#else
if (offset < 0)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}
if (count < 0)
{
throw new ArgumentOutOfRangeException(nameof(count));
}
#endif
if ((buffer.Length - offset) < count)
{
throw new ArgumentException("Invalid array range.");
}
CheckSessionIsOpen();
// Set up for the read operation.
SetupRead();
// Read data into the caller's buffer.
while (count > 0)
{
// How much data do we have available in the buffer?
var bytesAvailableInBuffer = _bufferLen - _bufferPosition;
if (bytesAvailableInBuffer <= 0)
{
var data = await _session.RequestReadAsync(_handle, (ulong)_position, (uint)_readBufferSize, cancellationToken).ConfigureAwait(false);
if (data.Length == 0)
{
_bufferPosition = 0;
_bufferLen = 0;
break;
}
var bytesToWriteToCallerBuffer = count;
if (bytesToWriteToCallerBuffer >= data.Length)
{
// write all data read to caller-provided buffer
bytesToWriteToCallerBuffer = data.Length;
// reset buffer since we will skip buffering
_bufferPosition = 0;
_bufferLen = 0;
}
else
{
// determine number of bytes that we should write into read buffer
var bytesToWriteToReadBuffer = data.Length - bytesToWriteToCallerBuffer;
// write remaining bytes to read buffer
Buffer.BlockCopy(data, count, GetOrCreateReadBuffer(), 0, bytesToWriteToReadBuffer);
// update position in read buffer
_bufferPosition = 0;
// update number of bytes in read buffer
_bufferLen = bytesToWriteToReadBuffer;
}
// write bytes to caller-provided buffer
Buffer.BlockCopy(data, 0, buffer, offset, bytesToWriteToCallerBuffer);
// update stream position
_position += bytesToWriteToCallerBuffer;
// record total number of bytes read into caller-provided buffer
readLen += bytesToWriteToCallerBuffer;
// break out of the read loop when the server returned less than the request number of bytes
// as that *may* indicate that we've reached EOF
//
// doing this avoids reading from server twice to determine EOF: once in the read loop, and
// once upon the next Read or ReadByte invocation by the caller
if (data.Length < _readBufferSize)
{
break;
}
// advance offset to start writing bytes into caller-provided buffer
offset += bytesToWriteToCallerBuffer;
// update number of bytes left to read into caller-provided buffer
count -= bytesToWriteToCallerBuffer;
}
else
{
// limit the number of bytes to use from read buffer to the caller-request number of bytes
if (bytesAvailableInBuffer > count)
{
bytesAvailableInBuffer = count;
}
// copy data from read buffer to the caller-provided buffer
Buffer.BlockCopy(GetOrCreateReadBuffer(), _bufferPosition, buffer, offset, bytesAvailableInBuffer);
// update position in read buffer
_bufferPosition += bytesAvailableInBuffer;
// update stream position
_position += bytesAvailableInBuffer;
// record total number of bytes read into caller-provided buffer
readLen += bytesAvailableInBuffer;
// advance offset to start writing bytes into caller-provided buffer
offset += bytesAvailableInBuffer;
// update number of bytes left to read
count -= bytesAvailableInBuffer;
}
}
// return the number of bytes that were read to the caller.
return readLen;
}
///
/// Reads a byte from the stream and advances the position within the stream by one byte, or returns -1 if at the end of the stream.
///
///
/// The unsigned byte cast to an , or -1 if at the end of the stream.
///
/// The stream does not support reading.
/// Methods were called after the stream was closed.
/// Read operation failed.
public override int ReadByte()
{
// Lock down the file stream while we do this.
lock (_lock)
{
CheckSessionIsOpen();
// Setup the object for reading.
SetupRead();
byte[] readBuffer;
// Read more data into the internal buffer if necessary.
if (_bufferPosition >= _bufferLen)
{
var data = _session.RequestRead(_handle, (ulong)_position, (uint)_readBufferSize);
if (data.Length == 0)
{
// We've reached EOF.
return -1;
}
readBuffer = GetOrCreateReadBuffer();
Buffer.BlockCopy(data, 0, readBuffer, 0, data.Length);
_bufferPosition = 0;
_bufferLen = data.Length;
}
else
{
readBuffer = GetOrCreateReadBuffer();
}
// Extract the next byte from the buffer.
++_position;
return readBuffer[_bufferPosition++];
}
}
///
/// Sets the position within the current stream.
///
/// A byte offset relative to the parameter.
/// A value of type indicating the reference point used to obtain the new position.
///
/// The new position within the current stream.
///
/// An I/O error occurs.
/// The stream does not support seeking, such as if the stream is constructed from a pipe or console output.
/// Methods were called after the stream was closed.
public override long Seek(long offset, SeekOrigin origin)
{
long newPosn;
// Lock down the file stream while we do this.
lock (_lock)
{
CheckSessionIsOpen();
if (!CanSeek)
{
throw new NotSupportedException("Seek is not supported.");
}
// Don't do anything if the position won't be moving.
if (origin == SeekOrigin.Begin && offset == _position)
{
return offset;
}
if (origin == SeekOrigin.Current && offset == 0)
{
return _position;
}
// The behaviour depends upon the read/write mode.
if (_bufferOwnedByWrite)
{
// Flush the write buffer and then seek.
FlushWriteBuffer();
}
else
{
// Determine if the seek is to somewhere inside
// the current read buffer bounds.
if (origin == SeekOrigin.Begin)
{
newPosn = _position - _bufferPosition;
if (offset >= newPosn && offset < (newPosn + _bufferLen))
{
_bufferPosition = (int)(offset - newPosn);
_position = offset;
return _position;
}
}
else if (origin == SeekOrigin.Current)
{
newPosn = _position + offset;
if (newPosn >= (_position - _bufferPosition) &&
newPosn < (_position - _bufferPosition + _bufferLen))
{
_bufferPosition = (int)(newPosn - (_position - _bufferPosition));
_position = newPosn;
return _position;
}
}
// Abandon the read buffer.
_bufferPosition = 0;
_bufferLen = 0;
}
// Seek to the new position.
switch (origin)
{
case SeekOrigin.Begin:
newPosn = offset;
break;
case SeekOrigin.Current:
newPosn = _position + offset;
break;
case SeekOrigin.End:
var attributes = _session.RequestFStat(_handle, nullOnError: false);
newPosn = attributes.Size + offset;
break;
default:
throw new ArgumentException("Invalid seek origin.", nameof(origin));
}
if (newPosn < 0)
{
throw new EndOfStreamException();
}
_position = newPosn;
return _position;
}
}
///
/// Sets the length of the current stream.
///
/// The desired length of the current stream in bytes.
/// An I/O error occurs.
/// The stream does not support both writing and seeking.
/// Methods were called after the stream was closed.
/// must be greater than zero.
///
///
/// Buffers are first flushed.
///
///
/// If the specified value is less than the current length of the stream, the stream is truncated and - if the
/// current position is greater than the new length - the current position is moved to the last byte of the stream.
///
///
/// If the given value is greater than the current length of the stream, the stream is expanded and the current
/// position remains the same.
///
///
public override void SetLength(long value)
{
#if NET8_0_OR_GREATER
ArgumentOutOfRangeException.ThrowIfNegative(value);
#else
if (value < 0)
{
throw new ArgumentOutOfRangeException(nameof(value));
}
#endif
// Lock down the file stream while we do this.
lock (_lock)
{
CheckSessionIsOpen();
if (!CanSeek)
{
throw new NotSupportedException("Seek is not supported.");
}
if (_bufferOwnedByWrite)
{
FlushWriteBuffer();
}
else
{
SetupWrite();
}
var attributes = _session.RequestFStat(_handle, nullOnError: false);
attributes.Size = value;
_session.RequestFSetStat(_handle, attributes);
if (_position > value)
{
_position = value;
}
}
}
///
/// Writes a sequence of bytes to the current stream and advances the current position within this stream by the number of bytes written.
///
/// An array of bytes. This method copies bytes from to the current stream.
/// The zero-based byte offset in at which to begin copying bytes to the current stream.
/// The number of bytes to be written to the current stream.
/// The sum of and is greater than the buffer length.
/// is .
/// or is negative.
/// An I/O error occurs.
/// The stream does not support writing.
/// Methods were called after the stream was closed.
public override void Write(byte[] buffer, int offset, int count)
{
ThrowHelper.ThrowIfNull(buffer);
#if NET8_0_OR_GREATER
ArgumentOutOfRangeException.ThrowIfNegative(offset);
ArgumentOutOfRangeException.ThrowIfNegative(count);
#else
if (offset < 0)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}
if (count < 0)
{
throw new ArgumentOutOfRangeException(nameof(count));
}
#endif
if ((buffer.Length - offset) < count)
{
throw new ArgumentException("Invalid array range.");
}
// Lock down the file stream while we do this.
lock (_lock)
{
CheckSessionIsOpen();
// Setup this object for writing.
SetupWrite();
// Write data to the file stream.
while (count > 0)
{
// Determine how many bytes we can write to the buffer.
var tempLen = _writeBufferSize - _bufferPosition;
if (tempLen <= 0)
{
// flush write buffer, and mark it empty
FlushWriteBuffer();
// we can now write or buffer the full buffer size
tempLen = _writeBufferSize;
}
// limit the number of bytes to write to the actual number of bytes requested
if (tempLen > count)
{
tempLen = count;
}
// Can we short-cut the internal buffer?
if (_bufferPosition == 0 && tempLen == _writeBufferSize)
{
using (var wait = new AutoResetEvent(initialState: false))
{
_session.RequestWrite(_handle, (ulong)_position, buffer, offset, tempLen, wait);
}
}
else
{
// No: copy the data to the write buffer first.
Buffer.BlockCopy(buffer, offset, GetOrCreateWriteBuffer(), _bufferPosition, tempLen);
_bufferPosition += tempLen;
}
// Advance the buffer and stream positions.
_position += tempLen;
offset += tempLen;
count -= tempLen;
}
// If the buffer is full, then do a speculative flush now,
// rather than waiting for the next call to this method.
if (_bufferPosition >= _writeBufferSize)
{
using (var wait = new AutoResetEvent(initialState: false))
{
_session.RequestWrite(_handle, (ulong)(_position - _bufferPosition), GetOrCreateWriteBuffer(), 0, _bufferPosition, wait);
}
_bufferPosition = 0;
}
}
}
///
/// Asynchronously writes a sequence of bytes to the current stream and advances the current position within this stream by the number of bytes written.
///
/// An array of bytes. This method copies bytes from to the current stream.
/// The zero-based byte offset in at which to begin copying bytes to the current stream.
/// The number of bytes to be written to the current stream.
/// The to observe.
/// A that represents the asynchronous write operation.
/// The sum of and is greater than the buffer length.
/// is .
/// or is negative.
/// An I/O error occurs.
/// The stream does not support writing.
/// Methods were called after the stream was closed.
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ThrowHelper.ThrowIfNull(buffer);
#if NET8_0_OR_GREATER
ArgumentOutOfRangeException.ThrowIfNegative(offset);
ArgumentOutOfRangeException.ThrowIfNegative(count);
#else
if (offset < 0)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}
if (count < 0)
{
throw new ArgumentOutOfRangeException(nameof(count));
}
#endif
if ((buffer.Length - offset) < count)
{
throw new ArgumentException("Invalid array range.");
}
CheckSessionIsOpen();
// Setup this object for writing.
SetupWrite();
// Write data to the file stream.
while (count > 0)
{
// Determine how many bytes we can write to the buffer.
var tempLen = _writeBufferSize - _bufferPosition;
if (tempLen <= 0)
{
// flush write buffer, and mark it empty
await FlushWriteBufferAsync(cancellationToken).ConfigureAwait(false);
// we can now write or buffer the full buffer size
tempLen = _writeBufferSize;
}
// limit the number of bytes to write to the actual number of bytes requested
if (tempLen > count)
{
tempLen = count;
}
// Can we short-cut the internal buffer?
if (_bufferPosition == 0 && tempLen == _writeBufferSize)
{
await _session.RequestWriteAsync(_handle, (ulong)_position, buffer, offset, tempLen, cancellationToken).ConfigureAwait(false);
}
else
{
// No: copy the data to the write buffer first.
Buffer.BlockCopy(buffer, offset, GetOrCreateWriteBuffer(), _bufferPosition, tempLen);
_bufferPosition += tempLen;
}
// Advance the buffer and stream positions.
_position += tempLen;
offset += tempLen;
count -= tempLen;
}
// If the buffer is full, then do a speculative flush now,
// rather than waiting for the next call to this method.
if (_bufferPosition >= _writeBufferSize)
{
await _session.RequestWriteAsync(_handle, (ulong)(_position - _bufferPosition), GetOrCreateWriteBuffer(), 0, _bufferPosition, cancellationToken).ConfigureAwait(false);
_bufferPosition = 0;
}
}
///
/// Writes a byte to the current position in the stream and advances the position within the stream by one byte.
///
/// The byte to write to the stream.
/// An I/O error occurs.
/// The stream does not support writing, or the stream is already closed.
/// Methods were called after the stream was closed.
public override void WriteByte(byte value)
{
// Lock down the file stream while we do this.
lock (_lock)
{
CheckSessionIsOpen();
// Setup the object for writing.
SetupWrite();
var writeBuffer = GetOrCreateWriteBuffer();
// Flush the current buffer if it is full.
if (_bufferPosition >= _writeBufferSize)
{
using (var wait = new AutoResetEvent(initialState: false))
{
_session.RequestWrite(_handle, (ulong)(_position - _bufferPosition), writeBuffer, 0, _bufferPosition, wait);
}
_bufferPosition = 0;
}
// Write the byte into the buffer and advance the posn.
writeBuffer[_bufferPosition++] = value;
++_position;
}
}
///
/// Releases the unmanaged resources used by the and optionally releases the managed resources.
///
/// to release both managed and unmanaged resources; to release only unmanaged resources.
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (_session != null)
{
if (disposing)
{
lock (_lock)
{
if (_session != null)
{
_canRead = false;
_canSeek = false;
_canWrite = false;
if (_handle != null)
{
if (_session.IsOpen)
{
if (_bufferOwnedByWrite)
{
FlushWriteBuffer();
}
_session.RequestClose(_handle);
}
_handle = null;
}
_session = null;
}
}
}
}
}
private byte[] GetOrCreateReadBuffer()
{
_readBuffer ??= new byte[_readBufferSize];
return _readBuffer;
}
private byte[] GetOrCreateWriteBuffer()
{
_writeBuffer ??= new byte[_writeBufferSize];
return _writeBuffer;
}
///
/// Flushes the read data from the buffer.
///
private void FlushReadBuffer()
{
_bufferPosition = 0;
_bufferLen = 0;
}
///
/// Flush any buffered write data to the file.
///
private void FlushWriteBuffer()
{
if (_bufferPosition > 0)
{
using (var wait = new AutoResetEvent(initialState: false))
{
_session.RequestWrite(_handle, (ulong)(_position - _bufferPosition), _writeBuffer, 0, _bufferPosition, wait);
}
_bufferPosition = 0;
}
}
private async Task FlushWriteBufferAsync(CancellationToken cancellationToken)
{
if (_bufferPosition > 0)
{
await _session.RequestWriteAsync(_handle, (ulong)(_position - _bufferPosition), _writeBuffer, 0, _bufferPosition, cancellationToken).ConfigureAwait(false);
_bufferPosition = 0;
}
}
///
/// Setups the read.
///
private void SetupRead()
{
if (!CanRead)
{
throw new NotSupportedException("Read not supported.");
}
if (_bufferOwnedByWrite)
{
FlushWriteBuffer();
_bufferOwnedByWrite = false;
}
}
///
/// Setups the write.
///
private void SetupWrite()
{
if (!CanWrite)
{
throw new NotSupportedException("Write not supported.");
}
if (!_bufferOwnedByWrite)
{
FlushReadBuffer();
_bufferOwnedByWrite = true;
}
}
private void CheckSessionIsOpen()
{
ThrowHelper.ThrowObjectDisposedIf(_session is null, this);
if (!_session.IsOpen)
{
throw new ObjectDisposedException(GetType().FullName, "Cannot access a closed SFTP session.");
}
}
}
}