Имплементация Producer/Consumer pattern

244
26 ноября 2017, 11:46

Паттерн producer/consumer достаточно часто встречается в многопоточном программировании. Его смысл состоит в том, что один или несколько потоков производят данные, и параллельно этому один или несколько потоков потребляют их.

Как правильно имплементировать этот паттерн в популярных языках программирования? Задача сама по себе нетривиальна, поскольку включает синхронизацию между потоками, и потенциальную гонку между несколькими производителями и потребителями.

Справка.

Производящий поток (или потоки) называется «производитель», «поставщик» или просто «producer», потребляющий (-ие) — «потребитель» или «consumer».

Нетривиальность проблемы заключается в том, что потенциально как создание новых данных, так и их потребление могут занимать длительное время, и хотелось бы, чтобы обработка шла без простоев, на максимально возможной скорости.

Примеры:

Произведённые данные могут представлять вычислительно интенсивное задание. В этом случае разумно иметь единственный производящий поток, и несколько выполняющих потоков (например, столько, сколько в системе ядер процессора, если узкое место обработки — вычисления).

Или производящие потоки загружают данные из сети, а по окончанию загрузки выполняющие потоки производит разбор загруженных данных. В этом случае разумно иметь по одному производителю на сайт и, и ограничивать число производителей, если предел доступной скорости сети исчерпан.

Этот вопрос — адаптация одноименного исследования с Хэшкода.

Answer 1
Реализация на C с библиотекой pthreads

В C, в соответствии с духом языка, нет встроенных высокоуровневых синхронизирующихся коллекций. Наверное самой популярной и широко используемой библиотекой, реализующей многопоточность, является pthreads. С её помощью паттерн можно реализовать так:

#include <pthread.h>
// объявляем структуру данных для одного задания
struct producer_consumer_queue_item {
  struct producer_consumer_queue_item *next;
  // здесь идут собственно данные. вы можете поменять этот кусок,
  // использовав структуру, более специфичную для вашей задачи
  void *data;
};
// объявляем очередь с дополнительными структурами для синхронизации.
// в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
struct producer_consumer_queue {
  struct producer_consumer_queue_item *head, *tail;
                              // head == tail == 0, если очередь пуста
  pthread_mutex_t lock;       // мьютекс для всех манипуляций с очередью
  pthread_cond_t cond;        // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  int is_alive;               // показывает, не закончила ли очередь свою работу
};

Теперь нам нужны процедуры добавления и извлечения заданий из очереди.

void
enqueue (void *data, struct producer_consumer_queue *q)
{
  // упакуем задание в новую структуру
  struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  p->data = data;
  p->next = 0;
  // получим "эксклюзивный" доступ к очереди заданий
  pthread_mutex_lock(&q->lock);
  // ... и добавим новое задание туда:
  if (q->tail)
    q->tail->next = p;
  else {
    q->head = p;
    // очередь была пуста, а теперь нет -- надо разбудить потребителей
    pthread_cond_broadcast(&q->cond);
  }
  q->tail = p;
  // разрешаем доступ всем снова
  pthread_mutex_unlock(&q->lock);
}
void *
dequeue(struct producer_consumer_queue *q)
{
  // получаем эксклюзивный доступ к очереди:
  pthread_mutex_lock(&q->lock);
  while (!q->head && q->is_alive) {
    // очередь пуста, делать нечего, ждем...
    pthread_cond_wait(&q->cond, &q->lock);
    // wait разрешает доступ другим на время ожидания
  }
  // запоминаем текущий элемент или 0, если очередь умерла
  struct producer_consumer_queue_item *p = q->head;
  if (p)
  {
    // и удаляем его из очереди
    q->head = q->head->next;
    if (!q->head)
      q->tail = q->head;
  }
  // возвращаем эксклюзивный доступ другим участникам
  pthread_mutex_unlock(&q->lock);
  // отдаём данные
  void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  free(p);
  return data;
}

Ещё нужна процедура для инициализации очереди:

