Многопоточный Produce/Consumer

126
16 мая 2021, 10:30

Дали задачу, но чего-то никак не могу понять как ее реализовать.

Есть producer, например, 25 потоков должно быть. Есть consumer, например, 5 потоков должно быть.

producer посылает в очередь сообщение, получает оттуда String возвращается в producer.

consumer должен вычитывать эту очередь.

Размер сообщений, который будет оправлен, например, 500, после чего, видимо, ExecutorService/ThreadPool должен умереть для producer. Для consumer он должен работать до того момента, пока очередь не будет пуста.

Я всё не могу понять, как сделать так, чтобы 1) все потоки стартовали одновременно 2) как запустить новые потоки в consumer

Вот так у меня примерно в main

public class Main{
private ExecutorService prodService= Executors.newFixedThreadPool(25, prodFactory);
private ExecutorService conService= Executors.newFixedThreadPool(5, conFactory);
 public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("beans.xml");
        Starter bean = (Starter) context.getBean("starter");
        bean.doit();
 }
 private void doit() {
        for (int i = 0; i < 10; i++) {
            prodService.execute(() -> {
                 for (int i = 0; i < 10; i++) {
                    System.out.println(Thread.currentThread().getName());
                    makeSmth();
                 }
            });
        conService.execute(() -> {doSmthTo();});
        }
 }
}

По факту, там у меня еще много чего страшного и не понятного, но я не знаю куда это всё внедрять)

Answer 1

Реализовал примерно так.

class Stater {
public static boolean STOP = false;
private Producer producer;
private Consumer consumer;
private ExecutorService prodService= Executors.newFixedThreadPool(PRODUCER_NUMBER, taxiFactory);
private ExecutorService conService= Executors.newFixedThreadPool(CONSUMER_NUMBER, clientFactory);

 public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("beans.xml");
        Starter bean = (Starter) context.getBean("starter");
        bean.doit();
 }
 private void doit() {
    for (int i = 0; i < PRODUCER_NUMBER; i++) {
        prodService.execute(() -> producer.get());
        conService.execute(() -> consumer.run());
    }
    Starter.STOP = true;
    producerService.shutdown();
    consumerService.shutdown();
 }
}

class Common {
private Queue<Message> emergencyQueue;
private BlockingQueue<Message> blockingQueue;
   public void insertOrder(Message message) {
        if (!blockingQueue.offer(message)) {
            emergencyQueue.add(message);
        }
    }
   public Message getOrder() {
        if (emergencyQueue.isEmpty()) {
            if (!blockingQueue.isEmpty()) {
                return blockingQueue.poll();
            } else {
                return null;
            }
        } else {
            return emergencyQueue.poll();
        }
    }
   public boolean shouldStop() {
        return blockingQueue.isEmpty() && emergencyQueue.isEmpty() && Starter.STOP;
    }   
}

class Consumer implements Runnable{
private Common common;
   public void run(){
       common.insertOrder(new Message());
    }
}
class Producer implements Runnable{
private Common common;
   public void run(){
          while (!common.shouldStop()) {
              Message message=common.getOrder();
              if (message == null) {
               Thread.sleep(new Random().nextInt(TIME_TO_WAIT));
            }
          }
    }
}
Answer 2

LinkedBloсkingQueue - один из вариантов быстрого решения вопроса

READ ALSO
Объясните где совершил ошибку

Объясните где совершил ошибку

Занимался по книге Брайсона,дошел до момента где нужно сделать счетчик в игре больше-меньше,но каждый раз при попытки набора нового числа...

96
Получить порт из ByteBuffer

Получить порт из ByteBuffer

У меня есть перехваченный пакет в виде ByteBuffer, как из него нужно достать порт в java/kotlin?

118
Разные типы данных в активности

Разные типы данных в активности

Есть приложение, в котором отражается лента новостейЕсли кликнуть на неё, то открывается конкретная новость

112