|
|
@@ -1,145 +1,90 @@
|
|
|
-using System;
|
|
|
+#nullable enable
|
|
|
+using System;
|
|
|
+using System.ComponentModel;
|
|
|
using System.Diagnostics;
|
|
|
-using System.Diagnostics.CodeAnalysis;
|
|
|
-using System.Globalization;
|
|
|
using System.IO;
|
|
|
using System.Threading;
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
+using Microsoft.Extensions.Logging;
|
|
|
+
|
|
|
using Renci.SshNet.Common;
|
|
|
|
|
|
namespace Renci.SshNet.Sftp
|
|
|
{
|
|
|
/// <summary>
|
|
|
- /// Exposes a <see cref="Stream"/> around a remote SFTP file, supporting both synchronous and asynchronous read and write operations.
|
|
|
+ /// Exposes a <see cref="Stream"/> around a remote SFTP file, supporting
|
|
|
+ /// both synchronous and asynchronous read and write operations.
|
|
|
/// </summary>
|
|
|
- /// <threadsafety static="true" instance="false"/>
|
|
|
-#pragma warning disable IDE0079 // We intentionally want to suppress the below warning.
|
|
|
- [SuppressMessage("Performance", "CA1844: Provide memory-based overrides of async methods when subclassing 'Stream'", Justification = "TODO: This should be addressed in the future.")]
|
|
|
-#pragma warning restore IDE0079
|
|
|
- public sealed class SftpFileStream : Stream
|
|
|
+ public sealed partial class SftpFileStream : Stream
|
|
|
{
|
|
|
- private readonly Lock _lock = new Lock();
|
|
|
+ private const int MaxPendingReads = 100;
|
|
|
+
|
|
|
+ private readonly ISftpSession _session;
|
|
|
+ private readonly FileAccess _access;
|
|
|
+ private readonly bool _canSeek;
|
|
|
private readonly int _readBufferSize;
|
|
|
- private readonly int _writeBufferSize;
|
|
|
|
|
|
- // Internal state.
|
|
|
- private byte[] _handle;
|
|
|
- private ISftpSession _session;
|
|
|
+ private SftpFileReader? _sftpFileReader;
|
|
|
+ private ReadOnlyMemory<byte> _readBuffer;
|
|
|
+ private System.Net.ArrayBuffer _writeBuffer;
|
|
|
|
|
|
- // Buffer information.
|
|
|
- private byte[] _readBuffer;
|
|
|
- private byte[] _writeBuffer;
|
|
|
- private int _bufferPosition;
|
|
|
- private int _bufferLen;
|
|
|
private long _position;
|
|
|
- private bool _bufferOwnedByWrite;
|
|
|
- private bool _canRead;
|
|
|
- private bool _canSeek;
|
|
|
- private bool _canWrite;
|
|
|
private TimeSpan _timeout;
|
|
|
+ private bool _disposed;
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Gets a value indicating whether the current stream supports reading.
|
|
|
- /// </summary>
|
|
|
- /// <value>
|
|
|
- /// <see langword="true"/> if the stream supports reading; otherwise, <see langword="false"/>.
|
|
|
- /// </value>
|
|
|
+ /// <inheritdoc/>
|
|
|
public override bool CanRead
|
|
|
{
|
|
|
- get { return _canRead; }
|
|
|
+ get { return !_disposed && (_access & FileAccess.Read) == FileAccess.Read; }
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Gets a value indicating whether the current stream supports seeking.
|
|
|
- /// </summary>
|
|
|
- /// <value>
|
|
|
- /// <see langword="true"/> if the stream supports seeking; otherwise, <see langword="false"/>.
|
|
|
- /// </value>
|
|
|
+ /// <inheritdoc/>
|
|
|
public override bool CanSeek
|
|
|
{
|
|
|
- get { return _canSeek; }
|
|
|
+ get { return !_disposed && _canSeek; }
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Gets a value indicating whether the current stream supports writing.
|
|
|
- /// </summary>
|
|
|
- /// <value>
|
|
|
- /// <see langword="true"/> if the stream supports writing; otherwise, <see langword="false"/>.
|
|
|
- /// </value>
|
|
|
+ /// <inheritdoc/>
|
|
|
public override bool CanWrite
|
|
|
{
|
|
|
- get { return _canWrite; }
|
|
|
+ get { return !_disposed && (_access & FileAccess.Write) == FileAccess.Write; }
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
/// Gets a value indicating whether timeout properties are usable for <see cref="SftpFileStream"/>.
|
|
|
/// </summary>
|
|
|
/// <value>
|
|
|
- /// <see langword="true"/> in all cases.
|
|
|
+ /// <see langword="false"/> in all cases.
|
|
|
/// </value>
|
|
|
public override bool CanTimeout
|
|
|
{
|
|
|
- get { return true; }
|
|
|
+ get { return false; }
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Gets the length in bytes of the stream.
|
|
|
- /// </summary>
|
|
|
- /// <value>A long value representing the length of the stream in bytes.</value>
|
|
|
- /// <exception cref="NotSupportedException">A class derived from Stream does not support seeking. </exception>
|
|
|
- /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed. </exception>
|
|
|
- /// <exception cref="IOException">IO operation failed. </exception>
|
|
|
+ /// <inheritdoc/>
|
|
|
public override long Length
|
|
|
{
|
|
|
get
|
|
|
{
|
|
|
- // Lock down the file stream while we do this.
|
|
|
- lock (_lock)
|
|
|
- {
|
|
|
- CheckSessionIsOpen();
|
|
|
+ ThrowIfNotSeekable();
|
|
|
|
|
|
- if (!CanSeek)
|
|
|
- {
|
|
|
- throw new NotSupportedException("Seek operation is not supported.");
|
|
|
- }
|
|
|
+ Flush();
|
|
|
|
|
|
- // Flush the write buffer, because it may
|
|
|
- // affect the length of the stream.
|
|
|
- if (_bufferOwnedByWrite)
|
|
|
- {
|
|
|
- FlushWriteBuffer();
|
|
|
- }
|
|
|
+ var size = _session.RequestFStat(Handle).Size;
|
|
|
|
|
|
- // obtain file attributes
|
|
|
- var attributes = _session.RequestFStat(_handle, nullOnError: true);
|
|
|
- if (attributes != null)
|
|
|
- {
|
|
|
- return attributes.Size;
|
|
|
- }
|
|
|
+ Debug.Assert(size >= 0, "fstat should return size as checked in ctor");
|
|
|
|
|
|
- throw new IOException("Seek operation failed.");
|
|
|
- }
|
|
|
+ return size;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Gets or sets the position within the current stream.
|
|
|
- /// </summary>
|
|
|
- /// <value>The current position within the stream.</value>
|
|
|
- /// <exception cref="IOException">An I/O error occurs. </exception>
|
|
|
- /// <exception cref="NotSupportedException">The stream does not support seeking. </exception>
|
|
|
- /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed. </exception>
|
|
|
+ /// <inheritdoc/>
|
|
|
public override long Position
|
|
|
{
|
|
|
get
|
|
|
{
|
|
|
- CheckSessionIsOpen();
|
|
|
-
|
|
|
- if (!CanSeek)
|
|
|
- {
|
|
|
- throw new NotSupportedException("Seek operation not supported.");
|
|
|
- }
|
|
|
+ ThrowIfNotSeekable();
|
|
|
|
|
|
return _position;
|
|
|
}
|
|
|
@@ -155,7 +100,7 @@ namespace Renci.SshNet.Sftp
|
|
|
/// <value>
|
|
|
/// The name of the path that was used to construct the current <see cref="SftpFileStream"/>.
|
|
|
/// </value>
|
|
|
- public string Name { get; private set; }
|
|
|
+ public string Name { get; }
|
|
|
|
|
|
/// <summary>
|
|
|
/// Gets the operating system file handle for the file that the current <see cref="SftpFileStream"/> encapsulates.
|
|
|
@@ -163,14 +108,7 @@ namespace Renci.SshNet.Sftp
|
|
|
/// <value>
|
|
|
/// The operating system file handle for the file that the current <see cref="SftpFileStream"/> encapsulates.
|
|
|
/// </value>
|
|
|
- public byte[] Handle
|
|
|
- {
|
|
|
- get
|
|
|
- {
|
|
|
- Flush();
|
|
|
- return _handle;
|
|
|
- }
|
|
|
- }
|
|
|
+ public byte[] Handle { get; }
|
|
|
|
|
|
/// <summary>
|
|
|
/// Gets or sets the operation timeout.
|
|
|
@@ -178,6 +116,7 @@ namespace Renci.SshNet.Sftp
|
|
|
/// <value>
|
|
|
/// The timeout.
|
|
|
/// </value>
|
|
|
+ [EditorBrowsable(EditorBrowsableState.Never)] // Unused
|
|
|
public TimeSpan Timeout
|
|
|
{
|
|
|
get
|
|
|
@@ -192,33 +131,63 @@ namespace Renci.SshNet.Sftp
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private SftpFileStream(ISftpSession session, string path, FileAccess access, int readBufferSize, int writeBufferSize, byte[] handle, long position)
|
|
|
+ private SftpFileStream(
|
|
|
+ ISftpSession session,
|
|
|
+ string path,
|
|
|
+ FileAccess access,
|
|
|
+ bool canSeek,
|
|
|
+ int readBufferSize,
|
|
|
+ int writeBufferSize,
|
|
|
+ byte[] handle,
|
|
|
+ long position,
|
|
|
+ SftpFileReader? initialReader)
|
|
|
{
|
|
|
Timeout = TimeSpan.FromSeconds(30);
|
|
|
Name = path;
|
|
|
|
|
|
_session = session;
|
|
|
- _canRead = (access & FileAccess.Read) == FileAccess.Read;
|
|
|
- _canSeek = true;
|
|
|
- _canWrite = (access & FileAccess.Write) == FileAccess.Write;
|
|
|
+ _access = access;
|
|
|
+ _canSeek = canSeek;
|
|
|
|
|
|
- _handle = handle;
|
|
|
+ Handle = handle;
|
|
|
_readBufferSize = readBufferSize;
|
|
|
- _writeBufferSize = writeBufferSize;
|
|
|
_position = position;
|
|
|
+ _writeBuffer = new System.Net.ArrayBuffer(writeBufferSize);
|
|
|
+ _sftpFileReader = initialReader;
|
|
|
}
|
|
|
|
|
|
- internal static SftpFileStream Open(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize)
|
|
|
+ internal static SftpFileStream Open(
|
|
|
+ ISftpSession? session,
|
|
|
+ string path,
|
|
|
+ FileMode mode,
|
|
|
+ FileAccess access,
|
|
|
+ int bufferSize,
|
|
|
+ bool isDownloadFile = false)
|
|
|
{
|
|
|
- return Open(session, path, mode, access, bufferSize, isAsync: false, CancellationToken.None).GetAwaiter().GetResult();
|
|
|
+ return Open(session, path, mode, access, bufferSize, isDownloadFile, isAsync: false, CancellationToken.None).GetAwaiter().GetResult();
|
|
|
}
|
|
|
|
|
|
- internal static Task<SftpFileStream> OpenAsync(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize, CancellationToken cancellationToken)
|
|
|
+ internal static Task<SftpFileStream> OpenAsync(
|
|
|
+ ISftpSession? session,
|
|
|
+ string path,
|
|
|
+ FileMode mode,
|
|
|
+ FileAccess access,
|
|
|
+ int bufferSize,
|
|
|
+ CancellationToken cancellationToken,
|
|
|
+ bool isDownloadFile = false)
|
|
|
{
|
|
|
- return Open(session, path, mode, access, bufferSize, isAsync: true, cancellationToken);
|
|
|
+ return Open(session, path, mode, access, bufferSize, isDownloadFile, isAsync: true, cancellationToken);
|
|
|
}
|
|
|
|
|
|
- private static async Task<SftpFileStream> Open(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize, bool isAsync, CancellationToken cancellationToken)
|
|
|
+ private static async Task<SftpFileStream> Open(
|
|
|
+ ISftpSession? session,
|
|
|
+ string path,
|
|
|
+ FileMode mode,
|
|
|
+ FileAccess access,
|
|
|
+ int bufferSize,
|
|
|
+ bool isDownloadFile,
|
|
|
+ bool isAsync,
|
|
|
+ CancellationToken cancellationToken)
|
|
|
{
|
|
|
Debug.Assert(isAsync || cancellationToken == default);
|
|
|
|
|
|
@@ -234,44 +203,27 @@ namespace Renci.SshNet.Sftp
|
|
|
throw new SshConnectionException("Client not connected.");
|
|
|
}
|
|
|
|
|
|
- var flags = Flags.None;
|
|
|
-
|
|
|
- switch (access)
|
|
|
+ var flags = access switch
|
|
|
{
|
|
|
- case FileAccess.Read:
|
|
|
- flags |= Flags.Read;
|
|
|
- break;
|
|
|
- case FileAccess.Write:
|
|
|
- flags |= Flags.Write;
|
|
|
- break;
|
|
|
- case FileAccess.ReadWrite:
|
|
|
- flags |= Flags.Read;
|
|
|
- flags |= Flags.Write;
|
|
|
- break;
|
|
|
- default:
|
|
|
- throw new ArgumentOutOfRangeException(nameof(access));
|
|
|
- }
|
|
|
+ FileAccess.Read => Flags.Read,
|
|
|
+ FileAccess.Write => Flags.Write,
|
|
|
+ FileAccess.ReadWrite => Flags.Read | Flags.Write,
|
|
|
+ _ => throw new ArgumentOutOfRangeException(nameof(access))
|
|
|
+ };
|
|
|
|
|
|
- if ((access & FileAccess.Read) == FileAccess.Read && mode == FileMode.Append)
|
|
|
+ if (mode == FileMode.Append && access != FileAccess.Write)
|
|
|
{
|
|
|
- throw new ArgumentException(string.Format(CultureInfo.InvariantCulture,
|
|
|
- "{0} mode can be requested only when combined with write-only access.",
|
|
|
- mode.ToString("G")),
|
|
|
- nameof(mode));
|
|
|
+ throw new ArgumentException(
|
|
|
+ "Append mode can be requested only with write-only access.",
|
|
|
+ nameof(access));
|
|
|
}
|
|
|
|
|
|
- if ((access & FileAccess.Write) != FileAccess.Write)
|
|
|
+ if (access == FileAccess.Read &&
|
|
|
+ mode is FileMode.Create or FileMode.CreateNew or FileMode.Truncate or FileMode.Append)
|
|
|
{
|
|
|
- if (mode is FileMode.Create or FileMode.CreateNew or FileMode.Truncate or FileMode.Append)
|
|
|
- {
|
|
|
- throw new ArgumentException(string.Format(CultureInfo.InvariantCulture,
|
|
|
- "Combining {0}: {1} with {2}: {3} is invalid.",
|
|
|
- nameof(FileMode),
|
|
|
- mode,
|
|
|
- nameof(FileAccess),
|
|
|
- access),
|
|
|
- nameof(mode));
|
|
|
- }
|
|
|
+ throw new ArgumentException(
|
|
|
+ $"Combining {nameof(FileMode)}: {mode} with {nameof(FileAccess)}: {access} is invalid.",
|
|
|
+ nameof(access));
|
|
|
}
|
|
|
|
|
|
switch (mode)
|
|
|
@@ -317,869 +269,527 @@ namespace Renci.SshNet.Sftp
|
|
|
var readBufferSize = (int)session.CalculateOptimalReadLength((uint)bufferSize);
|
|
|
var writeBufferSize = (int)session.CalculateOptimalWriteLength((uint)bufferSize, handle);
|
|
|
|
|
|
- long position = 0;
|
|
|
- if (mode == FileMode.Append)
|
|
|
- {
|
|
|
- SftpFileAttributes attributes;
|
|
|
+ SftpFileAttributes? attributes;
|
|
|
|
|
|
+ try
|
|
|
+ {
|
|
|
if (isAsync)
|
|
|
{
|
|
|
attributes = await session.RequestFStatAsync(handle, cancellationToken).ConfigureAwait(false);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- attributes = session.RequestFStat(handle, nullOnError: false);
|
|
|
+ attributes = session.RequestFStat(handle);
|
|
|
}
|
|
|
+ }
|
|
|
+ catch (SshException ex)
|
|
|
+ {
|
|
|
+ session.SessionLoggerFactory.CreateLogger<SftpFileStream>().LogInformation(
|
|
|
+ ex, "fstat failed after opening {Path}. Will set CanSeek=false.", path);
|
|
|
|
|
|
- position = attributes.Size;
|
|
|
+ attributes = null;
|
|
|
}
|
|
|
|
|
|
- return new SftpFileStream(session, path, access, readBufferSize, writeBufferSize, handle, position);
|
|
|
- }
|
|
|
+ bool canSeek;
|
|
|
+ long position = 0;
|
|
|
+ SftpFileReader? initialReader = null;
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Clears all buffers for this stream and causes any buffered data to be written to the file.
|
|
|
- /// </summary>
|
|
|
- /// <exception cref="IOException">An I/O error occurs. </exception>
|
|
|
- /// <exception cref="ObjectDisposedException">Stream is closed.</exception>
|
|
|
- public override void Flush()
|
|
|
- {
|
|
|
- lock (_lock)
|
|
|
+ if (attributes?.Size >= 0)
|
|
|
{
|
|
|
- CheckSessionIsOpen();
|
|
|
+ canSeek = true;
|
|
|
|
|
|
- if (_bufferOwnedByWrite)
|
|
|
+ if (mode == FileMode.Append)
|
|
|
{
|
|
|
- FlushWriteBuffer();
|
|
|
+ position = attributes.Size;
|
|
|
}
|
|
|
- else
|
|
|
+ else if (isDownloadFile)
|
|
|
+ {
|
|
|
+ // If we are in a call to SftpClient.DownloadFile, then we know that we will read the whole file,
|
|
|
+ // so we can let there be several in-flight requests from the get go.
|
|
|
+ // This optimisation is mostly only beneficial to smaller files on higher latency connections.
|
|
|
+ // The +2 is +1 for rounding up to cover the whole file, and +1 for the final request to receive EOF.
|
|
|
+ var initialPendingReads = (int)Math.Max(1, Math.Min(MaxPendingReads, 2 + (attributes.Size / readBufferSize)));
|
|
|
+
|
|
|
+ initialReader = new(handle, session, readBufferSize, position, MaxPendingReads, (ulong)attributes.Size, initialPendingReads);
|
|
|
+ }
|
|
|
+ else if ((access & FileAccess.Read) == FileAccess.Read)
|
|
|
{
|
|
|
- FlushReadBuffer();
|
|
|
+ // The reader can use the size information to reduce in-flight requests near the expected EOF,
|
|
|
+ // so pass it in here.
|
|
|
+ initialReader = new(handle, session, readBufferSize, position, MaxPendingReads, (ulong)attributes.Size);
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- /// <summary>
|
|
|
- /// Asynchronously clears all buffers for this stream and causes any buffered data to be written to the file.
|
|
|
- /// </summary>
|
|
|
- /// <param name="cancellationToken">The <see cref="CancellationToken"/> to observe.</param>
|
|
|
- /// <returns>A <see cref="Task"/> that represents the asynchronous flush operation.</returns>
|
|
|
- /// <exception cref="IOException">An I/O error occurs. </exception>
|
|
|
- /// <exception cref="ObjectDisposedException">Stream is closed.</exception>
|
|
|
- public override Task FlushAsync(CancellationToken cancellationToken)
|
|
|
- {
|
|
|
- CheckSessionIsOpen();
|
|
|
-
|
|
|
- if (_bufferOwnedByWrite)
|
|
|
+ else
|
|
|
{
|
|
|
- return FlushWriteBufferAsync(cancellationToken);
|
|
|
+ // Either fstat is failing or it doesn't return the size, in which case we can't support Length,
|
|
|
+ // so CanSeek must return false.
|
|
|
+ canSeek = false;
|
|
|
}
|
|
|
|
|
|
- FlushReadBuffer();
|
|
|
-
|
|
|
- return Task.CompletedTask;
|
|
|
+ return new SftpFileStream(session, path, access, canSeek, readBufferSize, writeBufferSize, handle, position, initialReader);
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Reads a sequence of bytes from the current stream and advances the position within the stream by the
|
|
|
- /// number of bytes read.
|
|
|
- /// </summary>
|
|
|
- /// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between <paramref name="offset"/> and (<paramref name="offset"/> + <paramref name="count"/> - 1) replaced by the bytes read from the current source.</param>
|
|
|
- /// <param name="offset">The zero-based byte offset in <paramref name="buffer"/> at which to begin storing the data read from the current stream.</param>
|
|
|
- /// <param name="count">The maximum number of bytes to be read from the current stream.</param>
|
|
|
- /// <returns>
|
|
|
- /// The total number of bytes read into the buffer. This can be less than the number of bytes requested
|
|
|
- /// if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.
|
|
|
- /// </returns>
|
|
|
- /// <exception cref="ArgumentException">The sum of <paramref name="offset"/> and <paramref name="count"/> is larger than the buffer length.</exception>
|
|
|
- /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <see langword="null"/>. </exception>
|
|
|
- /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
|
|
|
- /// <exception cref="IOException">An I/O error occurs. </exception>
|
|
|
- /// <exception cref="NotSupportedException">The stream does not support reading. </exception>
|
|
|
- /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed. </exception>
|
|
|
- /// <remarks>
|
|
|
- /// <para>
|
|
|
- /// This method attempts to read up to <paramref name="count"/> bytes. This either from the buffer, from the
|
|
|
- /// server (using one or more <c>SSH_FXP_READ</c> requests) or using a combination of both.
|
|
|
- /// </para>
|
|
|
- /// <para>
|
|
|
- /// The read loop is interrupted when either <paramref name="count"/> bytes are read, the server returns zero
|
|
|
- /// bytes (EOF) or less bytes than the read buffer size.
|
|
|
- /// </para>
|
|
|
- /// <para>
|
|
|
- /// When a server returns less number of bytes than the read buffer size, this <c>may</c> indicate that EOF has
|
|
|
- /// been reached. A subsequent (<c>SSH_FXP_READ</c>) server request is necessary to make sure EOF has effectively
|
|
|
- /// been reached. Breaking out of the read loop avoids reading from the server twice to determine EOF: once in
|
|
|
- /// the read loop, and once upon the next <see cref="Read"/> or <see cref="ReadByte"/> invocation.
|
|
|
- /// </para>
|
|
|
- /// </remarks>
|
|
|
- public override int Read(byte[] buffer, int offset, int count)
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override void Flush()
|
|
|
{
|
|
|
-#if !NET
|
|
|
- ThrowHelper.
|
|
|
-#endif
|
|
|
- ValidateBufferArguments(buffer, offset, count);
|
|
|
+ ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
|
|
|
|
|
|
- var readLen = 0;
|
|
|
+ var writeLength = _writeBuffer.ActiveLength;
|
|
|
|
|
|
- // Lock down the file stream while we do this.
|
|
|
- lock (_lock)
|
|
|
+ if (writeLength == 0)
|
|
|
{
|
|
|
- CheckSessionIsOpen();
|
|
|
-
|
|
|
- // Set up for the read operation.
|
|
|
- SetupRead();
|
|
|
-
|
|
|
- // Read data into the caller's buffer.
|
|
|
- while (count > 0)
|
|
|
- {
|
|
|
- // How much data do we have available in the buffer?
|
|
|
- var bytesAvailableInBuffer = _bufferLen - _bufferPosition;
|
|
|
- if (bytesAvailableInBuffer <= 0)
|
|
|
- {
|
|
|
- var data = _session.RequestRead(_handle, (ulong)_position, (uint)_readBufferSize);
|
|
|
-
|
|
|
- if (data.Length == 0)
|
|
|
- {
|
|
|
- _bufferPosition = 0;
|
|
|
- _bufferLen = 0;
|
|
|
-
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- var bytesToWriteToCallerBuffer = count;
|
|
|
- if (bytesToWriteToCallerBuffer >= data.Length)
|
|
|
- {
|
|
|
- // write all data read to caller-provided buffer
|
|
|
- bytesToWriteToCallerBuffer = data.Length;
|
|
|
-
|
|
|
- // reset buffer since we will skip buffering
|
|
|
- _bufferPosition = 0;
|
|
|
- _bufferLen = 0;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- // determine number of bytes that we should write into read buffer
|
|
|
- var bytesToWriteToReadBuffer = data.Length - bytesToWriteToCallerBuffer;
|
|
|
-
|
|
|
- // write remaining bytes to read buffer
|
|
|
- Buffer.BlockCopy(data, count, GetOrCreateReadBuffer(), 0, bytesToWriteToReadBuffer);
|
|
|
-
|
|
|
- // update position in read buffer
|
|
|
- _bufferPosition = 0;
|
|
|
-
|
|
|
- // update number of bytes in read buffer
|
|
|
- _bufferLen = bytesToWriteToReadBuffer;
|
|
|
- }
|
|
|
-
|
|
|
- // write bytes to caller-provided buffer
|
|
|
- Buffer.BlockCopy(data, 0, buffer, offset, bytesToWriteToCallerBuffer);
|
|
|
-
|
|
|
- // update stream position
|
|
|
- _position += bytesToWriteToCallerBuffer;
|
|
|
-
|
|
|
- // record total number of bytes read into caller-provided buffer
|
|
|
- readLen += bytesToWriteToCallerBuffer;
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // break out of the read loop when the server returned less than the request number of bytes
|
|
|
- // as that *may* indicate that we've reached EOF
|
|
|
- //
|
|
|
- // doing this avoids reading from server twice to determine EOF: once in the read loop, and
|
|
|
- // once upon the next Read or ReadByte invocation by the caller
|
|
|
- if (data.Length < _readBufferSize)
|
|
|
- {
|
|
|
- break;
|
|
|
- }
|
|
|
+ // Under normal usage the offset will be nonnegative, but we nevertheless
|
|
|
+ // perform a checked conversion to prevent writing to a very large offset
|
|
|
+ // in case of corruption due to e.g. invalid multithreaded usage.
|
|
|
+ var serverOffset = checked((ulong)(_position - writeLength));
|
|
|
|
|
|
- // advance offset to start writing bytes into caller-provided buffer
|
|
|
- offset += bytesToWriteToCallerBuffer;
|
|
|
+ using (var wait = new AutoResetEvent(initialState: false))
|
|
|
+ {
|
|
|
+ _session.RequestWrite(
|
|
|
+ Handle,
|
|
|
+ serverOffset,
|
|
|
+ _writeBuffer.DangerousGetUnderlyingBuffer(),
|
|
|
+ _writeBuffer.ActiveStartOffset,
|
|
|
+ writeLength,
|
|
|
+ wait);
|
|
|
|
|
|
- // update number of bytes left to read into caller-provided buffer
|
|
|
- count -= bytesToWriteToCallerBuffer;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- // limit the number of bytes to use from read buffer to the caller-request number of bytes
|
|
|
- if (bytesAvailableInBuffer > count)
|
|
|
- {
|
|
|
- bytesAvailableInBuffer = count;
|
|
|
- }
|
|
|
+ _writeBuffer.Discard(writeLength);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // copy data from read buffer to the caller-provided buffer
|
|
|
- Buffer.BlockCopy(GetOrCreateReadBuffer(), _bufferPosition, buffer, offset, bytesAvailableInBuffer);
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override async Task FlushAsync(CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+ ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
|
|
|
|
|
|
- // update position in read buffer
|
|
|
- _bufferPosition += bytesAvailableInBuffer;
|
|
|
+ var writeLength = _writeBuffer.ActiveLength;
|
|
|
|
|
|
- // update stream position
|
|
|
- _position += bytesAvailableInBuffer;
|
|
|
+ if (writeLength == 0)
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // record total number of bytes read into caller-provided buffer
|
|
|
- readLen += bytesAvailableInBuffer;
|
|
|
+ // Under normal usage the offset will be nonnegative, but we nevertheless
|
|
|
+ // perform a checked conversion to prevent writing to a very large offset
|
|
|
+ // in case of corruption due to e.g. invalid multithreaded usage.
|
|
|
+ var serverOffset = checked((ulong)(_position - writeLength));
|
|
|
|
|
|
- // advance offset to start writing bytes into caller-provided buffer
|
|
|
- offset += bytesAvailableInBuffer;
|
|
|
+ await _session.RequestWriteAsync(
|
|
|
+ Handle,
|
|
|
+ serverOffset,
|
|
|
+ _writeBuffer.DangerousGetUnderlyingBuffer(),
|
|
|
+ _writeBuffer.ActiveStartOffset,
|
|
|
+ writeLength,
|
|
|
+ cancellationToken).ConfigureAwait(false);
|
|
|
|
|
|
- // update number of bytes left to read
|
|
|
- count -= bytesAvailableInBuffer;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ _writeBuffer.Discard(writeLength);
|
|
|
+ }
|
|
|
|
|
|
- // return the number of bytes that were read to the caller.
|
|
|
- return readLen;
|
|
|
+ private void InvalidateReads()
|
|
|
+ {
|
|
|
+ _readBuffer = ReadOnlyMemory<byte>.Empty;
|
|
|
+ _sftpFileReader?.Dispose();
|
|
|
+ _sftpFileReader = null;
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Asynchronously reads a sequence of bytes from the current stream and advances the position within the stream by the
|
|
|
- /// number of bytes read.
|
|
|
- /// </summary>
|
|
|
- /// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between <paramref name="offset"/> and (<paramref name="offset"/> + <paramref name="count"/> - 1) replaced by the bytes read from the current source.</param>
|
|
|
- /// <param name="offset">The zero-based byte offset in <paramref name="buffer"/> at which to begin storing the data read from the current stream.</param>
|
|
|
- /// <param name="count">The maximum number of bytes to be read from the current stream.</param>
|
|
|
- /// <param name="cancellationToken">The <see cref="CancellationToken" /> to observe.</param>
|
|
|
- /// <returns>A <see cref="Task" /> that represents the asynchronous read operation.</returns>
|
|
|
- public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override int Read(byte[] buffer, int offset, int count)
|
|
|
{
|
|
|
#if !NET
|
|
|
ThrowHelper.
|
|
|
#endif
|
|
|
ValidateBufferArguments(buffer, offset, count);
|
|
|
|
|
|
- cancellationToken.ThrowIfCancellationRequested();
|
|
|
-
|
|
|
- var readLen = 0;
|
|
|
-
|
|
|
- CheckSessionIsOpen();
|
|
|
+ return Read(buffer.AsSpan(offset, count));
|
|
|
+ }
|
|
|
|
|
|
- // Set up for the read operation.
|
|
|
- SetupRead();
|
|
|
+#if NET
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override int Read(Span<byte> buffer)
|
|
|
+#else
|
|
|
+ private int Read(Span<byte> buffer)
|
|
|
+#endif
|
|
|
+ {
|
|
|
+ ThrowIfNotReadable();
|
|
|
|
|
|
- // Read data into the caller's buffer.
|
|
|
- while (count > 0)
|
|
|
+ if (_readBuffer.IsEmpty)
|
|
|
{
|
|
|
- // How much data do we have available in the buffer?
|
|
|
- var bytesAvailableInBuffer = _bufferLen - _bufferPosition;
|
|
|
- if (bytesAvailableInBuffer <= 0)
|
|
|
+ if (_sftpFileReader is null)
|
|
|
{
|
|
|
- var data = await _session.RequestReadAsync(_handle, (ulong)_position, (uint)_readBufferSize, cancellationToken).ConfigureAwait(false);
|
|
|
+ Flush();
|
|
|
+ _sftpFileReader = new(Handle, _session, _readBufferSize, _position, MaxPendingReads);
|
|
|
+ }
|
|
|
|
|
|
- if (data.Length == 0)
|
|
|
- {
|
|
|
- _bufferPosition = 0;
|
|
|
- _bufferLen = 0;
|
|
|
+ _readBuffer = _sftpFileReader.ReadAsync(CancellationToken.None).GetAwaiter().GetResult();
|
|
|
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- var bytesToWriteToCallerBuffer = count;
|
|
|
- if (bytesToWriteToCallerBuffer >= data.Length)
|
|
|
- {
|
|
|
- // write all data read to caller-provided buffer
|
|
|
- bytesToWriteToCallerBuffer = data.Length;
|
|
|
+ if (_readBuffer.IsEmpty)
|
|
|
+ {
|
|
|
+ // If we've hit EOF then throw away this reader instance.
|
|
|
+ // If Read is called again we will create a new reader.
|
|
|
+ // This takes care of the case when a file is expanding
|
|
|
+ // during reading.
|
|
|
+ _sftpFileReader.Dispose();
|
|
|
+ _sftpFileReader = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // reset buffer since we will skip buffering
|
|
|
- _bufferPosition = 0;
|
|
|
- _bufferLen = 0;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- // determine number of bytes that we should write into read buffer
|
|
|
- var bytesToWriteToReadBuffer = data.Length - bytesToWriteToCallerBuffer;
|
|
|
+ Debug.Assert(_writeBuffer.ActiveLength == 0, "Write buffer should be empty when reading.");
|
|
|
|
|
|
- // write remaining bytes to read buffer
|
|
|
- Buffer.BlockCopy(data, count, GetOrCreateReadBuffer(), 0, bytesToWriteToReadBuffer);
|
|
|
+ var bytesRead = Math.Min(buffer.Length, _readBuffer.Length);
|
|
|
|
|
|
- // update position in read buffer
|
|
|
- _bufferPosition = 0;
|
|
|
+ _readBuffer.Span.Slice(0, bytesRead).CopyTo(buffer);
|
|
|
+ _readBuffer = _readBuffer.Slice(bytesRead);
|
|
|
|
|
|
- // update number of bytes in read buffer
|
|
|
- _bufferLen = bytesToWriteToReadBuffer;
|
|
|
- }
|
|
|
+ _position += bytesRead;
|
|
|
|
|
|
- // write bytes to caller-provided buffer
|
|
|
- Buffer.BlockCopy(data, 0, buffer, offset, bytesToWriteToCallerBuffer);
|
|
|
+ return bytesRead;
|
|
|
+ }
|
|
|
|
|
|
- // update stream position
|
|
|
- _position += bytesToWriteToCallerBuffer;
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
|
|
+ {
|
|
|
+#if !NET
|
|
|
+ ThrowHelper.
|
|
|
+#endif
|
|
|
+ ValidateBufferArguments(buffer, offset, count);
|
|
|
|
|
|
- // record total number of bytes read into caller-provided buffer
|
|
|
- readLen += bytesToWriteToCallerBuffer;
|
|
|
+ return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
|
|
|
+ }
|
|
|
|
|
|
- // break out of the read loop when the server returned less than the request number of bytes
|
|
|
- // as that *may* indicate that we've reached EOF
|
|
|
- //
|
|
|
- // doing this avoids reading from server twice to determine EOF: once in the read loop, and
|
|
|
- // once upon the next Read or ReadByte invocation by the caller
|
|
|
- if (data.Length < _readBufferSize)
|
|
|
- {
|
|
|
- break;
|
|
|
- }
|
|
|
+#if NET
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
|
|
|
+#else
|
|
|
+ private async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
|
|
|
+#endif
|
|
|
+ {
|
|
|
+ ThrowIfNotReadable();
|
|
|
|
|
|
- // advance offset to start writing bytes into caller-provided buffer
|
|
|
- offset += bytesToWriteToCallerBuffer;
|
|
|
+ if (_readBuffer.IsEmpty)
|
|
|
+ {
|
|
|
+ if (_sftpFileReader is null)
|
|
|
+ {
|
|
|
+ await FlushAsync(cancellationToken).ConfigureAwait(false);
|
|
|
|
|
|
- // update number of bytes left to read into caller-provided buffer
|
|
|
- count -= bytesToWriteToCallerBuffer;
|
|
|
+ _sftpFileReader = new(Handle, _session, _readBufferSize, _position, MaxPendingReads);
|
|
|
}
|
|
|
- else
|
|
|
- {
|
|
|
- // limit the number of bytes to use from read buffer to the caller-request number of bytes
|
|
|
- if (bytesAvailableInBuffer > count)
|
|
|
- {
|
|
|
- bytesAvailableInBuffer = count;
|
|
|
- }
|
|
|
|
|
|
- // copy data from read buffer to the caller-provided buffer
|
|
|
- Buffer.BlockCopy(GetOrCreateReadBuffer(), _bufferPosition, buffer, offset, bytesAvailableInBuffer);
|
|
|
+ _readBuffer = await _sftpFileReader.ReadAsync(cancellationToken).ConfigureAwait(false);
|
|
|
|
|
|
- // update position in read buffer
|
|
|
- _bufferPosition += bytesAvailableInBuffer;
|
|
|
+ if (_readBuffer.IsEmpty)
|
|
|
+ {
|
|
|
+ // If we've hit EOF then throw away this reader instance.
|
|
|
+ // If Read is called again we will create a new reader.
|
|
|
+ // This takes care of the case when a file is expanding
|
|
|
+ // during reading.
|
|
|
+ _sftpFileReader.Dispose();
|
|
|
+ _sftpFileReader = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // update stream position
|
|
|
- _position += bytesAvailableInBuffer;
|
|
|
+ Debug.Assert(_writeBuffer.ActiveLength == 0, "Write buffer should be empty when reading.");
|
|
|
|
|
|
- // record total number of bytes read into caller-provided buffer
|
|
|
- readLen += bytesAvailableInBuffer;
|
|
|
+ var bytesRead = Math.Min(buffer.Length, _readBuffer.Length);
|
|
|
|
|
|
- // advance offset to start writing bytes into caller-provided buffer
|
|
|
- offset += bytesAvailableInBuffer;
|
|
|
+ _readBuffer.Slice(0, bytesRead).CopyTo(buffer);
|
|
|
+ _readBuffer = _readBuffer.Slice(bytesRead);
|
|
|
|
|
|
- // update number of bytes left to read
|
|
|
- count -= bytesAvailableInBuffer;
|
|
|
- }
|
|
|
- }
|
|
|
+ _position += bytesRead;
|
|
|
|
|
|
- // return the number of bytes that were read to the caller.
|
|
|
- return readLen;
|
|
|
+ return bytesRead;
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Reads a byte from the stream and advances the position within the stream by one byte, or returns -1 if at the end of the stream.
|
|
|
- /// </summary>
|
|
|
- /// <returns>
|
|
|
- /// The unsigned byte cast to an <see cref="int"/>, or -1 if at the end of the stream.
|
|
|
- /// </returns>
|
|
|
- /// <exception cref="NotSupportedException">The stream does not support reading. </exception>
|
|
|
- /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed. </exception>
|
|
|
- /// <exception cref="IOException">Read operation failed.</exception>
|
|
|
+#if NET
|
|
|
+ /// <inheritdoc/>
|
|
|
public override int ReadByte()
|
|
|
{
|
|
|
- // Lock down the file stream while we do this.
|
|
|
- lock (_lock)
|
|
|
- {
|
|
|
- CheckSessionIsOpen();
|
|
|
-
|
|
|
- // Setup the object for reading.
|
|
|
- SetupRead();
|
|
|
-
|
|
|
- byte[] readBuffer;
|
|
|
-
|
|
|
- // Read more data into the internal buffer if necessary.
|
|
|
- if (_bufferPosition >= _bufferLen)
|
|
|
- {
|
|
|
- var data = _session.RequestRead(_handle, (ulong)_position, (uint)_readBufferSize);
|
|
|
- if (data.Length == 0)
|
|
|
- {
|
|
|
- // We've reached EOF.
|
|
|
- return -1;
|
|
|
- }
|
|
|
+ byte b = default;
|
|
|
+ var read = Read(new Span<byte>(ref b));
|
|
|
+ return read == 0 ? -1 : b;
|
|
|
+ }
|
|
|
+#endif
|
|
|
|
|
|
- readBuffer = GetOrCreateReadBuffer();
|
|
|
- Buffer.BlockCopy(data, 0, readBuffer, 0, data.Length);
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
|
|
|
+ {
|
|
|
+ return TaskToAsyncResult.Begin(ReadAsync(buffer, offset, count), callback, state);
|
|
|
+ }
|
|
|
|
|
|
- _bufferPosition = 0;
|
|
|
- _bufferLen = data.Length;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- readBuffer = GetOrCreateReadBuffer();
|
|
|
- }
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override int EndRead(IAsyncResult asyncResult)
|
|
|
+ {
|
|
|
+ return TaskToAsyncResult.End<int>(asyncResult);
|
|
|
+ }
|
|
|
|
|
|
- // Extract the next byte from the buffer.
|
|
|
- ++_position;
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override void Write(byte[] buffer, int offset, int count)
|
|
|
+ {
|
|
|
+#if !NET
|
|
|
+ ThrowHelper.
|
|
|
+#endif
|
|
|
+ ValidateBufferArguments(buffer, offset, count);
|
|
|
|
|
|
- return readBuffer[_bufferPosition++];
|
|
|
- }
|
|
|
+ Write(buffer.AsSpan(offset, count));
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Sets the position within the current stream.
|
|
|
- /// </summary>
|
|
|
- /// <param name="offset">A byte offset relative to the <paramref name="origin"/> parameter.</param>
|
|
|
- /// <param name="origin">A value of type <see cref="SeekOrigin"/> indicating the reference point used to obtain the new position.</param>
|
|
|
- /// <returns>
|
|
|
- /// The new position within the current stream.
|
|
|
- /// </returns>
|
|
|
- /// <exception cref="IOException">An I/O error occurs. </exception>
|
|
|
- /// <exception cref="NotSupportedException">The stream does not support seeking, such as if the stream is constructed from a pipe or console output. </exception>
|
|
|
- /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed. </exception>
|
|
|
- public override long Seek(long offset, SeekOrigin origin)
|
|
|
+#if NET
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override void Write(ReadOnlySpan<byte> buffer)
|
|
|
+#else
|
|
|
+ private void Write(ReadOnlySpan<byte> buffer)
|
|
|
+#endif
|
|
|
{
|
|
|
- long newPosn;
|
|
|
+ ThrowIfNotWriteable();
|
|
|
|
|
|
- // Lock down the file stream while we do this.
|
|
|
- lock (_lock)
|
|
|
- {
|
|
|
- CheckSessionIsOpen();
|
|
|
+ InvalidateReads();
|
|
|
|
|
|
- if (!CanSeek)
|
|
|
- {
|
|
|
- throw new NotSupportedException("Seek is not supported.");
|
|
|
- }
|
|
|
-
|
|
|
- // Don't do anything if the position won't be moving.
|
|
|
- if (origin == SeekOrigin.Begin && offset == _position)
|
|
|
- {
|
|
|
- return offset;
|
|
|
- }
|
|
|
+ while (!buffer.IsEmpty)
|
|
|
+ {
|
|
|
+ var byteCount = Math.Min(buffer.Length, _writeBuffer.AvailableLength);
|
|
|
|
|
|
- if (origin == SeekOrigin.Current && offset == 0)
|
|
|
- {
|
|
|
- return _position;
|
|
|
- }
|
|
|
+ buffer.Slice(0, byteCount).CopyTo(_writeBuffer.AvailableSpan);
|
|
|
|
|
|
- // The behaviour depends upon the read/write mode.
|
|
|
- if (_bufferOwnedByWrite)
|
|
|
- {
|
|
|
- // Flush the write buffer and then seek.
|
|
|
- FlushWriteBuffer();
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- // Determine if the seek is to somewhere inside
|
|
|
- // the current read buffer bounds.
|
|
|
- if (origin == SeekOrigin.Begin)
|
|
|
- {
|
|
|
- newPosn = _position - _bufferPosition;
|
|
|
- if (offset >= newPosn && offset < (newPosn + _bufferLen))
|
|
|
- {
|
|
|
- _bufferPosition = (int)(offset - newPosn);
|
|
|
- _position = offset;
|
|
|
- return _position;
|
|
|
- }
|
|
|
- }
|
|
|
- else if (origin == SeekOrigin.Current)
|
|
|
- {
|
|
|
- newPosn = _position + offset;
|
|
|
- if (newPosn >= (_position - _bufferPosition) &&
|
|
|
- newPosn < (_position - _bufferPosition + _bufferLen))
|
|
|
- {
|
|
|
- _bufferPosition = (int)(newPosn - (_position - _bufferPosition));
|
|
|
- _position = newPosn;
|
|
|
- return _position;
|
|
|
- }
|
|
|
- }
|
|
|
+ buffer = buffer.Slice(byteCount);
|
|
|
|
|
|
- // Abandon the read buffer.
|
|
|
- _bufferPosition = 0;
|
|
|
- _bufferLen = 0;
|
|
|
- }
|
|
|
+ _writeBuffer.Commit(byteCount);
|
|
|
|
|
|
- // Seek to the new position.
|
|
|
- switch (origin)
|
|
|
- {
|
|
|
- case SeekOrigin.Begin:
|
|
|
- newPosn = offset;
|
|
|
- break;
|
|
|
- case SeekOrigin.Current:
|
|
|
- newPosn = _position + offset;
|
|
|
- break;
|
|
|
- case SeekOrigin.End:
|
|
|
- var attributes = _session.RequestFStat(_handle, nullOnError: false);
|
|
|
- newPosn = attributes.Size + offset;
|
|
|
- break;
|
|
|
- default:
|
|
|
- throw new ArgumentException("Invalid seek origin.", nameof(origin));
|
|
|
- }
|
|
|
+ _position += byteCount;
|
|
|
|
|
|
- if (newPosn < 0)
|
|
|
+ if (_writeBuffer.AvailableLength == 0)
|
|
|
{
|
|
|
- throw new EndOfStreamException();
|
|
|
+ Flush();
|
|
|
}
|
|
|
-
|
|
|
- _position = newPosn;
|
|
|
- return _position;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Sets the length of the current stream.
|
|
|
- /// </summary>
|
|
|
- /// <param name="value">The desired length of the current stream in bytes.</param>
|
|
|
- /// <exception cref="IOException">An I/O error occurs.</exception>
|
|
|
- /// <exception cref="NotSupportedException">The stream does not support both writing and seeking.</exception>
|
|
|
- /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
|
|
|
- /// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> must be greater than zero.</exception>
|
|
|
- /// <remarks>
|
|
|
- /// <para>
|
|
|
- /// Buffers are first flushed.
|
|
|
- /// </para>
|
|
|
- /// <para>
|
|
|
- /// If the specified value is less than the current length of the stream, the stream is truncated and - if the
|
|
|
- /// current position is greater than the new length - the current position is moved to the last byte of the stream.
|
|
|
- /// </para>
|
|
|
- /// <para>
|
|
|
- /// If the given value is greater than the current length of the stream, the stream is expanded and the current
|
|
|
- /// position remains the same.
|
|
|
- /// </para>
|
|
|
- /// </remarks>
|
|
|
- public override void SetLength(long value)
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override void WriteByte(byte value)
|
|
|
{
|
|
|
- ThrowHelper.ThrowIfNegative(value);
|
|
|
-
|
|
|
- // Lock down the file stream while we do this.
|
|
|
- lock (_lock)
|
|
|
- {
|
|
|
- CheckSessionIsOpen();
|
|
|
-
|
|
|
- if (!CanSeek)
|
|
|
- {
|
|
|
- throw new NotSupportedException("Seek is not supported.");
|
|
|
- }
|
|
|
-
|
|
|
- if (_bufferOwnedByWrite)
|
|
|
- {
|
|
|
- FlushWriteBuffer();
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- SetupWrite();
|
|
|
- }
|
|
|
-
|
|
|
- var attributes = _session.RequestFStat(_handle, nullOnError: false);
|
|
|
- attributes.Size = value;
|
|
|
- _session.RequestFSetStat(_handle, attributes);
|
|
|
-
|
|
|
- if (_position > value)
|
|
|
- {
|
|
|
- _position = value;
|
|
|
- }
|
|
|
- }
|
|
|
+ Write([value]);
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Writes a sequence of bytes to the current stream and advances the current position within this stream by the number of bytes written.
|
|
|
- /// </summary>
|
|
|
- /// <param name="buffer">An array of bytes. This method copies <paramref name="count"/> bytes from <paramref name="buffer"/> to the current stream.</param>
|
|
|
- /// <param name="offset">The zero-based byte offset in <paramref name="buffer"/> at which to begin copying bytes to the current stream.</param>
|
|
|
- /// <param name="count">The number of bytes to be written to the current stream.</param>
|
|
|
- /// <exception cref="ArgumentException">The sum of <paramref name="offset"/> and <paramref name="count"/> is greater than the buffer length.</exception>
|
|
|
- /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <see langword="null"/>.</exception>
|
|
|
- /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
|
|
|
- /// <exception cref="IOException">An I/O error occurs.</exception>
|
|
|
- /// <exception cref="NotSupportedException">The stream does not support writing.</exception>
|
|
|
- /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
|
|
|
- public override void Write(byte[] buffer, int offset, int count)
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
|
|
{
|
|
|
#if !NET
|
|
|
ThrowHelper.
|
|
|
#endif
|
|
|
ValidateBufferArguments(buffer, offset, count);
|
|
|
|
|
|
- // Lock down the file stream while we do this.
|
|
|
- lock (_lock)
|
|
|
- {
|
|
|
- CheckSessionIsOpen();
|
|
|
+ return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
|
|
|
+ }
|
|
|
|
|
|
- // Setup this object for writing.
|
|
|
- SetupWrite();
|
|
|
+#if NET
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
|
|
|
+#else
|
|
|
+ private async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
|
|
|
+#endif
|
|
|
+ {
|
|
|
+ ThrowIfNotWriteable();
|
|
|
|
|
|
- // Write data to the file stream.
|
|
|
- while (count > 0)
|
|
|
- {
|
|
|
- // Determine how many bytes we can write to the buffer.
|
|
|
- var tempLen = _writeBufferSize - _bufferPosition;
|
|
|
- if (tempLen <= 0)
|
|
|
- {
|
|
|
- // flush write buffer, and mark it empty
|
|
|
- FlushWriteBuffer();
|
|
|
+ InvalidateReads();
|
|
|
|
|
|
- // we can now write or buffer the full buffer size
|
|
|
- tempLen = _writeBufferSize;
|
|
|
- }
|
|
|
+ while (!buffer.IsEmpty)
|
|
|
+ {
|
|
|
+ var byteCount = Math.Min(buffer.Length, _writeBuffer.AvailableLength);
|
|
|
|
|
|
- // limit the number of bytes to write to the actual number of bytes requested
|
|
|
- if (tempLen > count)
|
|
|
- {
|
|
|
- tempLen = count;
|
|
|
- }
|
|
|
+ buffer.Slice(0, byteCount).CopyTo(_writeBuffer.AvailableMemory);
|
|
|
|
|
|
- // Can we short-cut the internal buffer?
|
|
|
- if (_bufferPosition == 0 && tempLen == _writeBufferSize)
|
|
|
- {
|
|
|
- using (var wait = new AutoResetEvent(initialState: false))
|
|
|
- {
|
|
|
- _session.RequestWrite(_handle, (ulong)_position, buffer, offset, tempLen, wait);
|
|
|
- }
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- // No: copy the data to the write buffer first.
|
|
|
- Buffer.BlockCopy(buffer, offset, GetOrCreateWriteBuffer(), _bufferPosition, tempLen);
|
|
|
- _bufferPosition += tempLen;
|
|
|
- }
|
|
|
+ buffer = buffer.Slice(byteCount);
|
|
|
|
|
|
- // Advance the buffer and stream positions.
|
|
|
- _position += tempLen;
|
|
|
- offset += tempLen;
|
|
|
- count -= tempLen;
|
|
|
- }
|
|
|
+ _writeBuffer.Commit(byteCount);
|
|
|
|
|
|
- // If the buffer is full, then do a speculative flush now,
|
|
|
- // rather than waiting for the next call to this method.
|
|
|
- if (_bufferPosition >= _writeBufferSize)
|
|
|
- {
|
|
|
- using (var wait = new AutoResetEvent(initialState: false))
|
|
|
- {
|
|
|
- _session.RequestWrite(_handle, (ulong)(_position - _bufferPosition), GetOrCreateWriteBuffer(), 0, _bufferPosition, wait);
|
|
|
- }
|
|
|
+ _position += byteCount;
|
|
|
|
|
|
- _bufferPosition = 0;
|
|
|
+ if (_writeBuffer.AvailableLength == 0)
|
|
|
+ {
|
|
|
+ await FlushAsync(cancellationToken).ConfigureAwait(false);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Asynchronously writes a sequence of bytes to the current stream and advances the current position within this stream by the number of bytes written.
|
|
|
- /// </summary>
|
|
|
- /// <param name="buffer">An array of bytes. This method copies <paramref name="count"/> bytes from <paramref name="buffer"/> to the current stream.</param>
|
|
|
- /// <param name="offset">The zero-based byte offset in <paramref name="buffer"/> at which to begin copying bytes to the current stream.</param>
|
|
|
- /// <param name="count">The number of bytes to be written to the current stream.</param>
|
|
|
- /// <param name="cancellationToken">The <see cref="CancellationToken"/> to observe.</param>
|
|
|
- /// <returns>A <see cref="Task"/> that represents the asynchronous write operation.</returns>
|
|
|
- /// <exception cref="ArgumentException">The sum of <paramref name="offset"/> and <paramref name="count"/> is greater than the buffer length.</exception>
|
|
|
- /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <see langword="null"/>.</exception>
|
|
|
- /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
|
|
|
- /// <exception cref="IOException">An I/O error occurs.</exception>
|
|
|
- /// <exception cref="NotSupportedException">The stream does not support writing.</exception>
|
|
|
- /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
|
|
|
- public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
|
|
|
{
|
|
|
-#if !NET
|
|
|
- ThrowHelper.
|
|
|
-#endif
|
|
|
- ValidateBufferArguments(buffer, offset, count);
|
|
|
+ return TaskToAsyncResult.Begin(WriteAsync(buffer, offset, count), callback, state);
|
|
|
+ }
|
|
|
|
|
|
- cancellationToken.ThrowIfCancellationRequested();
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override void EndWrite(IAsyncResult asyncResult)
|
|
|
+ {
|
|
|
+ TaskToAsyncResult.End(asyncResult);
|
|
|
+ }
|
|
|
|
|
|
- CheckSessionIsOpen();
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override long Seek(long offset, SeekOrigin origin)
|
|
|
+ {
|
|
|
+ ThrowIfNotSeekable();
|
|
|
|
|
|
- // Setup this object for writing.
|
|
|
- SetupWrite();
|
|
|
+ Flush();
|
|
|
|
|
|
- // Write data to the file stream.
|
|
|
- while (count > 0)
|
|
|
+ var newPosition = origin switch
|
|
|
{
|
|
|
- // Determine how many bytes we can write to the buffer.
|
|
|
- var tempLen = _writeBufferSize - _bufferPosition;
|
|
|
- if (tempLen <= 0)
|
|
|
- {
|
|
|
- // flush write buffer, and mark it empty
|
|
|
- await FlushWriteBufferAsync(cancellationToken).ConfigureAwait(false);
|
|
|
-
|
|
|
- // we can now write or buffer the full buffer size
|
|
|
- tempLen = _writeBufferSize;
|
|
|
- }
|
|
|
+ SeekOrigin.Begin => offset,
|
|
|
+ SeekOrigin.Current => _position + offset,
|
|
|
+ SeekOrigin.End => _session.RequestFStat(Handle).Size + offset,
|
|
|
+ _ => throw new ArgumentOutOfRangeException(nameof(origin))
|
|
|
+ };
|
|
|
|
|
|
- // limit the number of bytes to write to the actual number of bytes requested
|
|
|
- if (tempLen > count)
|
|
|
- {
|
|
|
- tempLen = count;
|
|
|
- }
|
|
|
+ if (newPosition < 0)
|
|
|
+ {
|
|
|
+ throw new IOException("An attempt was made to move the position before the beginning of the stream.");
|
|
|
+ }
|
|
|
|
|
|
- // Can we short-cut the internal buffer?
|
|
|
- if (_bufferPosition == 0 && tempLen == _writeBufferSize)
|
|
|
- {
|
|
|
- await _session.RequestWriteAsync(_handle, (ulong)_position, buffer, offset, tempLen, cancellationToken).ConfigureAwait(false);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- // No: copy the data to the write buffer first.
|
|
|
- Buffer.BlockCopy(buffer, offset, GetOrCreateWriteBuffer(), _bufferPosition, tempLen);
|
|
|
- _bufferPosition += tempLen;
|
|
|
- }
|
|
|
+ var readBufferStart = _position; // inclusive
|
|
|
+ var readBufferEnd = _position + _readBuffer.Length; // exclusive
|
|
|
|
|
|
- // Advance the buffer and stream positions.
|
|
|
- _position += tempLen;
|
|
|
- offset += tempLen;
|
|
|
- count -= tempLen;
|
|
|
+ if (readBufferStart <= newPosition && newPosition <= readBufferEnd)
|
|
|
+ {
|
|
|
+ _readBuffer = _readBuffer.Slice((int)(newPosition - readBufferStart));
|
|
|
}
|
|
|
-
|
|
|
- // If the buffer is full, then do a speculative flush now,
|
|
|
- // rather than waiting for the next call to this method.
|
|
|
- if (_bufferPosition >= _writeBufferSize)
|
|
|
+ else
|
|
|
{
|
|
|
- await _session.RequestWriteAsync(_handle, (ulong)(_position - _bufferPosition), GetOrCreateWriteBuffer(), 0, _bufferPosition, cancellationToken).ConfigureAwait(false);
|
|
|
- _bufferPosition = 0;
|
|
|
+ InvalidateReads();
|
|
|
}
|
|
|
+
|
|
|
+ return _position = newPosition;
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Writes a byte to the current position in the stream and advances the position within the stream by one byte.
|
|
|
- /// </summary>
|
|
|
- /// <param name="value">The byte to write to the stream.</param>
|
|
|
- /// <exception cref="IOException">An I/O error occurs. </exception>
|
|
|
- /// <exception cref="NotSupportedException">The stream does not support writing, or the stream is already closed. </exception>
|
|
|
- /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed. </exception>
|
|
|
- public override void WriteByte(byte value)
|
|
|
+ /// <inheritdoc/>
|
|
|
+ public override void SetLength(long value)
|
|
|
{
|
|
|
- // Lock down the file stream while we do this.
|
|
|
- lock (_lock)
|
|
|
- {
|
|
|
- CheckSessionIsOpen();
|
|
|
-
|
|
|
- // Setup the object for writing.
|
|
|
- SetupWrite();
|
|
|
-
|
|
|
- var writeBuffer = GetOrCreateWriteBuffer();
|
|
|
+ ThrowHelper.ThrowIfNegative(value);
|
|
|
+ ThrowIfNotWriteable();
|
|
|
+ ThrowIfNotSeekable();
|
|
|
|
|
|
- // Flush the current buffer if it is full.
|
|
|
- if (_bufferPosition >= _writeBufferSize)
|
|
|
- {
|
|
|
- using (var wait = new AutoResetEvent(initialState: false))
|
|
|
- {
|
|
|
- _session.RequestWrite(_handle, (ulong)(_position - _bufferPosition), writeBuffer, 0, _bufferPosition, wait);
|
|
|
- }
|
|
|
+ Flush();
|
|
|
+ InvalidateReads();
|
|
|
|
|
|
- _bufferPosition = 0;
|
|
|
- }
|
|
|
+ var attributes = _session.RequestFStat(Handle);
|
|
|
+ attributes.Size = value;
|
|
|
+ _session.RequestFSetStat(Handle, attributes);
|
|
|
|
|
|
- // Write the byte into the buffer and advance the posn.
|
|
|
- writeBuffer[_bufferPosition++] = value;
|
|
|
- ++_position;
|
|
|
+ if (_position > value)
|
|
|
+ {
|
|
|
+ _position = value;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Releases the unmanaged resources used by the <see cref="Stream"/> and optionally releases the managed resources.
|
|
|
- /// </summary>
|
|
|
- /// <param name="disposing"><see langword="true"/> to release both managed and unmanaged resources; <see langword="false"/> to release only unmanaged resources.</param>
|
|
|
+ /// <inheritdoc/>
|
|
|
protected override void Dispose(bool disposing)
|
|
|
{
|
|
|
- base.Dispose(disposing);
|
|
|
+ if (_disposed)
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- if (_session != null)
|
|
|
+ try
|
|
|
{
|
|
|
- if (disposing)
|
|
|
+ if (disposing && _session.IsOpen)
|
|
|
{
|
|
|
- lock (_lock)
|
|
|
+ try
|
|
|
{
|
|
|
- if (_session != null)
|
|
|
+ Flush();
|
|
|
+ }
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ if (_session.IsOpen)
|
|
|
{
|
|
|
- _canRead = false;
|
|
|
- _canSeek = false;
|
|
|
- _canWrite = false;
|
|
|
-
|
|
|
- if (_handle != null)
|
|
|
- {
|
|
|
- if (_session.IsOpen)
|
|
|
- {
|
|
|
- if (_bufferOwnedByWrite)
|
|
|
- {
|
|
|
- FlushWriteBuffer();
|
|
|
- }
|
|
|
-
|
|
|
- _session.RequestClose(_handle);
|
|
|
- }
|
|
|
-
|
|
|
- _handle = null;
|
|
|
- }
|
|
|
-
|
|
|
- _session = null;
|
|
|
+ _session.RequestClose(Handle);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ _disposed = true;
|
|
|
+ InvalidateReads();
|
|
|
+ base.Dispose(disposing);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- private byte[] GetOrCreateReadBuffer()
|
|
|
- {
|
|
|
- _readBuffer ??= new byte[_readBufferSize];
|
|
|
- return _readBuffer;
|
|
|
- }
|
|
|
-
|
|
|
- private byte[] GetOrCreateWriteBuffer()
|
|
|
- {
|
|
|
- _writeBuffer ??= new byte[_writeBufferSize];
|
|
|
- return _writeBuffer;
|
|
|
- }
|
|
|
-
|
|
|
- /// <summary>
|
|
|
- /// Flushes the read data from the buffer.
|
|
|
- /// </summary>
|
|
|
- private void FlushReadBuffer()
|
|
|
+#if NET
|
|
|
+ /// <inheritdoc/>
|
|
|
+#pragma warning disable CA2215 // Dispose methods should call base class dispose
|
|
|
+ public override async ValueTask DisposeAsync()
|
|
|
+#pragma warning restore CA2215 // Dispose methods should call base class dispose
|
|
|
+#else
|
|
|
+ internal async ValueTask DisposeAsync()
|
|
|
+#endif
|
|
|
{
|
|
|
- _bufferPosition = 0;
|
|
|
- _bufferLen = 0;
|
|
|
- }
|
|
|
+ if (_disposed)
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Flush any buffered write data to the file.
|
|
|
- /// </summary>
|
|
|
- private void FlushWriteBuffer()
|
|
|
- {
|
|
|
- if (_bufferPosition > 0)
|
|
|
+ try
|
|
|
{
|
|
|
- using (var wait = new AutoResetEvent(initialState: false))
|
|
|
+ if (_session.IsOpen)
|
|
|
{
|
|
|
- _session.RequestWrite(_handle, (ulong)(_position - _bufferPosition), _writeBuffer, 0, _bufferPosition, wait);
|
|
|
+ try
|
|
|
+ {
|
|
|
+ await FlushAsync().ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ finally
|
|
|
+ {
|
|
|
+ if (_session.IsOpen)
|
|
|
+ {
|
|
|
+ await _session.RequestCloseAsync(Handle, CancellationToken.None).ConfigureAwait(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- _bufferPosition = 0;
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- private async Task FlushWriteBufferAsync(CancellationToken cancellationToken)
|
|
|
- {
|
|
|
- if (_bufferPosition > 0)
|
|
|
+ finally
|
|
|
{
|
|
|
- await _session.RequestWriteAsync(_handle, (ulong)(_position - _bufferPosition), _writeBuffer, 0, _bufferPosition, cancellationToken).ConfigureAwait(false);
|
|
|
- _bufferPosition = 0;
|
|
|
+ _disposed = true;
|
|
|
+ InvalidateReads();
|
|
|
+ base.Dispose(disposing: false);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Setups the read.
|
|
|
- /// </summary>
|
|
|
- private void SetupRead()
|
|
|
+ private void ThrowIfNotSeekable()
|
|
|
{
|
|
|
- if (!CanRead)
|
|
|
+ if (!CanSeek)
|
|
|
{
|
|
|
- throw new NotSupportedException("Read not supported.");
|
|
|
+ ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
|
|
|
+ Throw();
|
|
|
}
|
|
|
|
|
|
- if (_bufferOwnedByWrite)
|
|
|
+ static void Throw()
|
|
|
{
|
|
|
- FlushWriteBuffer();
|
|
|
- _bufferOwnedByWrite = false;
|
|
|
+ throw new NotSupportedException("Stream does not support seeking.");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// Setups the write.
|
|
|
- /// </summary>
|
|
|
- private void SetupWrite()
|
|
|
+ private void ThrowIfNotWriteable()
|
|
|
{
|
|
|
if (!CanWrite)
|
|
|
{
|
|
|
- throw new NotSupportedException("Write not supported.");
|
|
|
+ ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
|
|
|
+ Throw();
|
|
|
}
|
|
|
|
|
|
- if (!_bufferOwnedByWrite)
|
|
|
+ static void Throw()
|
|
|
{
|
|
|
- FlushReadBuffer();
|
|
|
- _bufferOwnedByWrite = true;
|
|
|
+ throw new NotSupportedException("Stream does not support writing.");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void CheckSessionIsOpen()
|
|
|
+ private void ThrowIfNotReadable()
|
|
|
{
|
|
|
- ThrowHelper.ThrowObjectDisposedIf(_session is null, this);
|
|
|
+ if (!CanRead)
|
|
|
+ {
|
|
|
+ ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
|
|
|
+ Throw();
|
|
|
+ }
|
|
|
|
|
|
- if (!_session.IsOpen)
|
|
|
+ static void Throw()
|
|
|
{
|
|
|
- throw new ObjectDisposedException(GetType().FullName, "Cannot access a closed SFTP session.");
|
|
|
+ throw new NotSupportedException("Stream does not support reading.");
|
|
|
}
|
|
|
}
|
|
|
}
|