struct producer_consumer_queue *
producer_consumer_queue_create()
{
  struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  q->head = q->tail = 0;
  q->is_alive = 1;
  pthread_mutex_init(&q->lock, 0);
  pthread_cond_init(&q->cond, 0);
  return q;
}

И процедура для закрытия очереди:

void
producer_consumer_queue_stop(struct producer_consumer_queue *q)
{
  // для обращения к разделяемым переменным необходим эксклюзивный доступ
  pthread_mutex_lock(&q->lock);
  q->is_alive = 0;
  pthread_cond_broadcast(&q->cond);
  pthread_mutex_unlock(&q->lock);
}

Отлично, у нас есть всё, что нам надо.

Как использовать это? Нужно:

  • запустить несколько потоков-«производителей» и несколько «потребителей»
  • придумать структуру данных для задания

Пример: (производитель — главный поток, потребители — 2 потока)

// это поток-потребитель
void *
consumer_thread (void *arg)
{
  struct producer_consumer_queue *q = (typeof(q))arg;
  for (;;) {
    void *data = dequeue(q);
    // это сигнал, что очередь окончена
    if (!data)
      break; // значит, пора закрывать поток
    char *str = (char *)data;
    // тут наша обработка данных
    printf ("consuming: %s\n", str);
    sleep(2);
    printf ("consumed: %s\n", str);
    free(str);
  }
  return 0;
}
int
main ()
{
  pthread_t consumer_threads[2];
  void *res = 0;
  char *in = NULL;
  size_t sz;
  // создадим очередь:
  struct producer_consumer_queue *q = producer_consumer_queue_create();
  // и потоки-«потребители»
  pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
  pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q);
  // главный цикл
  // получаем данные с клавиатуры:
  while (getline(&in, &sz, stdin) > 0) {
    enqueue(in, q);
    in = NULL;
  }
  producer_consumer_queue_stop(q);
  if (pthread_join(consumer_threads[0], &res) ||
      pthread_join(consumer_threads[1], &res))
    perror("join");
  return (long)res;
}

Это реализация задачи с "бесконечной" очередью. На практике же иногда (или почти всегда?) более полезно ограничить размер очереди и таким образом сбалансировать скорость производителей, иногда переводя их в спящее состояние, с возможностями потребителей.

Для этого немного изменим нашу producer_consumer_queue

struct producer_consumer_queue {
  struct producer_consumer_queue_item *head, *tail;
                              // head == tail == 0, если очередь пуста
  pthread_mutex_t lock;       // мьютекс для всех манипуляций с очередью
  pthread_cond_t condp;       // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  pthread_cond_t condc;       // этот cond "сигналим", когда в очереди ПОЯВИЛОСЬ СВОБОДНОЕ МЕСТО
  int is_alive;               // показывает, не закончила ли очередь свою работу
  int max, cnt,               // максимальный размер очереди и число заданий в ней
    pqcnt;                    // количество производителей, ждущих свободного места в очереди
};

Добавляем pthread_cond_t condc для "засыпания/пробуждения" потоков производителей, их счетчик в очереди на отправку сообщения и пару переменных, содержащих максимальный размер очереди и текущее количество заданий в ней.

Соответственно меняются функции для постановки задания в очередь (enqueue), выборки его из очереди (dequeue), инициализации очереди (producer_consumer_queue_create) и ее остановки (producer_consumer_queue_stop):

void
enqueue (void *data, struct producer_consumer_queue *aq)
{
  volatile struct producer_consumer_queue *q = aq;
  // упакуем задание в новую структуру
  struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  p->data = data;
  p->next = 0;
  // получим "эксклюзивный" доступ к очереди заданий
  pthread_mutex_lock(&aq->lock);
  // проверим не переполнена ли она
  if (q->max <= q->cnt) {
    q->pqcnt++;
    asm volatile ("" : : : "memory");
    // зафиксируем изменения очереди в памяти
    // будем ждать пока потребители ее слегка не опустошат 
    while(q->max <= q->cnt & q->is_alive)
      pthread_cond_wait(&aq->condc, &aq->lock);
    q->pqcnt--;
    asm volatile ("" : : : "memory");
  }
  // ... и добавим новое задание туда:
  if (q->tail)
    q->tail->next = p;
  else {
    q->head = p;
    // очередь была пуста, а теперь нет -- надо разбудить потребителей
    pthread_cond_broadcast(&aq->condp);
  }
  q->tail = p;
  q->cnt++;
  asm volatile ("" : : : "memory");
  // разрешаем доступ всем снова
  pthread_mutex_unlock(&aq->lock);
}

