Имплементация Producer/Consumer pattern

179
12 июля 2018, 16:20

Паттерн producer/consumer достаточно часто встречается в многопоточном программировании. Его смысл состоит в том, что один или несколько потоков производят данные, и параллельно этому один или несколько потоков потребляют их.

Как правильно имплементировать этот паттерн в популярных языках программирования? Задача сама по себе нетривиальна, поскольку включает синхронизацию между потоками, и потенциальную гонку между несколькими производителями и потребителями.

Справка.

Производящий поток (или потоки) называется «производитель», «поставщик» или просто «producer», потребляющий (-ие) — «потребитель» или «consumer».

Нетривиальность проблемы заключается в том, что потенциально как создание новых данных, так и их потребление могут занимать длительное время, и хотелось бы, чтобы обработка шла без простоев, на максимально возможной скорости.

Примеры:

Произведённые данные могут представлять вычислительно интенсивное задание. В этом случае разумно иметь единственный производящий поток, и несколько выполняющих потоков (например, столько, сколько в системе ядер процессора, если узкое место обработки — вычисления).

Или производящие потоки загружают данные из сети, а по окончанию загрузки выполняющие потоки производит разбор загруженных данных. В этом случае разумно иметь по одному производителю на сайт и, и ограничивать число производителей, если предел доступной скорости сети исчерпан.

Этот вопрос — адаптация одноименного исследования с Хэшкода.

Answer 1
Реализация на C#

