Как стороннему потоку сказать главному, что он завершился?

100
29 марта 2022, 20:00

Вопрос вызван тем, что у меня есть CyclicBarrier, и мне из главного потока нужно узнать, когда CyclicBarrier открылся. Для CyclicBarrier я могу указать Runnable, который выполнится, когда барьер откроется:

CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {/*здесь*/});

В главном потоке я запускаю все потоки(3), связанные с CyclicBarrier, и жду, допустим, 12 секунд.
Теперь возможны две ситуации:
1) Барьер уже открыт
2) Барьер еще закрыт
Так, теперь встает интересная проблема:
Мы не можем сделать сразу cyclicBarrier.await(), так как если 1), то мы застрянем навечно. Если мы сделаем проверку(getNumberWaiting() == 0), и только потом сделаю cyclicBarrier.await(), то есть вероятность, что барьер откроется в аккурат между этими операциями.
Есть ли атомарный способ решить проблему?

Answer 1

Через synchronized и wait/notify (+ java.util.concurrent.CyclicBarrier и java.util.concurrent.atomic.AtomicBoolean):

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class SynchronizedHelloWorld {
    public static void main(String[] args) throws Exception {
        AtomicBoolean isBarrierBroken = new AtomicBoolean();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            synchronized (isBarrierBroken) {
                isBarrierBroken.set(true);
                isBarrierBroken.notify();
            }
        });
        new DoSomethingAndWait("1", cyclicBarrier).start();
        new DoSomethingAndWait("2", cyclicBarrier).start();
        new DoSomethingAndWait("3", cyclicBarrier).start();
        // закомментировать строчку, если захочешь,
        // чтобы поменялся порядок выполнения wait() и notify()
        TimeUnit.SECONDS.sleep(5);
        synchronized (isBarrierBroken) {
            if (!isBarrierBroken.get()) {
                System.out.println("main thread waits");
                isBarrierBroken.wait();
            }
        }
        System.out.println("main thread ends");
    }
    private static class DoSomethingAndWait extends Thread {
        private String name;
        private CyclicBarrier cyclicBarrier;
        DoSomethingAndWait(String name, CyclicBarrier cyclicBarrier) {
            this.name = name;
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            try {
                System.out.printf("%s works%n", name);
                TimeUnit.SECONDS.sleep(1); // do work
                System.out.printf("%s waits%n", name);
                cyclicBarrier.await();
                System.out.printf("%s ends%n", name);
            } catch (Exception ignore) {
            }
        }
    }
}

Через java.util.concurrent.Lock и java.util.concurrent.Condition (+ java.util.concurrent.CyclicBarrier и java.util.concurrent.atomic.AtomicBoolean):

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.*;
public class ConcurrentLockHelloWorld {
    public static void main(String[] args) throws Exception {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        AtomicBoolean isBarrierBroken = new AtomicBoolean();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            try {
                lock.lock();
                isBarrierBroken.set(true);
                condition.signal();
            } catch (Exception ignore) {
            } finally {
                lock.unlock();
            }
        });
        new DoSomethingAndWait("1", cyclicBarrier).start();
        new DoSomethingAndWait("2", cyclicBarrier).start();
        new DoSomethingAndWait("3", cyclicBarrier).start();
        // закомментировать строчку, если захочешь,
        // чтобы поменялся порядок выполнения await() и signal()
        TimeUnit.SECONDS.sleep(5);
        try {
            lock.lock();
            if (!isBarrierBroken.get()) {
                System.out.println("main thread awaits");
                condition.await();
            }
        } finally {
            lock.unlock();
        }
        System.out.println("main thread ends");
    }
    private static class DoSomethingAndWait extends Thread {
        private String name;
        private CyclicBarrier cyclicBarrier;
        DoSomethingAndWait(String name, CyclicBarrier cyclicBarrier) {
            this.name = name;
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            try {
                System.out.printf("%s works%n", name);
                TimeUnit.SECONDS.sleep(1); // do work
                System.out.printf("%s waits%n", name);
                cyclicBarrier.await();
                System.out.printf("%s ends%n", name);
            } catch (Exception ignore) {
            }
        }
    }
}