void *
dequeue(struct producer_consumer_queue *aq)
{
  volatile struct producer_consumer_queue *q = aq;
  // получаем эксклюзивный доступ к очереди:
  pthread_mutex_lock(&aq->lock);
  if (q->pqcnt && q->max > q->cnt)
    // в очереди есть место, а кто-то спит, разбудим их
    pthread_cond_broadcast(&aq->condc);
  while (!q->head && q->is_alive) {
    // очередь пуста, делать нечего, ждем...
    pthread_cond_wait(&aq->condp, &aq->lock);
    // wait разрешает доступ другим на время ожидания
  }
  // запоминаем текущий элемент или 0, если очередь умерла
  struct producer_consumer_queue_item *p = q->head;
  if (p) {
    // и удаляем его из очереди
    q->head = q->head->next;
    if (!q->head)
      q->tail = q->head;
    q->cnt--;
    asm volatile ("" : : : "memory");
    // зафиксируем изменения очереди в памяти
    // разбудим поставщиков в их очереди
    pthread_cond_broadcast(&aq->condc);
  }
  // возвращаем эксклюзивный доступ другим участникам
  pthread_mutex_unlock(&aq->lock);
  // отдаём данные
  void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  free(p);
  return data;
}
struct producer_consumer_queue * 
producer_consumer_queue_create(int max)
{
  struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  q->head = q->tail = 0;
  q->is_alive = 1;
  q->max = max;
  q->cnt = 0;
  q->pqcnt = 0;
  pthread_mutex_init(&q->lock, 0);
  pthread_cond_init(&q->condc, 0);
  pthread_cond_init(&q->condp, 0);
  return q;
}
// И процедура для закрытия очереди:
void
producer_consumer_queue_stop(struct producer_consumer_queue *aq)
{
  volatile struct producer_consumer_queue *q = aq;
  // для обращения к разделяемым переменным необходим эксклюзивный доступ
  pthread_mutex_lock(&aq->lock);
  q->is_alive = 0;
  asm volatile ("" : : : "memory");
  pthread_cond_broadcast(&aq->condc);
  pthread_cond_broadcast(&aq->condp);
  pthread_mutex_unlock(&aq->lock);
}

Здесь же показан memory barrier (asm volatile ("" : : : "memory");), использование которого запрещает компилятору переупорядочивать операции чтения-записи из RAM.

Данная реализация не обеспечивает "упорядоченность" производителей, ожидающих своей очереди для отправки сообщения. Т.е. поток производитель, "заснувший" первым из-за отсутствия свободного места в очереди не обязательно проснется первым.