Для современных версий языка (начиная с C# 4.0), имеет смысл не писать реализацию вручную, а (руководствуясь советом @Flammable), воспользоваться классом BlockingCollection, представляющим нужную функциональность.

Для чтения в consumer-потоках используем просто циклы по последовательности, которую даёт GetConsumingEnumerable(). В producer-потоках пользуемся Add, и в конце не забываем CompleteAdding, чтобы consumer-потоки смогли остановиться.

Пример:

class Program
{
    static public void Main()
    {
        new Program().Run();
    }
    BlockingCollection<string> q = new BlockingCollection<string>();
    void Run()
    {
        var threads = new [] { new Thread(Consumer), new Thread(Consumer) };
        foreach (var t in threads)
            t.Start();
        string s;
        while ((s = Console.ReadLine()).Length != 0)
            q.Add(s);
        q.CompleteAdding(); // останавливаем
        foreach (var t in threads)
            t.Join();
    }
    void Consumer()
    {
        foreach (var s in q.GetConsumingEnumerable())
        {
            Console.WriteLine("Processing: {0}", s);
            Thread.Sleep(2000);
            Console.WriteLine("Processed: {0}", s);
        }
    }
}

BlockingCollection<T> позволяет ограничить количество элементов, так что попытка добавить элемент в переполненную очередь также может быть заблокирована до освобождения места.

Заметьте, что GetConsumingEnumerable корректно работает даже в случае, когда у вас много консьюмеров. Это не так уж и очевидно.

Если вы работаете со старой версией C#, вам придётся писать нужную функциональность вручную. Вы можете воспользоваться встроенным классом Monitor (который является аналогом mutex + condition variable из pthreads).

public class ProducerConsumer<T> where T : class
{
    object mutex = new object();
    Queue<T> queue = new Queue<T>();
    bool isDead = false;
    public void Enqueue(T task)
    {
        if (task == null)
            throw new ArgumentNullException("task");
        lock (mutex)
        {
            if (isDead)
                throw new InvalidOperationException("Queue already stopped");
            queue.Enqueue(task);
            Monitor.Pulse(mutex);
        }
    }
    public T Dequeue()
    {
        lock (mutex)
        {
            while (queue.Count == 0 && !isDead)
                Monitor.Wait(mutex);
            if (queue.Count == 0)
                return null;
            return queue.Dequeue();
        }
    }
    public void Stop()
    {
        lock (mutex)
        {
            isDead = true;
            Monitor.PulseAll(mutex);
        }
    }
}

Использование (аналогичный пример):

class Program
{
    static public void Main()
    {
        new Program().Run();
    }
    ProducerConsumer<string> q = new ProducerConsumer<string>();
    void Run()
    {
        var threads = new [] { new Thread(Consumer), new Thread(Consumer) };
        foreach (var t in threads)
            t.Start();
        string s;
        while ((s = Console.ReadLine()).Length != 0)
            q.Enqueue(s);
        q.Stop();
        foreach (var t in threads)
            t.Join();
    }
    void Consumer()
    {
        while (true)
        {
            string s = q.Dequeue();
            if (s == null)
                break;
            Console.WriteLine("Processing: {0}", s);
            Thread.Sleep(2000);
            Console.WriteLine("Processed: {0}", s);
        }
    }
}
Answer 2
Реализация на C с библиотекой pthreads

В C, в соответствии с духом языка, нет встроенных высокоуровневых синхронизирующихся коллекций. Наверное самой популярной и широко используемой библиотекой, реализующей многопоточность, является pthreads. С её помощью паттерн можно реализовать так:

#include <pthread.h>
// объявляем структуру данных для одного задания
struct producer_consumer_queue_item {
  struct producer_consumer_queue_item *next;
  // здесь идут собственно данные. вы можете поменять этот кусок,
  // использовав структуру, более специфичную для вашей задачи
  void *data;
};
// объявляем очередь с дополнительными структурами для синхронизации.
// в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
struct producer_consumer_queue {
  struct producer_consumer_queue_item *head, *tail;
                              // head == tail == 0, если очередь пуста
  pthread_mutex_t lock;       // мьютекс для всех манипуляций с очередью
  pthread_cond_t cond;        // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  int is_alive;               // показывает, не закончила ли очередь свою работу
};

Теперь нам нужны процедуры добавления и извлечения заданий из очереди.

void
enqueue (void *data, struct producer_consumer_queue *q)
{
  // упакуем задание в новую структуру
  struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  p->data = data;
  p->next = 0;
  // получим "эксклюзивный" доступ к очереди заданий
  pthread_mutex_lock(&q->lock);
  // ... и добавим новое задание туда:
  if (q->tail)
    q->tail->next = p;
  else {
    q->head = p;
    // очередь была пуста, а теперь нет -- надо разбудить потребителей
    pthread_cond_broadcast(&q->cond);
  }
  q->tail = p;
  // разрешаем доступ всем снова
  pthread_mutex_unlock(&q->lock);
}
void *
dequeue(struct producer_consumer_queue *q)
{
  // получаем эксклюзивный доступ к очереди:
  pthread_mutex_lock(&q->lock);
  while (!q->head && q->is_alive) {
    // очередь пуста, делать нечего, ждем...
    pthread_cond_wait(&q->cond, &q->lock);
    // wait разрешает доступ другим на время ожидания
  }
  // запоминаем текущий элемент или 0, если очередь умерла
  struct producer_consumer_queue_item *p = q->head;
  if (p)
  {
    // и удаляем его из очереди
    q->head = q->head->next;
    if (!q->head)
      q->tail = q->head;
  }
  // возвращаем эксклюзивный доступ другим участникам
  pthread_mutex_unlock(&q->lock);
  // отдаём данные
  void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  free(p);
  return data;
}

Ещё нужна процедура для инициализации очереди:

struct producer_consumer_queue *
producer_consumer_queue_create()
{
  struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  q->head = q->tail = 0;
  q->is_alive = 1;
  pthread_mutex_init(&q->lock, 0);
  pthread_cond_init(&q->cond, 0);
  return q;
}

И процедура для закрытия очереди:

void
producer_consumer_queue_stop(struct producer_consumer_queue *q)
{
  // для обращения к разделяемым переменным необходим эксклюзивный доступ
  pthread_mutex_lock(&q->lock);
  q->is_alive = 0;
  pthread_cond_broadcast(&q->cond);
  pthread_mutex_unlock(&q->lock);
}

Отлично, у нас есть всё, что нам надо.

Как использовать это? Нужно:

  • запустить несколько потоков-«производителей» и несколько «потребителей»
  • придумать структуру данных для задания

Пример: (производитель — главный поток, потребители — 2 потока)

// это поток-потребитель
void *
consumer_thread (void *arg)
{
  struct producer_consumer_queue *q = (typeof(q))arg;
  for (;;) {
    void *data = dequeue(q);
    // это сигнал, что очередь окончена
    if (!data)
      break; // значит, пора закрывать поток
    char *str = (char *)data;
    // тут наша обработка данных
    printf ("consuming: %s\n", str);
    sleep(2);
    printf ("consumed: %s\n", str);
    free(str);
  }
  return 0;
}
int
main ()
{
  pthread_t consumer_threads[2];
  void *res = 0;
  char *in = NULL;
  size_t sz;
  // создадим очередь:
  struct producer_consumer_queue *q = producer_consumer_queue_create();
  // и потоки-«потребители»
  pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
  pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q);
  // главный цикл
  // получаем данные с клавиатуры:
  while (getline(&in, &sz, stdin) > 0) {
    enqueue(in, q);
    in = NULL;
  }
  producer_consumer_queue_stop(q);
  if (pthread_join(consumer_threads[0], &res) ||
      pthread_join(consumer_threads[1], &res))
    perror("join");
  return (long)res;
}

