Опять у меня идиотский вопрос (простите, но впадаю в старческий маразм, "бабушка ничего не помнит").
Дано: поток данных поступает в очередь (com-порт). Другой поток разгребает эту очередь, складывает разобраные данные в базу. Всё в рамках одного приложения.
Вопрос: что юзать в качестве очереди на C (возможно, C++)
Видимо что-то в таком духе (увидел знакомую тему и не удержался).
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
struct qitem {
struct qitem *next;
int len;
char data[1]; // реально здесь будет len+1 байт данных
};
struct queue {
struct qitem *head, *tail; // head == tail == 0 очередь пуста
pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
pthread_cond_t cond; // этот cond "сигналим" когда очередь стала НЕ ПУСТОЙ
pthread_t th; // tid обработчика
};
void
inqueue (const char *str, int len, struct queue *q)
{
struct qitem *p = (typeof(p))malloc(sizeof(*p) + len);
strcpy(p->data, str);
p->len = len;
p->next = 0;
pthread_mutex_lock(&q->lock);
if (q->tail)
q->tail->next = p;
else {
q->head = p;
pthread_cond_broadcast(&q->cond); // теперь очередь не пуста, сигнализируем
}
q->tail = p;
pthread_mutex_unlock(&q->lock);
}
void
processit (struct qitem *p)
{
int t = atoi(p->data);
if (t > 0) {
printf ("Sleep %d\n", t);
sleep(t);
}
}
// обработчик очереди в отдельном потоке
void *
consumer (void *arg)
{
struct queue *q = (typeof(q))arg;
for (;;) {
pthread_mutex_lock(&q->lock);
if (!q->head) { // очередь пуста, делать нечего, ждем...
pthread_cond_wait(&q->cond, &q->lock);
//pthread_mutex_unlock(&q->lock); // это для нескольких обработчиков очереди
//continue; // в нашем случае не нужно
// для нескольких "concurrent" обработчиков нужна последовательность
// lock/check/wait/unlock/continue/lock/check...
}
struct qitem *p = q->head;
q->head = q->head->next;
q->nitems--;
if (!q->head)
q->tail = q->head;
pthread_mutex_unlock(&q->lock);
printf ("consume: %s", p->data);
if (strcmp(p->data, "STOP") == 0)
break;
processit(p);
free(p);
}
return 0;
}
// сделаем пустую очередь и запустим ее обработчик в новом потоке
struct queue *
run_consumer()
{
struct queue *q = (typeof(q))malloc(sizeof(*q));
q->head = q->tail = 0;
q->nitems = 0;
pthread_mutex_init(&q->lock, 0);
pthread_cond_init(&q->cond, 0);
pthread_create(&q->th, 0, consumer, (void *)q);
return q;
}
int
main ()
{
void *res = 0;
char *in = NULL;
size_t sz;
int l;
struct queue *q = run_consumer();
while ((l = getline(&in, &sz, stdin)) > 0) {
inqueue(in, l, q);
in = NULL;
}
inqueue("STOP", 5, q);
if (pthread_join(q->th, &res))
perror("join");
return (long)res;
}
Для тестирования просто читаем строки с клавиатуры, и ставим их в очередь. Обрааботчик их печатает. Если в начале строки число, то обработчик делает sleep, позволяя написать в очередь несколько строк.
Я бы использовал просто queue<char>. Поскольку у вас доступ к очереди из нескольких потоков, вам нужно синхронизировать обращения к очереди.
Для синхронизации подошёл бы std::mutex, если он доступен на вашем компиляторе. В любом случае, какая-то синхронизация вам всё равно нужна, т.к. в её отсутствие у разных потоков может быть разное представление о содержимом памяти (например потому, что в многопроцессорной системе каждый процессор сбрасывает кэш независимо).
Используйте boost::lockfree::queue
Продвижение своими сайтами как стратегия роста и независимости