Если такое поведение нас не устраивает, то придется внести некоторые изменения в наши данные, прежде всего добавив очередь поставщиков из структур producer_queue_item (которая будет частью структуры producer_consumer_queue.

Получаем следующие структуры данных:

// объявляем структуру данных для одного задания
struct producer_consumer_queue_item {
  struct producer_consumer_queue_item *next;
  // здесь идут собственно данные. вы можете поменять этот кусок,
  // использовав структуру, более специфичную для вашей задачи
  void *data;
};
// струкура данных для спящего (ждущего свободного места) потока-производителя
struct producer_queue_item {
  struct producer_queue_item *next;
  struct producer_consumer_queue_item *item; // данные для которых нет места
  pthread_cond_t cond;  // этот cond "сигналим", когда в очереди появилось место
#if DEBUG
  pid_t tid;    // linux thread id for debug print
  int signaled; // индикатор "побудки" for debug print
#endif
};
// объявляем очередь данных с дополнительными структурами для синхронизации.
// в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
struct producer_consumer_queue {
  struct producer_consumer_queue_item *head, *tail;
                              // head == tail == 0, если очередь пуста
  pthread_mutex_t lock;       // мьютекс для всех манипуляций с очередью
  pthread_cond_t cond;        // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  int is_alive;               // показывает, не закончила ли очередь свою работу
  int max, cnt;               // максимальный размер очереди и число заданий в ней
  // очередь  потоков-производителей, ждущих свободного места для своих данных
  struct producer_queue_item *pqhead,
    *pqtail;
};

и реализацию основных функций:

void
enqueue (void *data, struct producer_consumer_queue *q)
{
  volatile struct producer_consumer_queue *vq = q;
  // упакуем задание в новую структуру
  struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  p->data = data;
  p->next = 0;
  // получим "эксклюзивный" доступ к очереди заданий
  pthread_mutex_lock(&q->lock);
#if DEBUG
  printf("%ld (cnt: %d) ---> %s", (long)gettid(), vq->cnt, (char *)(p->data));
#endif
  // ... и добавим новое задание туда:
  if (vq->max <= vq->cnt || vq->pqtail) {// производитель должен ждать
#if DEBUG
    if (vq->cnt < vq->max) {
      puts("========================");
      print_queue(q, 0);
      puts("========================");
    }
#endif
    struct producer_queue_item *pq = (typeof(pq))malloc(sizeof(*pq));
    pthread_cond_init(&pq->cond, 0);  // cond по которому его разбудят
    pq->next = 0;
    pq->item = p;  // сохраним данные на время сна
#if DEBUG
    pq->tid = gettid();
#endif
    // поместим себя в очередь спящих производителей
    if (vq->pqtail)
      vq->pqtail->next = pq;
    else
      vq->pqhead = pq;
    vq->pqtail = pq;
    asm volatile ("" : : : "memory");
    // зафиксируем изменения очереди в памяти
#if DEBUG
    int at = 0; // счетчик циклов пробуждения
#endif
    do { // пойдем спать до появления свободного места в очереди данных
#if DEBUG
      printf ("%ld prod cond wait (cnt: %d  at: %d) %s", 
          (long)gettid(), vq->cnt, at++, (char *)(p->data));
      pq->signaled = 0;
#endif
      pthread_cond_wait(&pq->cond, &q->lock);
    } while(vq->max <= vq->cnt && vq->is_alive);
    // проснулись и владеем очередью
    /*
      Вот тонкий момент. Порядок активизации потоков не определен,
      а нам надо соблюдать очередность данных.
      Поэтому переустановим локальные переменные из очереди, 
      хотя это могут быть данные, положенные туда другим потоком.
    */
#if DEBUG
    if (pq != vq->pqhead) {
      printf ("BAAAD %ld (cnt: %d  at: %d) %s",
          (long)gettid(), vq->cnt, at, (char *)(p->data));
      print_queue(q, 0);
      if (vq->is_alive)
    exit(1); // совсем плохо, такого быть не должно
      else
    puts("CONTINUE");
    }
#endif
    pq = vq->pqhead;  // в любом случае берем голову очереди производителей
    if ((vq->pqhead = pq->next) == 0) // и удаляем ее
      vq->pqtail = 0;
    asm volatile ("" : : : "memory");
    p = pq->item;
    free(pq);
#if DEBUG
    printf ("%ld prod enqueued after wait (cnt: %d  at: %d) %s", 
        (long)gettid(), vq->cnt, at, (char *)(p->data));    
#endif
  }
  // вот тут реально кладем data в очередь для потребителей
  if (vq->tail)
    vq->tail->next = p;
  else {
    vq->head = p;
    // очередь была пуста, а теперь нет -- надо разбудить потребителей
    pthread_cond_broadcast(&q->cond);
  }
  vq->tail = p;
  vq->cnt++;
  asm volatile ("" : : : "memory");
  // сбросим изменения очереди в память
  // разрешаем доступ всем снова
  pthread_mutex_unlock(&q->lock);
}
#if DEBUG                   
#define cond_signal_producer(q) ({          \
      if ((q)->pqhead) {                \
      (q)->pqhead->signaled = 1;            \
      pthread_cond_signal(&(q)->pqhead->cond);  \
      }                         \
    })