Это реализация задачи с "бесконечной" очередью. На практике же иногда (или почти всегда?) более полезно ограничить размер очереди и таким образом сбалансировать скорость производителей, иногда переводя их в спящее состояние, с возможностями потребителей.

Для этого немного изменим нашу producer_consumer_queue

struct producer_consumer_queue {
  struct producer_consumer_queue_item *head, *tail;
                              // head == tail == 0, если очередь пуста
  pthread_mutex_t lock;       // мьютекс для всех манипуляций с очередью
  pthread_cond_t condp;       // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  pthread_cond_t condc;       // этот cond "сигналим", когда в очереди ПОЯВИЛОСЬ СВОБОДНОЕ МЕСТО
  int is_alive;               // показывает, не закончила ли очередь свою работу
  int max, cnt,               // максимальный размер очереди и число заданий в ней
    pqcnt;                    // количество производителей, ждущих свободного места в очереди
};

Добавляем pthread_cond_t condc для "засыпания/пробуждения" потоков производителей, их счетчик в очереди на отправку сообщения и пару переменных, содержащих максимальный размер очереди и текущее количество заданий в ней.

Соответственно меняются функции для постановки задания в очередь (enqueue), выборки его из очереди (dequeue), инициализации очереди (producer_consumer_queue_create) и ее остановки (producer_consumer_queue_stop):

void
enqueue (void *data, struct producer_consumer_queue *aq)
{
  volatile struct producer_consumer_queue *q = aq;
  // упакуем задание в новую структуру
  struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  p->data = data;
  p->next = 0;
  // получим "эксклюзивный" доступ к очереди заданий
  pthread_mutex_lock(&aq->lock);
  // проверим не переполнена ли она
  if (q->max <= q->cnt) {
    q->pqcnt++;
    asm volatile ("" : : : "memory");
    // зафиксируем изменения очереди в памяти
    // будем ждать пока потребители ее слегка не опустошат 
    while(q->max <= q->cnt & q->is_alive)
      pthread_cond_wait(&aq->condc, &aq->lock);
    q->pqcnt--;
    asm volatile ("" : : : "memory");
  }
  // ... и добавим новое задание туда:
  if (q->tail)
    q->tail->next = p;
  else {
    q->head = p;
    // очередь была пуста, а теперь нет -- надо разбудить потребителей
    pthread_cond_broadcast(&aq->condp);
  }
  q->tail = p;
  q->cnt++;
  asm volatile ("" : : : "memory");
  // разрешаем доступ всем снова
  pthread_mutex_unlock(&aq->lock);
}

