Я реализую рабочий движок с верхним пределом параллелизма. Я использую семафор, чтобы ждать, пока параллелизм не упадет ниже максимума, а затем используйте Task.Factory.StartNew
чтобы обернуть обработчик async в try
/catch
, и, finally
освободит семафор.
Я понимаю, что это создает потоки в пуле потоков, но мой вопрос заключается в том, что когда один из тех задач, которые выполняются на самом деле (на реальном вызове ввода-вывода или в режиме ожидания), является потоком, возвращаемым в пул, так как я надеюсь было бы?
Если есть лучший способ реализовать планировщик задач с ограниченным параллелизмом, когда обработчик работы является методом асинхронного (возвращает Task
), мне тоже хотелось бы его услышать. Или, скажем, в идеале, если есть способ поставить очередь на асинхронный метод (опять же, это асинхронный метод Task
-returning), который чувствует себя менее изворотливым, чем перенос его в синхронный делегат и передача его в Task.Factory.StartNew
, это казалось бы совершенным..?
(Это также заставляет меня думать, что здесь есть два вида параллелизма: сколько задач обрабатывается в целом, но также и сколько продолжений выполняется на разных потоках одновременно. Можете быть классными, чтобы иметь настраиваемые параметры для обоих, хотя это и не фиксированное требование..)
Изменение: фрагмент:
concurrencySemaphore.Wait(cancelToken);
deferRelease = false;
try
{
var result = GetWorkItem();
if (result == null)
{ // no work, wait for new work or exit signal
signal = WaitHandle.WaitAny(signals);
continue;
}
deferRelease = true;
tasks.Add(Task.Factory.StartNew(() =>
{
try
{
DoWorkHereAsync(result); // guess I'd think to .GetAwaiter().GetResult() here.. not run this yet
}
finally
{
concurrencySemaphore.Release();
}
}, cancelToken));
}
finally
{
if (!deferRelease)
{
concurrencySemaphore.Release();
}
}
Вот пример TaskWorker, который не будет создавать бесчисленные рабочие потоки.
Магия выполняется, ожидая SemaphoreSlim.WaitAsync()
которая является задачей ввода-вывода (и нет нити).
class TaskWorker
{
private readonly SemaphoreSlim _semaphore;
public TaskWorker(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
}
_semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
}
public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
// No ConfigureAwait(false) here to keep the SyncContext if any
// for the real task
await _semaphore.WaitAsync(cancellationToken);
try
{
await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
await _semaphore.WaitAsync(cancellationToken);
try
{
return await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
и простое консольное приложение для тестирования
class Program
{
static void Main(string[] args)
{
var worker = new TaskWorker(1);
var cts = new CancellationTokenSource();
var token = cts.Token;
var tasks = Enumerable.Range(1, 10)
.Select(e => worker.RunAsync(() => SomeWorkAsync(e, token), token))
.ToArray();
Task.WhenAll(tasks).GetAwaiter().GetResult();
}
static async Task SomeWorkAsync(int id, CancellationToken cancellationToken)
{
Console.WriteLine($"Some Started {id}");
await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
Console.WriteLine($"Some Finished {id}");
}
}
Обновить
TaskWorker
реализует IDisposable
class TaskWorker : IDisposable
{
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly SemaphoreSlim _semaphore;
private readonly int _maxDegreeOfParallelism;
public TaskWorker(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
}
_maxDegreeOfParallelism = maxDegreeOfParallelism;
_semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
}
public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
ThrowIfDisposed();
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
{
// No ConfigureAwait(false) here to keep the SyncContext if any
// for the real task
await _semaphore.WaitAsync(cts.Token);
try
{
await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
{
ThrowIfDisposed();
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
{
await _semaphore.WaitAsync(cts.Token);
try
{
return await taskFactory().ConfigureAwait(false);
}
finally
{
_semaphore.Release(1);
}
}
}
private void ThrowIfDisposed()
{
if (disposedValue)
{
throw new ObjectDisposedException(this.GetType().FullName);
}
}
#region IDisposable Support
private bool disposedValue = false;
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
_cts.Cancel();
// consume all semaphore slots
for (int i = 0; i < _maxDegreeOfParallelism; i++)
{
_semaphore.WaitAsync().GetAwaiter().GetResult();
}
_semaphore.Dispose();
_cts.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
#endregion
}
Вы можете подумать, что поток возвращается в ThreadPool
даже если он неактуально возвращается. Поток просто выбирает следующий объект в очереди при запуске асинхронной операции.
Я бы предложил вам посмотреть на Task.Run
вместо Task.Factory.StartNew
Task.Run vs Task.Factory.StartNew.
А также взгляните на TPL DataFlow. Я думаю, это будет соответствовать вашим требованиям.