#else
#define cond_signal_producer(q) ({          \
      if ((q)->pqhead)                  \
      pthread_cond_signal(&(q)->pqhead->cond);  \
    })
#endif
void *
dequeue(struct producer_consumer_queue *q)
{
  volatile struct producer_consumer_queue *vq = q;
  // получаем эксклюзивный доступ к очереди:
  pthread_mutex_lock(&q->lock);
  // если есть спящие производители, то разбудим первого
  cond_signal_producer(vq);  
  while (!vq->head && vq->is_alive) {
    // очередь пуста, делать нечего, ждем...
    pthread_cond_wait(&q->cond, &q->lock);
    // wait разрешает доступ другим на время ожидания
  }
  // запоминаем текущий элемент или 0, если очередь умерла
  struct producer_consumer_queue_item *p = vq->head;
  if (p) {
    // и удаляем его из очереди
    vq->head = vq->head->next;
    if (!vq->head)
      vq->tail = vq->head;
    vq->cnt--;
    asm volatile ("" : : : "memory");
    // сбросим изменения очереди в память
    // разбудим первого поставщика в их очереди
    cond_signal_producer(vq);
  }
  // возвращаем эксклюзивный доступ другим участникам
  pthread_mutex_unlock(&q->lock);
  // отдаём данные
  void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  // согласно 7.20.3.2/2, можно не проверять на 0
  free(p);
  return data;
}
struct producer_consumer_queue * 
producer_consumer_queue_create(int max)
{
  struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  q->head = q->tail = 0;
  q->pqhead = q->pqtail = 0;
  q->is_alive = 1;
  q->max = max;
  q->cnt = 0;
  pthread_mutex_init(&q->lock, 0);
  pthread_cond_init(&q->cond, 0);
  return q;
}
// И процедура для закрытия очереди:
void
producer_consumer_queue_stop(struct producer_consumer_queue *q)
{
  volatile struct producer_consumer_queue *vq = q;
  // для обращения к разделяемым переменным необходим эксклюзивный доступ
  pthread_mutex_lock(&q->lock);
  vq->is_alive = 0;
  pthread_cond_broadcast(&q->cond); // разбудим потребителей
  volatile struct producer_queue_item *pq;
  for (pq = vq->pqhead; pq; pq = pq->next) {
#if DEBUG
    pq->signaled = 1;
    asm volatile ("" : : : "memory");
#endif
    // будим каждого ждущего производителя
    pthread_cond_signal((pthread_cond_t *)&pq->cond);
  }
  pthread_mutex_unlock(&q->lock);
}

Все три программы (pq1.c, pq2.c и pq3.c) вместе с функцией gettid() находятся в http://pastebin.com/E23r9DZk . Для экспериментов скопируйте их в разные файлы и компилируйте, например, gcc pq3.c -pthread gettid.o

Answer 2
Реализация на C#

