Планировщик задач: когда вы ожидаете в Task.Factory.StartNew, является ли поток, возвращаемый в пул?

Вопрос:

Я реализую рабочий движок с верхним пределом параллелизма. Я использую семафор, чтобы ждать, пока параллелизм не упадет ниже максимума, а затем используйте Task.Factory.StartNew чтобы обернуть обработчик async в try/catch, и, finally освободит семафор.

Я понимаю, что это создает потоки в пуле потоков, но мой вопрос заключается в том, что когда один из тех задач, которые выполняются на самом деле (на реальном вызове ввода-вывода или в режиме ожидания), является потоком, возвращаемым в пул, так как я надеюсь было бы?

Если есть лучший способ реализовать планировщик задач с ограниченным параллелизмом, когда обработчик работы является методом асинхронного (возвращает Task), мне тоже хотелось бы его услышать. Или, скажем, в идеале, если есть способ поставить очередь на асинхронный метод (опять же, это асинхронный метод Task -returning), который чувствует себя менее изворотливым, чем перенос его в синхронный делегат и передача его в Task.Factory.StartNew, это казалось бы совершенным..?

(Это также заставляет меня думать, что здесь есть два вида параллелизма: сколько задач обрабатывается в целом, но также и сколько продолжений выполняется на разных потоках одновременно. Можете быть классными, чтобы иметь настраиваемые параметры для обоих, хотя это и не фиксированное требование..)

Изменение: фрагмент:

                    concurrencySemaphore.Wait(cancelToken);
deferRelease = false;
try
{
var result = GetWorkItem();
if (result == null)
{ // no work, wait for new work or exit signal
signal = WaitHandle.WaitAny(signals);
continue;
}

deferRelease = true;
tasks.Add(Task.Factory.StartNew(() =>
{
try
{
DoWorkHereAsync(result); // guess I'd think to .GetAwaiter().GetResult() here.. not run this yet
}
finally
{
concurrencySemaphore.Release();
}
}, cancelToken));
}
finally
{
if (!deferRelease)
{
concurrencySemaphore.Release();
}
}

Лучший ответ:

Вот пример TaskWorker, который не будет создавать бесчисленные рабочие потоки.

Магия выполняется, ожидая SemaphoreSlim.WaitAsync() которая является задачей ввода-вывода (и нет нити).

class TaskWorker
{
    private readonly SemaphoreSlim _semaphore;

    public TaskWorker(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        }

        _semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    }

    public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        // No ConfigureAwait(false) here to keep the SyncContext if any
        // for the real task
        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            await taskFactory().ConfigureAwait(false);
        }
        finally
        {
            _semaphore.Release(1);
        }
    }

    public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            return await taskFactory().ConfigureAwait(false);
        }
        finally
        {
            _semaphore.Release(1);
        }
    }
}

и простое консольное приложение для тестирования

class Program
{
    static void Main(string[] args)
    {
        var worker = new TaskWorker(1);
        var cts = new CancellationTokenSource();
        var token = cts.Token;

        var tasks = Enumerable.Range(1, 10)
            .Select(e => worker.RunAsync(() => SomeWorkAsync(e, token), token))
            .ToArray();

        Task.WhenAll(tasks).GetAwaiter().GetResult();
    }

    static async Task SomeWorkAsync(int id, CancellationToken cancellationToken)
    {
        Console.WriteLine($"Some Started {id}");
        await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
        Console.WriteLine($"Some Finished {id}");
    }
}

Обновить

TaskWorker реализует IDisposable

class TaskWorker : IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly SemaphoreSlim _semaphore;
    private readonly int _maxDegreeOfParallelism;

    public TaskWorker(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        }

        _maxDegreeOfParallelism = maxDegreeOfParallelism;
        _semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);
    }

    public async Task RunAsync(Func<Task> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        ThrowIfDisposed();

        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
        {
            // No ConfigureAwait(false) here to keep the SyncContext if any
            // for the real task
            await _semaphore.WaitAsync(cts.Token);
            try
            {
                await taskFactory().ConfigureAwait(false);
            }
            finally
            {
                _semaphore.Release(1);
            }
        }
    }

    public async Task<T> RunAsync<T>(Func<Task<T>> taskFactory, CancellationToken cancellationToken = default(CancellationToken))
    {
        ThrowIfDisposed();

        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cts.Token))
        {
            await _semaphore.WaitAsync(cts.Token);
            try
            {
                return await taskFactory().ConfigureAwait(false);
            }
            finally
            {
                _semaphore.Release(1);
            }
        }
    }

    private void ThrowIfDisposed()
    {
        if (disposedValue)
        {
            throw new ObjectDisposedException(this.GetType().FullName);
        }
    }

    #region IDisposable Support
    private bool disposedValue = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                _cts.Cancel();
                // consume all semaphore slots
                for (int i = 0; i < _maxDegreeOfParallelism; i++)
                {
                    _semaphore.WaitAsync().GetAwaiter().GetResult();
                }
                _semaphore.Dispose();
                _cts.Dispose();
            }
            disposedValue = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
    }
    #endregion
}

Ответ №1

Вы можете подумать, что поток возвращается в ThreadPool даже если он неактуально возвращается. Поток просто выбирает следующий объект в очереди при запуске асинхронной операции.

Я бы предложил вам посмотреть на Task.Run вместо Task.Factory.StartNew Task.Run vs Task.Factory.StartNew.

А также взгляните на TPL DataFlow. Я думаю, это будет соответствовать вашим требованиям.

Оцените статью
TechArks.Ru
Добавить комментарий