Опрос новых файлов, избегайте дублирования при обработке файлов

Вопрос:

Я хочу опросить новые файлы в удаленном каталоге (pdf из нескольких MB) из службы Windows.

Каждый файл должен обрабатываться интенсивным заданием ЦП (распознавание образов в файлах PDF). После завершения процесса файл должен быть перемещен в другом месте или удален.

Я хотел бы как можно быстрее запустить свою работу, используя преимущества многопроцессорных возможностей для параллельной работы.

Тем не менее, я столкнулся с проблемой: в то время как легко перечислять файлы в каталоге, как избежать дублирования записей в моей очереди заданий? Фактически, каждый раз, когда я перечисляю свои файлы, возможно, что некоторые файлы еще поставлены в очередь или еще обрабатываются.

Мой первый подход состоял в том, чтобы посмотреть в System.Collections.Concurrent.* Но ни один класс не предоставляет метод contains to test перед добавлением.

Я также посмотрел на HashSet<string>, но я боюсь некоторых проблем с одновременным доступом.

Мой текущий скелет:

    private async void GetNewFiles(CancellationToken cancellationToken)
{
if (!cancellationToken.IsCancellationRequested)
{
var newfiles = Directory.GetFileSystemEntries(inputDirectory, "*.pdf", SearchOption.AllDirectories);
logger.Trace($"{newfiles.Length} new files detected in {inputDirectory}");

foreach (var file in newfiles)
{
Task.Factory.StartNew(()=>ProcessFile(file), cancellationToken);
}

await Task.Delay(frequency, cancellationToken);
if (!cancellationToken.IsCancellationRequested)
{
GetNewFiles(cancellationToken);
}
}
}

Однако этот код не позволяет дважды ставить файл в очередь.

Если я Task.Delay вызов Task.Delay и дождитесь обработки всех файлов, он будет работать, но он может привести к выполнению только одной запущенной задачи, даже если новые файлы будут добавлены (каждая итерация обработки новых файлов должна быть полностью обработана до проверка новых файлов).

Лучший ответ:

Самый простой способ, требующий наименьшего количества изменений в вашем текущем коде, — использовать ConcurrentDictionary я думаю:

private readonly ConcurrentDictionary<string, byte> _filesInProgress = new ConcurrentDictionary<string, byte>();
private async Task GetNewFiles(CancellationToken cancellationToken) {            
    if (!cancellationToken.IsCancellationRequested) {
        var newfiles = Directory.GetFileSystemEntries(inputDirectory, "*.pdf", SearchOption.AllDirectories);
        foreach (var file in newfiles) {
            // TryAdd returns true if key was not already in dictionary
            if (_filesInProgress.TryAdd(file, 0) && File.Exists(file)) {
                Task.Factory.StartNew(() => {
                    ProcessFile(file);                         
                    _filesInProgress.TryRemove(file, out _);
                }, cancellationToken);
            }
        }
        await Task.Delay(frequency, cancellationToken);
        if (!cancellationToken.IsCancellationRequested) {
            GetNewFiles(cancellationToken);
        }
    }
}

Помните, что в идеале вы хотите обрабатывать элементы с ограниченным количеством потоков (равными количеству ваших ядер\виртуальных ядер). На данный момент, если вы находите 100 файлов в каталоге, вы можете создать 100 потоков, которые являются самыми тяжелыми для процессора, и поэтому будут бороться друг с другом за ресурсы без уважительной причины.

Оцените статью
TechArks.Ru
Добавить комментарий