Дали задачу, но чего-то никак не могу понять как ее реализовать.
Есть producer
, например, 25 потоков должно быть.
Есть consumer
, например, 5 потоков должно быть.
producer посылает в очередь сообщение, получает оттуда String
возвращается в producer.
consumer должен вычитывать эту очередь.
Размер сообщений, который будет оправлен, например, 500, после чего, видимо, ExecutorService/ThreadPool
должен умереть для producer
. Для consumer
он должен работать до того момента, пока очередь не будет пуста.
Я всё не могу понять, как сделать так, чтобы 1) все потоки стартовали одновременно 2) как запустить новые потоки в consumer
Вот так у меня примерно в main
public class Main{
private ExecutorService prodService= Executors.newFixedThreadPool(25, prodFactory);
private ExecutorService conService= Executors.newFixedThreadPool(5, conFactory);
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("beans.xml");
Starter bean = (Starter) context.getBean("starter");
bean.doit();
}
private void doit() {
for (int i = 0; i < 10; i++) {
prodService.execute(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName());
makeSmth();
}
});
conService.execute(() -> {doSmthTo();});
}
}
}
По факту, там у меня еще много чего страшного и не понятного, но я не знаю куда это всё внедрять)
Реализовал примерно так.
class Stater {
public static boolean STOP = false;
private Producer producer;
private Consumer consumer;
private ExecutorService prodService= Executors.newFixedThreadPool(PRODUCER_NUMBER, taxiFactory);
private ExecutorService conService= Executors.newFixedThreadPool(CONSUMER_NUMBER, clientFactory);
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("beans.xml");
Starter bean = (Starter) context.getBean("starter");
bean.doit();
}
private void doit() {
for (int i = 0; i < PRODUCER_NUMBER; i++) {
prodService.execute(() -> producer.get());
conService.execute(() -> consumer.run());
}
Starter.STOP = true;
producerService.shutdown();
consumerService.shutdown();
}
}
class Common {
private Queue<Message> emergencyQueue;
private BlockingQueue<Message> blockingQueue;
public void insertOrder(Message message) {
if (!blockingQueue.offer(message)) {
emergencyQueue.add(message);
}
}
public Message getOrder() {
if (emergencyQueue.isEmpty()) {
if (!blockingQueue.isEmpty()) {
return blockingQueue.poll();
} else {
return null;
}
} else {
return emergencyQueue.poll();
}
}
public boolean shouldStop() {
return blockingQueue.isEmpty() && emergencyQueue.isEmpty() && Starter.STOP;
}
}
class Consumer implements Runnable{
private Common common;
public void run(){
common.insertOrder(new Message());
}
}
class Producer implements Runnable{
private Common common;
public void run(){
while (!common.shouldStop()) {
Message message=common.getOrder();
if (message == null) {
Thread.sleep(new Random().nextInt(TIME_TO_WAIT));
}
}
}
}
LinkedBloсkingQueue - один из вариантов быстрого решения вопроса
Виртуальный выделенный сервер (VDS) становится отличным выбором
Занимался по книге Брайсона,дошел до момента где нужно сделать счетчик в игре больше-меньше,но каждый раз при попытки набора нового числа...
У меня есть перехваченный пакет в виде ByteBuffer, как из него нужно достать порт в java/kotlin?
Есть приложение, в котором отражается лента новостейЕсли кликнуть на неё, то открывается конкретная новость