Для современных версий языка (начиная с C# 4.0), имеет смысл не писать реализацию вручную, а (руководствуясь советом @Flammable), воспользоваться классом BlockingCollection, представляющим нужную функциональность.

Для чтения в consumer-потоках используем просто циклы по последовательности, которую даёт GetConsumingEnumerable(). В producer-потоках пользуемся Add, и в конце не забываем CompleteAdding, чтобы consumer-потоки смогли остановиться.

Пример:

class Program
{
    static public void Main()
    {
        new Program().Run();
    }
    BlockingCollection<string> q = new BlockingCollection<string>();
    void Run()
    {
        var threads = new [] { new Thread(Consumer), new Thread(Consumer) };
        foreach (var t in threads)
            t.Start();
        string s;
        while ((s = Console.ReadLine()).Length != 0)
            q.Add(s);
        q.CompleteAdding(); // останавливаем
        foreach (var t in threads)
            t.Join();
    }
    void Consumer()
    {
        foreach (var s in q.GetConsumingEnumerable())
        {
            Console.WriteLine("Processing: {0}", s);
            Thread.Sleep(2000);
            Console.WriteLine("Processed: {0}", s);
        }
    }
}

BlockingCollection<T> позволяет ограничить количество элементов, так что попытка добавить элемент в переполненную очередь также может быть заблокирована до освобождения места.

Заметьте, что GetConsumingEnumerable корректно работает даже в случае, когда у вас много консьюмеров. Это не так уж и очевидно.

Если вы работаете со старой версией C#, вам придётся писать нужную функциональность вручную. Вы можете воспользоваться встроенным классом Monitor (который является аналогом mutex + condition variable из pthreads).

public class ProducerConsumer<T> where T : class
{
    object mutex = new object();
    Queue<T> queue = new Queue<T>();
    bool isDead = false;
    public void Enqueue(T task)
    {
        if (task == null)
            throw new ArgumentNullException("task");
        lock (mutex)
        {
            if (isDead)
                throw new InvalidOperationException("Queue already stopped");
            queue.Enqueue(task);
            Monitor.Pulse(mutex);
        }
    }
    public T Dequeue()
    {
        lock (mutex)
        {
            while (queue.Count == 0 && !isDead)
                Monitor.Wait(mutex);
            if (queue.Count == 0)
                return null;
            return queue.Dequeue();
        }
    }
    public void Stop()
    {
        lock (mutex)
        {
            isDead = true;
            Monitor.PulseAll(mutex);
        }
    }
}

Использование (аналогичный пример):

class Program
{
    static public void Main()
    {
        new Program().Run();
    }
    ProducerConsumer<string> q = new ProducerConsumer<string>();
    void Run()
    {
        var threads = new [] { new Thread(Consumer), new Thread(Consumer) };
        foreach (var t in threads)
            t.Start();
        string s;
        while ((s = Console.ReadLine()).Length != 0)
            q.Enqueue(s);
        q.Stop();
        foreach (var t in threads)
            t.Join();
    }
    void Consumer()
    {
        while (true)
        {
            string s = q.Dequeue();
            if (s == null)
                break;
            Console.WriteLine("Processing: {0}", s);
            Thread.Sleep(2000);
            Console.WriteLine("Processed: {0}", s);
        }
    }
}
Answer 3
Реализация на C#, библиотека Dataflow

Ещё одной альтернативой является использование Майкрософтовской библиотеки Dataflow, которая собственно и создана для того, чтобы управлять потоками данных. Для использования кода в примерах, вам нужно подключить NuGet-пакет Microsoft.Tpl.Dataflow. Класс BufferBlock<T> практически является готовым producer/consumer'ом, кроме того, он имеет async-интерфейс.

Для асинхронного добавления задания поставщик может воспользоваться SendAsync. Асинхронное добавление нужно, так как очередь может быть ограниченной длины, а значит, добавление должно будет дожидаться наличия свободного места! По окончанию добавления нужно вызвать Complete.

async Task ProduceSingle(ITargetBlock<string> queue, int howmuch)
{
    Random r = new Random();
    while (howmuch-- > 0)
    {
        // эмулируем длительную работу по подготовке следующего задания
        // длительность выбираем случайно, чтобы задания приходили в
        // непредсказуемые моменты времени
        await Task.Delay(1000 * r.Next(1, 3));
        var v = string.Format("automatic {0}", r.Next(1, 10));
        await queue.SendAsync(v);
    }
    queue.Complete();
}

Если у вас несколько поставщиков, закрывать очередь нужно лишь когда они все отработают:

async Task Produce1(ITargetBlock<string> queue, int howmuch)
{
    Random r = new Random();
    while (howmuch-- > 0)
    {
        await Task.Delay(1000 * r.Next(1, 3));
        var v = string.Format("automatic {0}", r.Next(1, 10));
        await queue.SendAsync(v);
    }
}
// функция Console.ReadLine() -- блокирующая, поэтому выполняем её асинхронно
// (иначе она заблокирует вызывающий поток)
// у Console нет async-интерфейса.
Task<string> ReadConsole()
{
    // блокирующую функцию выгружаем в thread pool
    return Task.Run(() => Console.ReadLine());
}
async Task Produce2(ITargetBlock<string> queue)
{
    string s;
    while ((s = await ReadConsole()).Length != 0)
        await queue.SendAsync("manual " + s);
}
async Task ProduceAll(ITargetBlock<string> queue)
{
    var p1 = Produce1(queue, 20);
    var p2 = Produce2(queue);
    await Task.WhenAll(p1, p2);
    queue.Complete();
}

Теперь, потребитель. Если потребитель лишь один, всё просто:

async Task ConsumeSingle(ISourceBlock<string> queue)
{
    while (await queue.OutputAvailableAsync())
        Console.WriteLine(await queue.ReceiveAsync());
}

Для случая нескольких потребителей использовать ReceiveAsync — неверно, так как задание может быть взято из очереди другим потребителем! Функции TryReceiveAsync тоже нету, поэтому после асинхронного выяснения того, что очередь не пуста, используем TryReceive:

async Task ConsumeCooperative(IReceivableSourceBlock<string> queue, int number)
{
    Random r = new Random();
    while (await queue.OutputAvailableAsync())
    {
        string v;
        // в этой точке данные могут быть уже уйти другому потребителю
        if (!queue.TryReceive(out v))
            continue; // продолжаем ждать
        // цветной вывод и прочие плюшки
        // мне лень синхронизировать вывод на консоль, хотя конечно это разделяемый ресурс
        if (Console.CursorLeft != 0)
            Console.WriteLine();
        var savedColor = Console.ForegroundColor;
        Console.ForegroundColor = (ConsoleColor)(number + 8);
        Console.WriteLine(string.Format("{0}[{1}]: {2}",
                          new string(' ', number * 4), number, v));
        Console.ForegroundColor = savedColor;
        // симулируем длительную обработку результата клиентом
        await Task.Delay(1000 * r.Next(1, 3));
    }
}
Task ConsumeAll(IReceivableSourceBlock<string> queue)
{
    var c1 = ConsumeCooperative(queue, 1);
    var c2 = ConsumeCooperative(queue, 2);
    return Task.WhenAll(c1, c2);
}

Осталась обвязка:

class Program
{
    static void Main(string[] args)
    {
        new Program().RunAll().Wait();
    }
    async Task RunAll()
    {
        BufferBlock<string> queue = new BufferBlock<string>();
        var p = ProduceAll(queue);
        var c = ConsumeAll(queue);
        await Task.WhenAll(p, c, queue.Completion);
    }
    // остальные методы
}

В своей статье Async Producer/Consumer Queue using Dataflow Stephen Cleary предлагает другой подход, более в духе библиотеки Dataflow. В ней заложена симметрия между блоками-источниками (ISourceBlock<T>), блоками-приёмниками (ITargetBlock<T>), и блоками-преобразователями (IPropagatorBlock<TInput, TOutput>). В соответствии с этой идеологией, мы применяем для поставщика блок-приёмник ActionBlock<T>:

Task Consume2(ISourceBlock<string> queue, int number)
{
    var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
    var consumer = new ActionBlock<string>(v => ConsumeImpl2(v, number), consumerOptions);
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    queue.LinkTo(consumer, linkOptions);
    return consumer.Completion;
}
void ConsumeImpl2(string v, int number)
{
    Console.WriteLine(string.Format("[{0}]: {1}", number, v));
    Thread.Sleep(1500);
}
Task ConsumeAll2(ISourceBlock<string> queue)
{
    var c1 = Consume2(queue, 1);
    var c2 = Consume2(queue, 2);
    return Task.WhenAll(c1, c2);
}

Для чего нужно BoundedCapacity = 1? Дел в том, что по умолчанию ActionBlock<T> имеет «неограниченную» ёмкость, и таким образом за раз потребит все данные из очереди. Таким образом, если мы ввели ограничение на объём очереди

queue = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 20 });

