Как выполнить цикл еще раз в Parallel.For?

201
22 февраля 2017, 19:40

Допустим я в 100 потоков качаю картинки. Если какая либо итерация вызвало исключение - как его повторить по новой?

Parallel.For(0, newLst.Count, new ParallelOptions { MaxDegreeOfParallelism = 100 }, (i) =>
      {
           DownloadImage(newLst[i]);
      });
Answer 1

Сделять DownloadImageAsync, который дергать не в Parallel.For, а просто ограничив число одновременно выполняемых задач, например с помощью такого кода:

    public static IEnumerable<Task<TTask>> ForEachAsync<TItem, TTask>(
        this IEnumerable<TItem> source, Func<TItem, Task<TTask>> selector, 
        int degreeOfParallelism)
    {
        Contract.Requires(source != null);
        Contract.Requires(selector != null);
        // We need to know all the items in the source before starting tasks
        var tasks = source.ToList();
        int completedTask = -1;
        // Creating an array of TaskCompletionSource that would holds
        // the results for each operations
        var taskCompletions = new TaskCompletionSource<TTask>[tasks.Count];
        for(int n = 0; n < taskCompletions.Length; n++) 
            taskCompletions[n] = new TaskCompletionSource<TTask>();
        // Partitioner would do all grunt work for us and split
        // the source into appropriate number of chunks for parallel processing
        foreach (var partition in 
            Partitioner.Create(tasks).GetPartitions(degreeOfParallelism))
        {
            var p = partition;
            // Loosing sync context and starting asynchronous 
            // computation for each partition
            Task.Run(async () =>
            {
                while (p.MoveNext())
                {
                    var task = selector(p.Current);
                    // Don't want to use empty catch . 
                    // This trick just swallows an exception
                    await task.ContinueWith(_ => { });
                    int finishedTaskIndex = Interlocked.Increment(ref completedTask);
                    taskCompletions[finishedTaskIndex].FromTask(task);
                }
            });
        }
        return taskCompletions.Select(tcs => tcs.Task);
    }

Теперь можно будет сделать так:

var tasks = newLst.ForEachAsync(n => DownloadImageAsyncWithRetry(n, retryCount), 100).ToList()

И логику ретраинга просто впихнуть в DownloadImageAsyncWithRetry:

public Task<Result> DownloadImageAsyncWithRetry(input)
{
  var tsk = DownloadImageAsync(input);
  // Тут все зависит от того, каким именно образом определяется неудача.
  // Если это исключение, то вешаем ContinueWith, если это код возврата,
  // то проверяем его и пробуем повторить запрос.
}

Да, Parallel.For в этом случае идея - не очень, поскольку он предназначен прежде всего для CPU Intensive операций, а здесь явно IO Intensive. В этом случае логично сделать сам метод асинхронным, который будет дергать асинхронный API для загрузки картинок.

З.Ы. Код ForEachAsync взять отсюда.

READ ALSO
TCP/IP скорость обмена

TCP/IP скорость обмена

Делаю своего рода RPCИнтересует скорость обмена по TCP/IP

246
Удаление определённой строки в файле

Удаление определённой строки в файле

Ситуация такая: У меня есть файл, в нем записанны строкиЯ пытаюсь найти строку по её содержанию и её удалить

306
WPF туннелирование событий

WPF туннелирование событий

Отрабатывает только событие OnDown3, а как заставить получить события от других канвасов?

264
Регулярное выражение вида &ldquo;Maxim_Ivanov&rdquo;

Регулярное выражение вида “Maxim_Ivanov”

1-ый раз работаю с регулярными выражениями

262