void *
dequeue(struct producer_consumer_queue *aq)
{
  volatile struct producer_consumer_queue *q = aq;
  // получаем эксклюзивный доступ к очереди:
  pthread_mutex_lock(&aq->lock);
  if (q->pqcnt && q->max > q->cnt)
    // в очереди есть место, а кто-то спит, разбудим их
    pthread_cond_broadcast(&aq->condc);
  while (!q->head && q->is_alive) {
    // очередь пуста, делать нечего, ждем...
    pthread_cond_wait(&aq->condp, &aq->lock);
    // wait разрешает доступ другим на время ожидания
  }
  // запоминаем текущий элемент или 0, если очередь умерла
  struct producer_consumer_queue_item *p = q->head;
  if (p) {
    // и удаляем его из очереди
    q->head = q->head->next;
    if (!q->head)
      q->tail = q->head;
    q->cnt--;
    asm volatile ("" : : : "memory");
    // зафиксируем изменения очереди в памяти
    // разбудим поставщиков в их очереди
    pthread_cond_broadcast(&aq->condc);
  }
  // возвращаем эксклюзивный доступ другим участникам
  pthread_mutex_unlock(&aq->lock);
  // отдаём данные
  void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  free(p);
  return data;
}
struct producer_consumer_queue * 
producer_consumer_queue_create(int max)
{
  struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  q->head = q->tail = 0;
  q->is_alive = 1;
  q->max = max;
  q->cnt = 0;
  q->pqcnt = 0;
  pthread_mutex_init(&q->lock, 0);
  pthread_cond_init(&q->condc, 0);
  pthread_cond_init(&q->condp, 0);
  return q;
}
// И процедура для закрытия очереди:
void
producer_consumer_queue_stop(struct producer_consumer_queue *aq)
{
  volatile struct producer_consumer_queue *q = aq;
  // для обращения к разделяемым переменным необходим эксклюзивный доступ
  pthread_mutex_lock(&aq->lock);
  q->is_alive = 0;
  asm volatile ("" : : : "memory");
  pthread_cond_broadcast(&aq->condc);
  pthread_cond_broadcast(&aq->condp);
  pthread_mutex_unlock(&aq->lock);
}

Здесь же показан memory barrier (asm volatile ("" : : : "memory");), использование которого запрещает компилятору переупорядочивать операции чтения-записи из RAM.

Данная реализация не обеспечивает "упорядоченность" производителей, ожидающих своей очереди для отправки сообщения. Т.е. поток производитель, "заснувший" первым из-за отсутствия свободного места в очереди не обязательно проснется первым.

