|
|
@@ -2,11 +2,10 @@
|
|
|
using System;
|
|
|
using System.Diagnostics;
|
|
|
using System.IO;
|
|
|
-using System.Runtime.ExceptionServices;
|
|
|
using System.Text;
|
|
|
using System.Threading;
|
|
|
+using System.Threading.Tasks;
|
|
|
|
|
|
-using Renci.SshNet.Abstractions;
|
|
|
using Renci.SshNet.Channels;
|
|
|
using Renci.SshNet.Common;
|
|
|
using Renci.SshNet.Messages.Connection;
|
|
|
@@ -19,20 +18,13 @@ namespace Renci.SshNet
|
|
|
/// </summary>
|
|
|
public class SshCommand : IDisposable
|
|
|
{
|
|
|
- private static readonly object CompletedResult = new();
|
|
|
-
|
|
|
private readonly ISession _session;
|
|
|
private readonly Encoding _encoding;
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// The result of the command: an exception, <see cref="CompletedResult"/>
|
|
|
- /// or <see langword="null"/>.
|
|
|
- /// </summary>
|
|
|
- private object? _result;
|
|
|
-
|
|
|
private IChannelSession? _channel;
|
|
|
- private CommandAsyncResult? _asyncResult;
|
|
|
- private AsyncCallback? _callback;
|
|
|
+ private TaskCompletionSource<object>? _tcs;
|
|
|
+ private CancellationTokenSource? _cts;
|
|
|
+ private CancellationTokenRegistration _tokenRegistration;
|
|
|
private string? _stdOut;
|
|
|
private string? _stdErr;
|
|
|
private bool _hasError;
|
|
|
@@ -40,6 +32,17 @@ namespace Renci.SshNet
|
|
|
private ChannelInputStream? _inputStream;
|
|
|
private TimeSpan _commandTimeout;
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// The token supplied as an argument to <see cref="ExecuteAsync(CancellationToken)"/>.
|
|
|
+ /// </summary>
|
|
|
+ private CancellationToken _userToken;
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// Whether <see cref="CancelAsync(bool, int)"/> has been called
|
|
|
+ /// (either by a token or manually).
|
|
|
+ /// </summary>
|
|
|
+ private bool _cancellationRequested;
|
|
|
+
|
|
|
private int _exitStatus;
|
|
|
private volatile bool _haveExitStatus; // volatile to prevent re-ordering of reads/writes of _exitStatus.
|
|
|
|
|
|
@@ -113,6 +116,30 @@ namespace Renci.SshNet
|
|
|
/// <returns>
|
|
|
/// The stream that can be used to transfer data to the command's input stream.
|
|
|
/// </returns>
|
|
|
+ /// <remarks>
|
|
|
+ /// Callers should ensure that <see cref="Stream.Dispose()"/> is called on the
|
|
|
+ /// returned instance in order to notify the command that no more data will be sent.
|
|
|
+ /// Failure to do so may result in the command executing indefinitely.
|
|
|
+ /// </remarks>
|
|
|
+ /// <example>
|
|
|
+ /// This example shows how to stream some data to 'cat' and have the server echo it back.
|
|
|
+ /// <code>
|
|
|
+ /// using (SshCommand command = mySshClient.CreateCommand("cat"))
|
|
|
+ /// {
|
|
|
+ /// Task executeTask = command.ExecuteAsync(CancellationToken.None);
|
|
|
+ ///
|
|
|
+ /// using (Stream inputStream = command.CreateInputStream())
|
|
|
+ /// {
|
|
|
+ /// inputStream.Write("Hello World!"u8);
|
|
|
+ /// }
|
|
|
+ ///
|
|
|
+ /// await executeTask;
|
|
|
+ ///
|
|
|
+ /// Console.WriteLine(command.ExitStatus); // 0
|
|
|
+ /// Console.WriteLine(command.Result); // "Hello World!"
|
|
|
+ /// }
|
|
|
+ /// </code>
|
|
|
+ /// </example>
|
|
|
public Stream CreateInputStream()
|
|
|
{
|
|
|
if (_channel == null)
|
|
|
@@ -130,7 +157,7 @@ namespace Renci.SshNet
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
- /// Gets the command execution result.
|
|
|
+ /// Gets the standard output of the command by reading <see cref="OutputStream"/>.
|
|
|
/// </summary>
|
|
|
public string Result
|
|
|
{
|
|
|
@@ -141,7 +168,7 @@ namespace Renci.SshNet
|
|
|
return _stdOut;
|
|
|
}
|
|
|
|
|
|
- if (_asyncResult is null)
|
|
|
+ if (_tcs is null)
|
|
|
{
|
|
|
return string.Empty;
|
|
|
}
|
|
|
@@ -154,7 +181,8 @@ namespace Renci.SshNet
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
- /// Gets the command execution error.
|
|
|
+ /// Gets the standard error of the command by reading <see cref="ExtendedOutputStream"/>,
|
|
|
+ /// when extended data has been sent which has been marked as stderr.
|
|
|
/// </summary>
|
|
|
public string Error
|
|
|
{
|
|
|
@@ -165,7 +193,7 @@ namespace Renci.SshNet
|
|
|
return _stdErr;
|
|
|
}
|
|
|
|
|
|
- if (_asyncResult is null || !_hasError)
|
|
|
+ if (_tcs is null || !_hasError)
|
|
|
{
|
|
|
return string.Empty;
|
|
|
}
|
|
|
@@ -211,6 +239,92 @@ namespace Renci.SshNet
|
|
|
_session.ErrorOccured += Session_ErrorOccured;
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Executes the command asynchronously.
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="cancellationToken">
|
|
|
+ /// The <see cref="CancellationToken"/>. When triggered, attempts to terminate the
|
|
|
+ /// remote command by sending a signal.
|
|
|
+ /// </param>
|
|
|
+ /// <returns>A <see cref="Task"/> representing the lifetime of the command.</returns>
|
|
|
+ /// <exception cref="InvalidOperationException">Command is already executing. Thrown synchronously.</exception>
|
|
|
+ /// <exception cref="ObjectDisposedException">Instance has been disposed. Thrown synchronously.</exception>
|
|
|
+ /// <exception cref="OperationCanceledException">The <see cref="Task"/> has been cancelled.</exception>
|
|
|
+ /// <exception cref="SshOperationTimeoutException">The command timed out according to <see cref="CommandTimeout"/>.</exception>
|
|
|
+#pragma warning disable CA1849 // Call async methods when in an async method; PipeStream.DisposeAsync would complete synchronously anyway.
|
|
|
+ public Task ExecuteAsync(CancellationToken cancellationToken = default)
|
|
|
+ {
|
|
|
+#if NET7_0_OR_GREATER
|
|
|
+ ObjectDisposedException.ThrowIf(_isDisposed, this);
|
|
|
+#else
|
|
|
+ if (_isDisposed)
|
|
|
+ {
|
|
|
+ throw new ObjectDisposedException(GetType().FullName);
|
|
|
+ }
|
|
|
+#endif
|
|
|
+
|
|
|
+ if (cancellationToken.IsCancellationRequested)
|
|
|
+ {
|
|
|
+ return Task.FromCanceled(cancellationToken);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (_tcs is not null)
|
|
|
+ {
|
|
|
+ if (!_tcs.Task.IsCompleted)
|
|
|
+ {
|
|
|
+ throw new InvalidOperationException("Asynchronous operation is already in progress.");
|
|
|
+ }
|
|
|
+
|
|
|
+ OutputStream.Dispose();
|
|
|
+ ExtendedOutputStream.Dispose();
|
|
|
+
|
|
|
+ // Initialize output streams. We already initialised them for the first
|
|
|
+ // execution in the constructor (to allow passing them around before execution)
|
|
|
+ // so we just need to reinitialise them for subsequent executions.
|
|
|
+ OutputStream = new PipeStream();
|
|
|
+ ExtendedOutputStream = new PipeStream();
|
|
|
+ }
|
|
|
+
|
|
|
+ _exitStatus = default;
|
|
|
+ _haveExitStatus = false;
|
|
|
+ ExitSignal = null;
|
|
|
+ _stdOut = null;
|
|
|
+ _stdErr = null;
|
|
|
+ _hasError = false;
|
|
|
+ _tokenRegistration.Dispose();
|
|
|
+ _tokenRegistration = default;
|
|
|
+ _cts?.Dispose();
|
|
|
+ _cts = null;
|
|
|
+ _cancellationRequested = false;
|
|
|
+
|
|
|
+ _tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
|
|
|
+ _userToken = cancellationToken;
|
|
|
+
|
|
|
+ _channel = _session.CreateChannelSession();
|
|
|
+ _channel.DataReceived += Channel_DataReceived;
|
|
|
+ _channel.ExtendedDataReceived += Channel_ExtendedDataReceived;
|
|
|
+ _channel.RequestReceived += Channel_RequestReceived;
|
|
|
+ _channel.Closed += Channel_Closed;
|
|
|
+ _channel.Open();
|
|
|
+
|
|
|
+ _ = _channel.SendExecRequest(CommandText);
|
|
|
+
|
|
|
+ if (CommandTimeout != Timeout.InfiniteTimeSpan)
|
|
|
+ {
|
|
|
+ _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
|
|
+ _cts.CancelAfter(CommandTimeout);
|
|
|
+ cancellationToken = _cts.Token;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (cancellationToken.CanBeCanceled)
|
|
|
+ {
|
|
|
+ _tokenRegistration = cancellationToken.Register(static cmd => ((SshCommand)cmd!).CancelAsync(), this);
|
|
|
+ }
|
|
|
+
|
|
|
+ return _tcs.Task;
|
|
|
+ }
|
|
|
+#pragma warning restore CA1849
|
|
|
+
|
|
|
/// <summary>
|
|
|
/// Begins an asynchronous command execution.
|
|
|
/// </summary>
|
|
|
@@ -259,58 +373,7 @@ namespace Renci.SshNet
|
|
|
/// <exception cref="SshOperationTimeoutException">Operation has timed out.</exception>
|
|
|
public IAsyncResult BeginExecute(AsyncCallback? callback, object? state)
|
|
|
{
|
|
|
-#if NET7_0_OR_GREATER
|
|
|
- ObjectDisposedException.ThrowIf(_isDisposed, this);
|
|
|
-#else
|
|
|
- if (_isDisposed)
|
|
|
- {
|
|
|
- throw new ObjectDisposedException(GetType().FullName);
|
|
|
- }
|
|
|
-#endif
|
|
|
-
|
|
|
- if (_asyncResult is not null)
|
|
|
- {
|
|
|
- if (!_asyncResult.AsyncWaitHandle.WaitOne(0))
|
|
|
- {
|
|
|
- throw new InvalidOperationException("Asynchronous operation is already in progress.");
|
|
|
- }
|
|
|
-
|
|
|
- OutputStream.Dispose();
|
|
|
- ExtendedOutputStream.Dispose();
|
|
|
-
|
|
|
- // Initialize output streams. We already initialised them for the first
|
|
|
- // execution in the constructor (to allow passing them around before execution)
|
|
|
- // so we just need to reinitialise them for subsequent executions.
|
|
|
- OutputStream = new PipeStream();
|
|
|
- ExtendedOutputStream = new PipeStream();
|
|
|
- }
|
|
|
-
|
|
|
- // Create new AsyncResult object
|
|
|
- _asyncResult = new CommandAsyncResult
|
|
|
- {
|
|
|
- AsyncWaitHandle = new ManualResetEvent(initialState: false),
|
|
|
- AsyncState = state,
|
|
|
- };
|
|
|
-
|
|
|
- _exitStatus = default;
|
|
|
- _haveExitStatus = false;
|
|
|
- ExitSignal = null;
|
|
|
- _result = null;
|
|
|
- _stdOut = null;
|
|
|
- _stdErr = null;
|
|
|
- _hasError = false;
|
|
|
- _callback = callback;
|
|
|
-
|
|
|
- _channel = _session.CreateChannelSession();
|
|
|
- _channel.DataReceived += Channel_DataReceived;
|
|
|
- _channel.ExtendedDataReceived += Channel_ExtendedDataReceived;
|
|
|
- _channel.RequestReceived += Channel_RequestReceived;
|
|
|
- _channel.Closed += Channel_Closed;
|
|
|
- _channel.Open();
|
|
|
-
|
|
|
- _ = _channel.SendExecRequest(CommandText);
|
|
|
-
|
|
|
- return _asyncResult;
|
|
|
+ return TaskToAsyncResult.Begin(ExecuteAsync(), callback, state);
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
@@ -340,38 +403,19 @@ namespace Renci.SshNet
|
|
|
/// Waits for the pending asynchronous command execution to complete.
|
|
|
/// </summary>
|
|
|
/// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param>
|
|
|
- /// <returns>Command execution result.</returns>
|
|
|
+ /// <returns><see cref="Result"/>.</returns>
|
|
|
/// <exception cref="ArgumentException">Either the IAsyncResult object did not come from the corresponding async method on this type, or EndExecute was called multiple times with the same IAsyncResult.</exception>
|
|
|
/// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is <see langword="null"/>.</exception>
|
|
|
public string EndExecute(IAsyncResult asyncResult)
|
|
|
{
|
|
|
- if (asyncResult is null)
|
|
|
- {
|
|
|
- throw new ArgumentNullException(nameof(asyncResult));
|
|
|
- }
|
|
|
+ var executeTask = TaskToAsyncResult.Unwrap(asyncResult);
|
|
|
|
|
|
- if (_asyncResult != asyncResult)
|
|
|
+ if (executeTask != _tcs?.Task)
|
|
|
{
|
|
|
throw new ArgumentException("Argument does not correspond to the currently executing command.", nameof(asyncResult));
|
|
|
}
|
|
|
|
|
|
- _inputStream?.Dispose();
|
|
|
-
|
|
|
- if (!_asyncResult.AsyncWaitHandle.WaitOne(CommandTimeout))
|
|
|
- {
|
|
|
- // Complete the operation with a TimeoutException (which will be thrown below).
|
|
|
- SetAsyncComplete(new SshOperationTimeoutException($"Command '{CommandText}' timed out. ({nameof(CommandTimeout)}: {CommandTimeout})."));
|
|
|
- }
|
|
|
-
|
|
|
- Debug.Assert(_asyncResult.IsCompleted);
|
|
|
-
|
|
|
- if (_result is Exception exception)
|
|
|
- {
|
|
|
- ExceptionDispatchInfo.Capture(exception).Throw();
|
|
|
- }
|
|
|
-
|
|
|
- Debug.Assert(_result == CompletedResult);
|
|
|
- Debug.Assert(!OutputStream.CanWrite, $"{nameof(OutputStream)} should have been disposed (else we will block).");
|
|
|
+ executeTask.GetAwaiter().GetResult();
|
|
|
|
|
|
return Result;
|
|
|
}
|
|
|
@@ -401,50 +445,59 @@ namespace Renci.SshNet
|
|
|
/// <exception cref="InvalidOperationException">Command has not been started.</exception>
|
|
|
public void CancelAsync(bool forceKill = false, int millisecondsTimeout = 500)
|
|
|
{
|
|
|
- if (_asyncResult is not { } asyncResult)
|
|
|
+ if (_tcs is null)
|
|
|
{
|
|
|
throw new InvalidOperationException("Command has not been started.");
|
|
|
}
|
|
|
|
|
|
- var exception = new OperationCanceledException($"Command '{CommandText}' was cancelled.");
|
|
|
-
|
|
|
- if (Interlocked.CompareExchange(ref _result, exception, comparand: null) is not null)
|
|
|
+ if (_tcs.Task.IsCompleted)
|
|
|
{
|
|
|
- // Command has already completed.
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ _cancellationRequested = true;
|
|
|
+ Interlocked.MemoryBarrier(); // ensure fresh read in SetAsyncComplete (possibly unnecessary)
|
|
|
+
|
|
|
// Try to send the cancellation signal.
|
|
|
if (_channel?.SendSignalRequest(forceKill ? "KILL" : "TERM") is null)
|
|
|
{
|
|
|
// Command has completed (in the meantime since the last check).
|
|
|
- // We won the race above and the command has finished by some other means,
|
|
|
- // but will throw the OperationCanceledException.
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// Having sent the "signal" message, we expect to receive "exit-signal"
|
|
|
// and then a close message. But since a server may not implement signals,
|
|
|
// we can't guarantee that, so we wait a short time for that to happen and
|
|
|
- // if it doesn't, just set the WaitHandle ourselves to unblock EndExecute.
|
|
|
+ // if it doesn't, just complete the task ourselves to unblock waiters.
|
|
|
|
|
|
- if (!asyncResult.AsyncWaitHandle.WaitOne(millisecondsTimeout))
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (_tcs.Task.Wait(millisecondsTimeout))
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (AggregateException)
|
|
|
{
|
|
|
- SetAsyncComplete(asyncResult);
|
|
|
+ // We expect to be here if the server implements signals.
|
|
|
+ // But we don't want to propagate the exception on the task from here.
|
|
|
+ return;
|
|
|
}
|
|
|
+
|
|
|
+ SetAsyncComplete();
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
/// Executes command specified by <see cref="CommandText"/> property.
|
|
|
/// </summary>
|
|
|
- /// <returns>
|
|
|
- /// Command execution result.
|
|
|
- /// </returns>
|
|
|
+ /// <returns><see cref="Result"/>.</returns>
|
|
|
/// <exception cref="SshConnectionException">Client is not connected.</exception>
|
|
|
/// <exception cref="SshOperationTimeoutException">Operation has timed out.</exception>
|
|
|
public string Execute()
|
|
|
{
|
|
|
- return EndExecute(BeginExecute(callback: null, state: null));
|
|
|
+ ExecuteAsync().GetAwaiter().GetResult();
|
|
|
+
|
|
|
+ return Result;
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
@@ -465,44 +518,53 @@ namespace Renci.SshNet
|
|
|
|
|
|
private void Session_Disconnected(object? sender, EventArgs e)
|
|
|
{
|
|
|
- SetAsyncComplete(new SshConnectionException("An established connection was aborted by the software in your host machine.", DisconnectReason.ConnectionLost));
|
|
|
+ _ = _tcs?.TrySetException(new SshConnectionException("An established connection was aborted by the software in your host machine.", DisconnectReason.ConnectionLost));
|
|
|
+
|
|
|
+ SetAsyncComplete(setResult: false);
|
|
|
}
|
|
|
|
|
|
private void Session_ErrorOccured(object? sender, ExceptionEventArgs e)
|
|
|
{
|
|
|
- SetAsyncComplete(e.Exception);
|
|
|
+ _ = _tcs?.TrySetException(e.Exception);
|
|
|
+
|
|
|
+ SetAsyncComplete(setResult: false);
|
|
|
}
|
|
|
|
|
|
- private void SetAsyncComplete(object result)
|
|
|
+ private void SetAsyncComplete(bool setResult = true)
|
|
|
{
|
|
|
- _ = Interlocked.CompareExchange(ref _result, result, comparand: null);
|
|
|
+ Interlocked.MemoryBarrier(); // ensure fresh read of _cancellationRequested (possibly unnecessary)
|
|
|
|
|
|
- if (_asyncResult is CommandAsyncResult asyncResult)
|
|
|
+ if (setResult)
|
|
|
{
|
|
|
- SetAsyncComplete(asyncResult);
|
|
|
+ Debug.Assert(_tcs is not null, "Should only be completing the task if we've started one.");
|
|
|
+
|
|
|
+ if (_userToken.IsCancellationRequested)
|
|
|
+ {
|
|
|
+ _ = _tcs.TrySetCanceled(_userToken);
|
|
|
+ }
|
|
|
+ else if (_cts?.Token.IsCancellationRequested == true)
|
|
|
+ {
|
|
|
+ _ = _tcs.TrySetException(new SshOperationTimeoutException($"Command '{CommandText}' timed out. ({nameof(CommandTimeout)}: {CommandTimeout})."));
|
|
|
+ }
|
|
|
+ else if (_cancellationRequested)
|
|
|
+ {
|
|
|
+ _ = _tcs.TrySetCanceled();
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ _ = _tcs.TrySetResult(null!);
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- private void SetAsyncComplete(CommandAsyncResult asyncResult)
|
|
|
- {
|
|
|
UnsubscribeFromEventsAndDisposeChannel();
|
|
|
|
|
|
OutputStream.Dispose();
|
|
|
ExtendedOutputStream.Dispose();
|
|
|
-
|
|
|
- asyncResult.IsCompleted = true;
|
|
|
-
|
|
|
- _ = ((EventWaitHandle)asyncResult.AsyncWaitHandle).Set();
|
|
|
-
|
|
|
- if (Interlocked.Exchange(ref _callback, value: null) is AsyncCallback callback)
|
|
|
- {
|
|
|
- ThreadAbstraction.ExecuteThread(() => callback(asyncResult));
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
private void Channel_Closed(object? sender, ChannelEventArgs e)
|
|
|
{
|
|
|
- SetAsyncComplete(CompletedResult);
|
|
|
+ SetAsyncComplete();
|
|
|
}
|
|
|
|
|
|
private void Channel_RequestReceived(object? sender, ChannelRequestEventArgs e)
|
|
|
@@ -540,14 +602,6 @@ namespace Renci.SshNet
|
|
|
private void Channel_DataReceived(object? sender, ChannelDataEventArgs e)
|
|
|
{
|
|
|
OutputStream.Write(e.Data, 0, e.Data.Length);
|
|
|
-
|
|
|
- if (_asyncResult is CommandAsyncResult asyncResult)
|
|
|
- {
|
|
|
- lock (asyncResult)
|
|
|
- {
|
|
|
- asyncResult.BytesReceived += e.Data.Length;
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
@@ -613,10 +667,15 @@ namespace Renci.SshNet
|
|
|
OutputStream.Dispose();
|
|
|
ExtendedOutputStream.Dispose();
|
|
|
|
|
|
- if (_asyncResult is not null && _result is null)
|
|
|
+ _tokenRegistration.Dispose();
|
|
|
+ _tokenRegistration = default;
|
|
|
+ _cts?.Dispose();
|
|
|
+ _cts = null;
|
|
|
+
|
|
|
+ if (_tcs is { Task.IsCompleted: false } tcs)
|
|
|
{
|
|
|
// In case an operation is still running, try to complete it with an ObjectDisposedException.
|
|
|
- SetAsyncComplete(new ObjectDisposedException(GetType().FullName));
|
|
|
+ _ = tcs.TrySetException(new ObjectDisposedException(GetType().FullName));
|
|
|
}
|
|
|
|
|
|
_isDisposed = true;
|