Thread-safe очередь, и все-все-все

272
19 декабря 2016, 19:57

Опять у меня идиотский вопрос (простите, но впадаю в старческий маразм, "бабушка ничего не помнит").

Дано: поток данных поступает в очередь (com-порт). Другой поток разгребает эту очередь, складывает разобраные данные в базу. Всё в рамках одного приложения.

Вопрос: что юзать в качестве очереди на C (возможно, C++)

Answer 1

Видимо что-то в таком духе (увидел знакомую тему и не удержался).

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
struct qitem {
  struct qitem *next;
  int len;
  char data[1];  // реально здесь будет len+1 байт данных
};
struct queue {
  struct qitem *head, *tail;  // head == tail == 0  очередь пуста
  pthread_mutex_t lock;       // мьютекс для всех манипуляций с очередью
  pthread_cond_t cond;        // этот cond "сигналим" когда очередь стала НЕ ПУСТОЙ
  pthread_t th;               // tid обработчика
};
void
inqueue (const char *str, int len, struct queue *q)
{
  struct qitem *p = (typeof(p))malloc(sizeof(*p) + len);
  strcpy(p->data, str);
  p->len = len;
  p->next = 0;
  pthread_mutex_lock(&q->lock);
  if (q->tail)
    q->tail->next = p;
  else {
    q->head = p;
    pthread_cond_broadcast(&q->cond);  // теперь очередь не пуста, сигнализируем
  }
  q->tail = p;
  pthread_mutex_unlock(&q->lock);
}
void
processit (struct qitem *p)
{
  int t = atoi(p->data);
  if (t > 0) {
    printf ("Sleep %d\n", t);
    sleep(t);
  }
}
// обработчик очереди в отдельном потоке
void *
consumer (void *arg)
{
  struct queue *q = (typeof(q))arg;
  for (;;) {
    pthread_mutex_lock(&q->lock);
    if (!q->head) { // очередь пуста, делать нечего, ждем...
      pthread_cond_wait(&q->cond, &q->lock);
      //pthread_mutex_unlock(&q->lock);  // это для нескольких обработчиков очереди
      //continue;                        // в нашем случае не нужно
      // для нескольких "concurrent" обработчиков нужна последовательность 
      // lock/check/wait/unlock/continue/lock/check...
    }
    struct qitem *p = q->head;
    q->head = q->head->next;
    q->nitems--;
    if (!q->head)
      q->tail = q->head;
    pthread_mutex_unlock(&q->lock);
    printf ("consume: %s", p->data);
    if (strcmp(p->data, "STOP") == 0)
      break;
    processit(p);
    free(p);
  }
  return 0;
}
// сделаем пустую очередь и запустим ее обработчик в новом потоке
struct queue *
run_consumer()
{
  struct queue *q = (typeof(q))malloc(sizeof(*q));
  q->head = q->tail = 0;
  q->nitems = 0;
  pthread_mutex_init(&q->lock, 0);
  pthread_cond_init(&q->cond, 0);
  pthread_create(&q->th, 0, consumer, (void *)q);
  return q;
}
int
main ()
{
  void *res = 0;
  char *in = NULL;
  size_t sz;
  int l;
  struct queue *q = run_consumer();
  while ((l = getline(&in, &sz, stdin)) > 0) {
    inqueue(in, l, q);
    in = NULL;
  }
  inqueue("STOP", 5, q);
  if (pthread_join(q->th,  &res))
    perror("join");
  return (long)res;
}

Для тестирования просто читаем строки с клавиатуры, и ставим их в очередь. Обрааботчик их печатает. Если в начале строки число, то обработчик делает sleep, позволяя написать в очередь несколько строк.

Answer 2

Я бы использовал просто queue<char>. Поскольку у вас доступ к очереди из нескольких потоков, вам нужно синхронизировать обращения к очереди.

Для синхронизации подошёл бы std::mutex, если он доступен на вашем компиляторе. В любом случае, какая-то синхронизация вам всё равно нужна, т.к. в её отсутствие у разных потоков может быть разное представление о содержимом памяти (например потому, что в многопроцессорной системе каждый процессор сбрасывает кэш независимо).

Answer 3

Используйте boost::lockfree::queue

READ ALSO
Как сгенерировать синусоиду на СИ

Как сгенерировать синусоиду на СИ

Привет всем участникам форума! В прошлый раз я задавал вопрос по поводу генерации пилы с использованием СИ на AVRКроме @vanyamelikov мне никто не отвечал,...

486
Односвязный список в список

Односвязный список в список

Всем приветОбъясните пожалуйста, как добавить один односвязный список в другой

387
Изъятия текста из диалогового окна (winApi)

Изъятия текста из диалогового окна (winApi)

Почему, когда я считываю текст из BoxEditor диалогового окна, или же вызываю диалог открытия/сохранения файла, то текст у меня сохраняется в элементы...

355