Через java.util.concurrent.Exchanger и java.util.concurrent.CyclicBarrier:

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
public class ConcurrentExchangerHelloWorld {
    public static void main(String[] args) throws Exception {
        Exchanger<String> exchanger = new Exchanger<>();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            try {
                System.out.println("barrier broken callback starts");
                exchanger.exchange(null);
                System.out.println("barrier broken callback ends");
            } catch (Exception ignore) {
            }
        });
        new DoSomethingAndWait("1", cyclicBarrier).start();
        new DoSomethingAndWait("2", cyclicBarrier).start();
        new DoSomethingAndWait("3", cyclicBarrier).start();
        // закомментировать строчку, если захочешь,
        // чтобы поменялся порядок вызова exchange()
        TimeUnit.SECONDS.sleep(5);
        System.out.println("main thread waits, maybe...");
        exchanger.exchange(null);
        System.out.println("main thread ends");
    }
    private static class DoSomethingAndWait extends Thread {
        private String name;
        private CyclicBarrier cyclicBarrier;
        DoSomethingAndWait(String name, CyclicBarrier cyclicBarrier) {
            this.name = name;
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            try {
                System.out.printf("%s works%n", name);
                TimeUnit.SECONDS.sleep(1); // do work
                System.out.printf("%s waits%n", name);
                cyclicBarrier.await();
                System.out.printf("%s ends%n", name);
            } catch (Exception ignore) {
            }
        }
    }
}

Через java.util.concurrent.CountDownLatch:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class ConcurrentCountDownLatchHelloWorld {
    public static void main(String[] args) throws Exception {
        execute(() -> {
            try {
                System.out.println("callback");
            } catch (Exception ignore) {
            }
        });
    }
    private static void execute(Runnable callback) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(3);
        new DoSomethingAndWait("1", countDownLatch).start();
        new DoSomethingAndWait("2", countDownLatch).start();
        new DoSomethingAndWait("3", countDownLatch).start();
        // закомментировать строчку, если захочешь,
        // чтобы поменялся порядок выполнения await()
        TimeUnit.SECONDS.sleep(5);
        System.out.println("main thread awaits");
        countDownLatch.await();
        callback.run();
        System.out.println("main thread ends");
    }
    private static class DoSomethingAndWait extends Thread {
        private String name;
        private CountDownLatch countDownLatch;
        DoSomethingAndWait(String name, CountDownLatch countDownLatch) {
            this.name = name;
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            try {
                System.out.printf("%s works%n", name);
                TimeUnit.SECONDS.sleep(1); // do work
                System.out.printf("%s waits%n", name);
                countDownLatch.countDown();
                countDownLatch.await();
                System.out.printf("%s ends%n", name);
            } catch (Exception ignore) {
            }
        }
    }
}

Через java.util.concurrent.Phaser:

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class ConcurrentPhaserHelloWorld {
    public static void main(String[] args) throws Exception {
        execute(() -> {
            try {
                System.out.println("callback");
            } catch (Exception ignore) {
            }
        });
    }
    private static void execute(Runnable callback) throws InterruptedException {
        Phaser phaser = new Phaser(3);
        new DoSomethingAndWait("1", phaser).start();
        new DoSomethingAndWait("2", phaser).start();
        new DoSomethingAndWait("3", phaser).start();
        // закомментировать строчку, если захочешь,
        // чтобы поменялся порядок выполнения await()
        TimeUnit.SECONDS.sleep(5);
        System.out.println("main thread awaits, maybe...");
        phaser.awaitAdvance(0);
        callback.run();
        System.out.println("main thread ends");
    }
    private static class DoSomethingAndWait extends Thread {
        private String name;
        private Phaser phaser;
        DoSomethingAndWait(String name, Phaser phaser) {
            this.name = name;
            this.phaser = phaser;
        }
        @Override
        public void run() {
            try {
                System.out.printf("%s works%n", name);
                TimeUnit.SECONDS.sleep(1); // do work
                System.out.printf("%s waits%n", name);
                phaser.arriveAndAwaitAdvance();
                System.out.printf("%s ends%n", name);
            } catch (Exception ignore) {
            }
        }
    }
}
Answer 2

Проблема решается применением Lock и Condition. Что мы получим в итоге? Мы получим счетчик, который будет считать кол-во раз открытия CyclicBarrier'а, и возможность узнавать о состоянии этого счетчика атомарно из любого потока. Это полностью решает стоящую перед нами задачу.
Перейдем к реализации.
Во-первых, у нас есть счетчик, его методы необязательно должны быть synchronized:

class Counter{
    private volatile int count;
    {
        count = 0;
    }
    public void count(){
        count++;
    }
    public int getCount(){
        return count;
    }
    public void reset(){
        count = 0;
    }
}