Если такое поведение нас не устраивает, то придется внести некоторые изменения в наши данные, прежде всего добавив очередь поставщиков из структур producer_queue_item (которая будет частью структуры producer_consumer_queue.

Получаем следующие структуры данных:

// объявляем структуру данных для одного задания
struct producer_consumer_queue_item {
  struct producer_consumer_queue_item *next;
  // здесь идут собственно данные. вы можете поменять этот кусок,
  // использовав структуру, более специфичную для вашей задачи
  void *data;
};
// струкура данных для спящего (ждущего свободного места) потока-производителя
struct producer_queue_item {
  struct producer_queue_item *next;
  struct producer_consumer_queue_item *item; // данные для которых нет места
  pthread_cond_t cond;  // этот cond "сигналим", когда в очереди появилось место
#if DEBUG
  pid_t tid;    // linux thread id for debug print
  int signaled; // индикатор "побудки" for debug print
#endif
};
// объявляем очередь данных с дополнительными структурами для синхронизации.
// в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
struct producer_consumer_queue {
  struct producer_consumer_queue_item *head, *tail;
                              // head == tail == 0, если очередь пуста
  pthread_mutex_t lock;       // мьютекс для всех манипуляций с очередью
  pthread_cond_t cond;        // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  int is_alive;               // показывает, не закончила ли очередь свою работу
  int max, cnt;               // максимальный размер очереди и число заданий в ней
  // очередь  потоков-производителей, ждущих свободного места для своих данных
  struct producer_queue_item *pqhead,
    *pqtail;
};

и реализацию основных функций:

void
enqueue (void *data, struct producer_consumer_queue *q)
{
  volatile struct producer_consumer_queue *vq = q;
  // упакуем задание в новую структуру
  struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  p->data = data;
  p->next = 0;
  // получим "эксклюзивный" доступ к очереди заданий
  pthread_mutex_lock(&q->lock);
#if DEBUG
  printf("%ld (cnt: %d) ---> %s", (long)gettid(), vq->cnt, (char *)(p->data));
#endif
  // ... и добавим новое задание туда:
  if (vq->max <= vq->cnt || vq->pqtail) {// производитель должен ждать
#if DEBUG
    if (vq->cnt < vq->max) {
      puts("========================");
      print_queue(q, 0);
      puts("========================");
    }
#endif
    struct producer_queue_item *pq = (typeof(pq))malloc(sizeof(*pq));
    pthread_cond_init(&pq->cond, 0);  // cond по которому его разбудят
    pq->next = 0;
    pq->item = p;  // сохраним данные на время сна
#if DEBUG
    pq->tid = gettid();
#endif
    // поместим себя в очередь спящих производителей
    if (vq->pqtail)
      vq->pqtail->next = pq;
    else
      vq->pqhead = pq;
    vq->pqtail = pq;
    asm volatile ("" : : : "memory");
    // зафиксируем изменения очереди в памяти
#if DEBUG
    int at = 0; // счетчик циклов пробуждения
#endif
    do { // пойдем спать до появления свободного места в очереди данных
#if DEBUG
      printf ("%ld prod cond wait (cnt: %d  at: %d) %s", 
          (long)gettid(), vq->cnt, at++, (char *)(p->data));
      pq->signaled = 0;
#endif
      pthread_cond_wait(&pq->cond, &q->lock);
    } while(vq->max <= vq->cnt && vq->is_alive);
    // проснулись и владеем очередью
    /*
      Вот тонкий момент. Порядок активизации потоков не определен,
      а нам надо соблюдать очередность данных.
      Поэтому переустановим локальные переменные из очереди, 
      хотя это могут быть данные, положенные туда другим потоком.
    */
#if DEBUG
    if (pq != vq->pqhead) {
      printf ("BAAAD %ld (cnt: %d  at: %d) %s",
          (long)gettid(), vq->cnt, at, (char *)(p->data));
      print_queue(q, 0);
      if (vq->is_alive)
    exit(1); // совсем плохо, такого быть не должно
      else
    puts("CONTINUE");
    }
#endif
    pq = vq->pqhead;  // в любом случае берем голову очереди производителей
    if ((vq->pqhead = pq->next) == 0) // и удаляем ее
      vq->pqtail = 0;
    asm volatile ("" : : : "memory");
    p = pq->item;
    free(pq);
#if DEBUG
    printf ("%ld prod enqueued after wait (cnt: %d  at: %d) %s", 
        (long)gettid(), vq->cnt, at, (char *)(p->data));    
#endif
  }
  // вот тут реально кладем data в очередь для потребителей
  if (vq->tail)
    vq->tail->next = p;
  else {
    vq->head = p;
    // очередь была пуста, а теперь нет -- надо разбудить потребителей
    pthread_cond_broadcast(&q->cond);
  }
  vq->tail = p;
  vq->cnt++;
  asm volatile ("" : : : "memory");
  // сбросим изменения очереди в память
  // разрешаем доступ всем снова
  pthread_mutex_unlock(&q->lock);
}
#if DEBUG                   
#define cond_signal_producer(q) ({          \
      if ((q)->pqhead) {                \
      (q)->pqhead->signaled = 1;            \
      pthread_cond_signal(&(q)->pqhead->cond);  \
      }                         \
    })
#else
#define cond_signal_producer(q) ({          \
      if ((q)->pqhead)                  \
      pthread_cond_signal(&(q)->pqhead->cond);  \
    })