то данные будут всё равно накапливаться в ActionBlock'е. Чтобы роль хранилища исполнялась BufferBlock'ом, а потребителя — ActionBlock, и нужно ограничить его объём. Заметьте также, что ограничение ёмкости ActionBlock'а позволяет библиотеке Dataflow балансировать нагрузку, отправляя данные свободному блоку.

Заметьте, что в этом случае то, в каком контексте (пул потоков? выделенный поток?) исполняется ActionBlock, управляется не через стандартный механизм async/await, а посредством TaskScheduler'а в настройках.

Подход со стыковкой блоков выполнения может быстро стать слишком тяжеловесным, поэтому я советовал бы использовать его лишь для достаточно сложных задач.

Answer 4

Реализация на C#, библиотека Dataflow ("инверсный" алгоритм)

Отличие данного алгоритма от приведенного VladD "прямого" - в том, что используется очередь потребителей вместо очереди элементов.

Это позволяет избавиться от цикла приема в потребителе - ценой появления знания списка потребителей поставщиком данных. Иными словами, в такой конфигурации потребители выходят не активными - а пассивными.

Кроме того, сам алгоритм получается очень простым.

Потребитель в таком алгоритме не имеет никакого алгоритма, это лишь класс (или интерфейс либо вовсе делегат), в котором можно вызвать метод:

class Consumer
{
  public void Consume(string str)
  {
    Console.WriteLine(str);
  }
}

На стороне поставщика алгоритм немного сложнее:

BufferBlock<Consumer> consumers = new BufferBlock<Consumer>();
public async void SendToConsumer(string str)
{
  var consumer = await consumers.ReceiveAsync();
  try
  {
    consumer.Consume(str);
  }
  finally
  {
    consumers.Post(consumer);
  }
}
async Task Produce(int howmuch)
{
    Random r = new Random();
    while (howmuch-- > 0)
    {
        // эмулируем длительную работу по подготовке следующего задания
        // длительность выбираем случайно, чтобы задания приходили в
        // непредсказуемые моменты времени
        await Task.Delay(1000 * r.Next(1, 3));
        var v = string.Format("automatic {0}", r.Next(1, 10));
        SendToConsumer(v);
    }
}

К сожалению, это еще не все. Дело в том, что в таком виде любое исключение, возникшее в методе Consume, обрушит программу.

Поэтому надо сделать одну из двух вещей:

  • добавить в метод контракт класса Consumer требование обязательной обработки исключений;

  • или же установить глобальный обработчик неперехваченных исключений.

А еще лучше - воспользоваться сразу обоими вариантами.

И еще один важный момент. Пассивность потребителя означает, в частности, что потребитель будет запущен в контексте синхронизации поставщика. Иногда это может быть нежелательным - в таком случае в код SendToConsumer следует добавить вызов Task.Run.

Иногда же, напротив, производителю нужно дождаться полной обработки порции данных. В таком случае надо сменить возвращаемое значение у метода SendToConsumer с void на Task.

READ ALSO
Заморозка одной оси вращения

Заморозка одной оси вращения

Интересует вопрос, как заморозить одну ось вращения, чтобы третья не меняласьНапример, если использовать две оси вращения (X и Y ), то объест...

203
C# алгоритм перебора паролей

C# алгоритм перебора паролей

Всем привет! По учебе нужно написать код (обязательно рекурсивный!), который бы по заданному слову перебирал все возможные пароли, полученные...

247
Unity управление мышью под WSA

Unity управление мышью под WSA

При сборке игры под Windows 10, метод

217
Парсинг цвета с помощью Html Аgility Pack

Парсинг цвета с помощью Html Аgility Pack

Требуется добыть значения цвета с палитры сайта coloradobe

169