Runnable, выполняемый при каждом открытии CyclicBarrier'а:

Runnable toRunWhenOpen = () -> {
    cyclicBarrierMainLock.lock();
    try{
        counter.count();
        forFirstCyclicBarrier.signal();
    } catch (Exception e){
        e.printStackTrace();
    } finally{
        cyclicBarrierMainLock.unlock();
    }
};

А вот код в "главном потоке":

cyclicBarrierMainLock.lock();
try{
    if(counter.getCount() == 0){
        System.out.println("MAIN waits");
        forFirstCyclicBarrier.await();
    }
} catch(Exception exception){
    exception.printStackTrace();
} finally{
    cyclicBarrierMainLock.unlock();
}

Видим, что все синхронизировано - Lock не даст случится ситуации, описанной в вопросе. Суть программы же заключается в том, что в ходе выполнения возможны только две ситуации:
1) Runnable выполнится быстрее. Если "главный поток" доберется до cyclicBarrierMainLock.lock() во время выполнения Runnable, то он будет послушно ждать освобождения Lock'а.
2) Главному потоку придется ждать Runnable. Тогда главный поток сначала захватит Lock, а потом, как дойдет до await(), сразу же отпустит. Вот тогда то Runnable и сможет пройти дальше. А как пройдет - даст команду signal(), и уже Runnable будет ждать, когда "главный поток" дойдет до unlock().

Если интересно, вот рабочий вариант, на котором можно проверить работоспособность предложенного способа:

Класс DoSmthingAndWait, который является потоком, который "подписывается" на барьер, когда выполняет всю работу:

class DoSmthingAndWait extends Thread {
    private String name;
    CyclicBarrier cyclicBarrier;
    DoSmthingAndWait(String name, CyclicBarrier cyclicBarrier) {
        this.name = name;
        this.cyclicBarrier = cyclicBarrier;
    }
    @Override
    public void run() {
        System.out.println(name + " works");
        try {
            sleep(3000); // do work
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name + " waits");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(name + " ends");
    }
}

Первый сценарий:

ReentrantLock cyclicBarrierMainLock = new ReentrantLock();
Condition forFirstCyclicBarrier = cyclicBarrierMainLock.newCondition();
Counter counter = new Counter();
Runnable toRunWhenOpen = () -> {
    cyclicBarrierMainLock.lock();
    try{
        counter.count();
        forFirstCyclicBarrier.signal();
    } catch (Exception e){
        e.printStackTrace();
    } finally{
        cyclicBarrierMainLock.unlock();
    }
};
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, toRunWhenOpen);
new DoSmthingAndWait("1", cyclicBarrier).start();
TimeUnit.SECONDS.sleep(1);
new DoSmthingAndWait("2", cyclicBarrier).start();
TimeUnit.SECONDS.sleep(1);
new DoSmthingAndWait("3", cyclicBarrier).start();
cyclicBarrierMainLock.lock();
try{
    if(counter.getCount() == 0){
        System.out.println("MAIN waits");
        forFirstCyclicBarrier.await();
    }
} catch(Exception exception){
    exception.printStackTrace();
} finally{
    cyclicBarrierMainLock.unlock();
}
System.out.println("MAIN ends");

Вывод:

1 works
2 works
3 works
MAIN waits
1 waits
2 waits
3 waits
3 ends
2 ends
1 ends
MAIN ends

Второй сценарий:

Добавьте следующую строку перед cyclicBarrierMainLock.lock():

TimeUnit.SECONDS.sleep(5);

Получите следующий вывод:

1 works
2 works
3 works
1 waits
2 waits
3 waits
3 ends
2 ends
1 ends
MAIN ends

READ ALSO
Растяжение изображения

Растяжение изображения

Подскажите, пожалуйста, каким(и) способом можно обработать готовое изображение что бы растянув/сжав придать ему форму разностороннего четырёхугольника...

83
Android поиск в AppBar

Android поиск в AppBar

Я начинающий программистУ меня возникла небольшая проблема

211
Не получается добавить информацию в базу данных с помощью PDO

Не получается добавить информацию в базу данных с помощью PDO

Создал таблицу и пытаюсь занести в неё данные из формыДанные не заносятся

98
Вытащить номера пользователей в массив с функции

Вытащить номера пользователей в массив с функции

Хочу вытащить номера всех рефералов Пользователя, и номера их рефераловЧерез print_r все выводится, но надо вытащить массив

204