#endif
void *
dequeue(struct producer_consumer_queue *q)
{
  volatile struct producer_consumer_queue *vq = q;
  // получаем эксклюзивный доступ к очереди:
  pthread_mutex_lock(&q->lock);
  // если есть спящие производители, то разбудим первого
  cond_signal_producer(vq);  
  while (!vq->head && vq->is_alive) {
    // очередь пуста, делать нечего, ждем...
    pthread_cond_wait(&q->cond, &q->lock);
    // wait разрешает доступ другим на время ожидания
  }
  // запоминаем текущий элемент или 0, если очередь умерла
  struct producer_consumer_queue_item *p = vq->head;
  if (p) {
    // и удаляем его из очереди
    vq->head = vq->head->next;
    if (!vq->head)
      vq->tail = vq->head;
    vq->cnt--;
    asm volatile ("" : : : "memory");
    // сбросим изменения очереди в память
    // разбудим первого поставщика в их очереди
    cond_signal_producer(vq);
  }
  // возвращаем эксклюзивный доступ другим участникам
  pthread_mutex_unlock(&q->lock);
  // отдаём данные
  void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  // согласно 7.20.3.2/2, можно не проверять на 0
  free(p);
  return data;
}
struct producer_consumer_queue * 
producer_consumer_queue_create(int max)
{
  struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  q->head = q->tail = 0;
  q->pqhead = q->pqtail = 0;
  q->is_alive = 1;
  q->max = max;
  q->cnt = 0;
  pthread_mutex_init(&q->lock, 0);
  pthread_cond_init(&q->cond, 0);
  return q;
}
// И процедура для закрытия очереди:
void
producer_consumer_queue_stop(struct producer_consumer_queue *q)
{
  volatile struct producer_consumer_queue *vq = q;
  // для обращения к разделяемым переменным необходим эксклюзивный доступ
  pthread_mutex_lock(&q->lock);
  vq->is_alive = 0;
  pthread_cond_broadcast(&q->cond); // разбудим потребителей
  volatile struct producer_queue_item *pq;
  for (pq = vq->pqhead; pq; pq = pq->next) {
#if DEBUG
    pq->signaled = 1;
    asm volatile ("" : : : "memory");
#endif
    // будим каждого ждущего производителя
    pthread_cond_signal((pthread_cond_t *)&pq->cond);
  }
  pthread_mutex_unlock(&q->lock);
}

Все три программы (pq1.c, pq2.c и pq3.c) вместе с функцией gettid() находятся в http://pastebin.com/E23r9DZk . Для экспериментов скопируйте их в разные файлы и компилируйте, например, gcc pq3.c -pthread gettid.o

Answer 3
Реализация на C#, библиотека Dataflow

Ещё одной альтернативой является использование Майкрософтовской библиотеки Dataflow, которая собственно и создана для того, чтобы управлять потоками данных. Для использования кода в примерах, вам нужно подключить NuGet-пакет Microsoft.Tpl.Dataflow. Класс BufferBlock<T> практически является готовым producer/consumer'ом, кроме того, он имеет async-интерфейс.

Для асинхронного добавления задания поставщик может воспользоваться SendAsync. Асинхронное добавление нужно, так как очередь может быть ограниченной длины, а значит, добавление должно будет дожидаться наличия свободного места! По окончанию добавления нужно вызвать Complete.

async Task ProduceSingle(ITargetBlock<string> queue, int howmuch)
{
    Random r = new Random();
    while (howmuch-- > 0)
    {
        // эмулируем длительную работу по подготовке следующего задания
        // длительность выбираем случайно, чтобы задания приходили в
        // непредсказуемые моменты времени
        await Task.Delay(1000 * r.Next(1, 3));
        var v = string.Format("automatic {0}", r.Next(1, 10));
        await queue.SendAsync(v);
    }
    queue.Complete();
}

Если у вас несколько поставщиков, закрывать очередь нужно лишь когда они все отработают:

async Task Produce1(ITargetBlock<string> queue, int howmuch)
{
    Random r = new Random();
    while (howmuch-- > 0)
    {
        await Task.Delay(1000 * r.Next(1, 3));
        var v = string.Format("automatic {0}", r.Next(1, 10));
        await queue.SendAsync(v);
    }
}
// функция Console.ReadLine() -- блокирующая, поэтому выполняем её асинхронно
// (иначе она заблокирует вызывающий поток)
// у Console нет async-интерфейса.
Task<string> ReadConsole()
{
    // блокирующую функцию выгружаем в thread pool
    return Task.Run(() => Console.ReadLine());
}
async Task Produce2(ITargetBlock<string> queue)
{
    string s;
    while ((s = await ReadConsole()).Length != 0)
        await queue.SendAsync("manual " + s);
}
async Task ProduceAll(ITargetBlock<string> queue)
{
    var p1 = Produce1(queue, 20);
    var p2 = Produce2(queue);
    await Task.WhenAll(p1, p2);
    queue.Complete();
}

Теперь, потребитель. Если потребитель лишь один, всё просто:

async Task ConsumeSingle(ISourceBlock<string> queue)
{
    while (await queue.OutputAvailableAsync())
        Console.WriteLine(await queue.ReceiveAsync());
}

Для случая нескольких потребителей использовать ReceiveAsync — неверно, так как задание может быть взято из очереди другим потребителем! Функции TryReceiveAsync тоже нету, поэтому после асинхронного выяснения того, что очередь не пуста, используем TryReceive:

async Task ConsumeCooperative(IReceivableSourceBlock<string> queue, int number)
{
    Random r = new Random();
    while (await queue.OutputAvailableAsync())
    {
        string v;
        // в этой точке данные могут быть уже уйти другому потребителю
        if (!queue.TryReceive(out v))
            continue; // продолжаем ждать
        // цветной вывод и прочие плюшки
        // мне лень синхронизировать вывод на консоль, хотя конечно это разделяемый ресурс
        if (Console.CursorLeft != 0)
            Console.WriteLine();
        var savedColor = Console.ForegroundColor;
        Console.ForegroundColor = (ConsoleColor)(number + 8);
        Console.WriteLine(string.Format("{0}[{1}]: {2}",
                          new string(' ', number * 4), number, v));
        Console.ForegroundColor = savedColor;
        // симулируем длительную обработку результата клиентом
        await Task.Delay(1000 * r.Next(1, 3));
    }
}
Task ConsumeAll(IReceivableSourceBlock<string> queue)
{
    var c1 = ConsumeCooperative(queue, 1);
    var c2 = ConsumeCooperative(queue, 2);
    return Task.WhenAll(c1, c2);
}

Осталась обвязка:

class Program
{
    static void Main(string[] args)
    {
        new Program().RunAll().Wait();
    }
    async Task RunAll()
    {
        BufferBlock<string> queue = new BufferBlock<string>();
        var p = ProduceAll(queue);
        var c = ConsumeAll(queue);
        await Task.WhenAll(p, c, queue.Completion);
    }
    // остальные методы
}

В своей статье Async Producer/Consumer Queue using Dataflow Stephen Cleary предлагает другой подход, более в духе библиотеки Dataflow. В ней заложена симметрия между блоками-источниками (ISourceBlock<T>), блоками-приёмниками (ITargetBlock<T>), и блоками-преобразователями (IPropagatorBlock<TInput, TOutput>). В соответствии с этой идеологией, мы применяем для поставщика блок-приёмник ActionBlock<T>:

Task Consume2(ISourceBlock<string> queue, int number)
{
    var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
    var consumer = new ActionBlock<string>(v => ConsumeImpl2(v, number), consumerOptions);
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    queue.LinkTo(consumer, linkOptions);
    return consumer.Completion;
}
void ConsumeImpl2(string v, int number)
{
    Console.WriteLine(string.Format("[{0}]: {1}", number, v));
    Thread.Sleep(1500);
}
Task ConsumeAll2(ISourceBlock<string> queue)
{
    var c1 = Consume2(queue, 1);
    var c2 = Consume2(queue, 2);
    return Task.WhenAll(c1, c2);
}

Для чего нужно BoundedCapacity = 1? Дел в том, что по умолчанию ActionBlock<T> имеет «неограниченную» ёмкость, и таким образом за раз потребит все данные из очереди. Таким образом, если мы ввели ограничение на объём очереди

queue = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 20 });

то данные будут всё равно накапливаться в ActionBlock'е. Чтобы роль хранилища исполнялась BufferBlock'ом, а потребителя — ActionBlock, и нужно ограничить его объём. Заметьте также, что ограничение ёмкости ActionBlock'а позволяет библиотеке Dataflow балансировать нагрузку, отправляя данные свободному блоку.

Заметьте, что в этом случае то, в каком контексте (пул потоков? выделенный поток?) исполняется ActionBlock, управляется не через стандартный механизм async/await, а посредством TaskScheduler'а в настройках.

Подход со стыковкой блоков выполнения может быстро стать слишком тяжеловесным, поэтому я советовал бы использовать его лишь для достаточно сложных задач.

Answer 4

Реализация на C#, библиотека Dataflow ("инверсный" алгоритм)

Отличие данного алгоритма от приведенного VladD "прямого" - в том, что используется очередь потребителей вместо очереди элементов.

Это позволяет избавиться от цикла приема в потребителе - ценой появления знания списка потребителей поставщиком данных. Иными словами, в такой конфигурации потребители выходят не активными - а пассивными.

Кроме того, сам алгоритм получается очень простым.

Потребитель в таком алгоритме не имеет никакого алгоритма, это лишь класс (или интерфейс либо вовсе делегат), в котором можно вызвать метод:

class Consumer
{
  public void Consume(string str)
  {
    Console.WriteLine(str);
  }
}

На стороне поставщика алгоритм немного сложнее:

BufferBlock<Consumer> consumers = new BufferBlock<Consumer>();
public async void SendToConsumer(string str)
{
  var consumer = await consumers.ReceiveAsync();
  try
  {
    consumer.Consume(str);
  }
  finally
  {
    consumers.Post(consumer);
  }
}
async Task Produce(int howmuch)
{
    Random r = new Random();
    while (howmuch-- > 0)
    {
        // эмулируем длительную работу по подготовке следующего задания
        // длительность выбираем случайно, чтобы задания приходили в
        // непредсказуемые моменты времени
        await Task.Delay(1000 * r.Next(1, 3));
        var v = string.Format("automatic {0}", r.Next(1, 10));
        SendToConsumer(v);
    }
}

К сожалению, это еще не все. Дело в том, что в таком виде любое исключение, возникшее в методе Consume, обрушит программу.

Поэтому надо сделать одну из двух вещей:

  • добавить в метод контракт класса Consumer требование обязательной обработки исключений;

  • или же установить глобальный обработчик неперехваченных исключений.

А еще лучше - воспользоваться сразу обоими вариантами.

И еще один важный момент. Пассивность потребителя означает, в частности, что потребитель будет запущен в контексте синхронизации поставщика. Иногда это может быть нежелательным - в таком случае в код SendToConsumer следует добавить вызов Task.Run.

Иногда же, напротив, производителю нужно дождаться полной обработки порции данных. В таком случае надо сменить возвращаемое значение у метода SendToConsumer с void на Task.

READ ALSO
ASP.NET MVC 5 Identity.Несовпадение хэш-кодов объектов

ASP.NET MVC 5 Identity.Несовпадение хэш-кодов объектов

Для работы с Identity в проекте существуют: контроллер(AccountController) и сервис (IdentityService)Оба класса имеют поля: UserManager (Type: UserManager<AuthUser>) и AuthenticationManager(Type:...

166
Не удаётся ввести с клавиатуры в строковый элемент массива

Не удаётся ввести с клавиатуры в строковый элемент массива

Хочу ввести с клавиатуры значения в строку, которая является элементом массива:string words[1]=ConsoleReadLine(); Выдает ошибку, неудивительно

134
В чем суть ковариантности и контравариантности делегатов?

В чем суть ковариантности и контравариантности делегатов?

Изучаю по книге работу с делегатами и есть там пример, объясняющий, что такое ковариантность и контравариантностьРешил подробнее поискать...

258