Вопрос вызван тем, что у меня есть CyclicBarrier
, и мне из главного потока нужно узнать, когда CyclicBarrier
открылся. Для CyclicBarrier
я могу указать Runnable
, который выполнится, когда барьер откроется:
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {/*здесь*/});
В главном потоке я запускаю все потоки(3), связанные с CyclicBarrier
, и жду, допустим, 12 секунд.
Теперь возможны две ситуации:
1) Барьер уже открыт
2) Барьер еще закрыт
Так, теперь встает интересная проблема:
Мы не можем сделать сразу cyclicBarrier.await()
, так как если 1), то мы застрянем навечно. Если мы сделаем проверку(getNumberWaiting() == 0
), и только потом сделаю cyclicBarrier.await()
, то есть вероятность, что барьер откроется в аккурат между этими операциями.
Есть ли атомарный способ решить проблему?
Через synchronized
и wait
/notify
(+ java.util.concurrent.CyclicBarrier
и java.util.concurrent.atomic.AtomicBoolean
):
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class SynchronizedHelloWorld {
public static void main(String[] args) throws Exception {
AtomicBoolean isBarrierBroken = new AtomicBoolean();
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
synchronized (isBarrierBroken) {
isBarrierBroken.set(true);
isBarrierBroken.notify();
}
});
new DoSomethingAndWait("1", cyclicBarrier).start();
new DoSomethingAndWait("2", cyclicBarrier).start();
new DoSomethingAndWait("3", cyclicBarrier).start();
// закомментировать строчку, если захочешь,
// чтобы поменялся порядок выполнения wait() и notify()
TimeUnit.SECONDS.sleep(5);
synchronized (isBarrierBroken) {
if (!isBarrierBroken.get()) {
System.out.println("main thread waits");
isBarrierBroken.wait();
}
}
System.out.println("main thread ends");
}
private static class DoSomethingAndWait extends Thread {
private String name;
private CyclicBarrier cyclicBarrier;
DoSomethingAndWait(String name, CyclicBarrier cyclicBarrier) {
this.name = name;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.printf("%s works%n", name);
TimeUnit.SECONDS.sleep(1); // do work
System.out.printf("%s waits%n", name);
cyclicBarrier.await();
System.out.printf("%s ends%n", name);
} catch (Exception ignore) {
}
}
}
}
Через java.util.concurrent.Lock
и java.util.concurrent.Condition
(+ java.util.concurrent.CyclicBarrier
и java.util.concurrent.atomic.AtomicBoolean
):
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.*;
public class ConcurrentLockHelloWorld {
public static void main(String[] args) throws Exception {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
AtomicBoolean isBarrierBroken = new AtomicBoolean();
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
try {
lock.lock();
isBarrierBroken.set(true);
condition.signal();
} catch (Exception ignore) {
} finally {
lock.unlock();
}
});
new DoSomethingAndWait("1", cyclicBarrier).start();
new DoSomethingAndWait("2", cyclicBarrier).start();
new DoSomethingAndWait("3", cyclicBarrier).start();
// закомментировать строчку, если захочешь,
// чтобы поменялся порядок выполнения await() и signal()
TimeUnit.SECONDS.sleep(5);
try {
lock.lock();
if (!isBarrierBroken.get()) {
System.out.println("main thread awaits");
condition.await();
}
} finally {
lock.unlock();
}
System.out.println("main thread ends");
}
private static class DoSomethingAndWait extends Thread {
private String name;
private CyclicBarrier cyclicBarrier;
DoSomethingAndWait(String name, CyclicBarrier cyclicBarrier) {
this.name = name;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.printf("%s works%n", name);
TimeUnit.SECONDS.sleep(1); // do work
System.out.printf("%s waits%n", name);
cyclicBarrier.await();
System.out.printf("%s ends%n", name);
} catch (Exception ignore) {
}
}
}
}
Через java.util.concurrent.Exchanger
и java.util.concurrent.CyclicBarrier
:
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
public class ConcurrentExchangerHelloWorld {
public static void main(String[] args) throws Exception {
Exchanger<String> exchanger = new Exchanger<>();
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
try {
System.out.println("barrier broken callback starts");
exchanger.exchange(null);
System.out.println("barrier broken callback ends");
} catch (Exception ignore) {
}
});
new DoSomethingAndWait("1", cyclicBarrier).start();
new DoSomethingAndWait("2", cyclicBarrier).start();
new DoSomethingAndWait("3", cyclicBarrier).start();
// закомментировать строчку, если захочешь,
// чтобы поменялся порядок вызова exchange()
TimeUnit.SECONDS.sleep(5);
System.out.println("main thread waits, maybe...");
exchanger.exchange(null);
System.out.println("main thread ends");
}
private static class DoSomethingAndWait extends Thread {
private String name;
private CyclicBarrier cyclicBarrier;
DoSomethingAndWait(String name, CyclicBarrier cyclicBarrier) {
this.name = name;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.printf("%s works%n", name);
TimeUnit.SECONDS.sleep(1); // do work
System.out.printf("%s waits%n", name);
cyclicBarrier.await();
System.out.printf("%s ends%n", name);
} catch (Exception ignore) {
}
}
}
}
Через java.util.concurrent.CountDownLatch
:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class ConcurrentCountDownLatchHelloWorld {
public static void main(String[] args) throws Exception {
execute(() -> {
try {
System.out.println("callback");
} catch (Exception ignore) {
}
});
}
private static void execute(Runnable callback) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
new DoSomethingAndWait("1", countDownLatch).start();
new DoSomethingAndWait("2", countDownLatch).start();
new DoSomethingAndWait("3", countDownLatch).start();
// закомментировать строчку, если захочешь,
// чтобы поменялся порядок выполнения await()
TimeUnit.SECONDS.sleep(5);
System.out.println("main thread awaits");
countDownLatch.await();
callback.run();
System.out.println("main thread ends");
}
private static class DoSomethingAndWait extends Thread {
private String name;
private CountDownLatch countDownLatch;
DoSomethingAndWait(String name, CountDownLatch countDownLatch) {
this.name = name;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
System.out.printf("%s works%n", name);
TimeUnit.SECONDS.sleep(1); // do work
System.out.printf("%s waits%n", name);
countDownLatch.countDown();
countDownLatch.await();
System.out.printf("%s ends%n", name);
} catch (Exception ignore) {
}
}
}
}
Через java.util.concurrent.Phaser
:
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class ConcurrentPhaserHelloWorld {
public static void main(String[] args) throws Exception {
execute(() -> {
try {
System.out.println("callback");
} catch (Exception ignore) {
}
});
}
private static void execute(Runnable callback) throws InterruptedException {
Phaser phaser = new Phaser(3);
new DoSomethingAndWait("1", phaser).start();
new DoSomethingAndWait("2", phaser).start();
new DoSomethingAndWait("3", phaser).start();
// закомментировать строчку, если захочешь,
// чтобы поменялся порядок выполнения await()
TimeUnit.SECONDS.sleep(5);
System.out.println("main thread awaits, maybe...");
phaser.awaitAdvance(0);
callback.run();
System.out.println("main thread ends");
}
private static class DoSomethingAndWait extends Thread {
private String name;
private Phaser phaser;
DoSomethingAndWait(String name, Phaser phaser) {
this.name = name;
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.printf("%s works%n", name);
TimeUnit.SECONDS.sleep(1); // do work
System.out.printf("%s waits%n", name);
phaser.arriveAndAwaitAdvance();
System.out.printf("%s ends%n", name);
} catch (Exception ignore) {
}
}
}
}
Проблема решается применением Lock
и Condition
.
Что мы получим в итоге? Мы получим счетчик, который будет считать кол-во раз открытия CyclicBarrier'а
, и возможность узнавать о состоянии этого счетчика атомарно из любого потока. Это полностью решает стоящую перед нами задачу.
Перейдем к реализации.
Во-первых, у нас есть счетчик, его методы необязательно должны быть synchronized
:
class Counter{
private volatile int count;
{
count = 0;
}
public void count(){
count++;
}
public int getCount(){
return count;
}
public void reset(){
count = 0;
}
}
Runnable
, выполняемый при каждом открытии CyclicBarrier'а
:
Runnable toRunWhenOpen = () -> {
cyclicBarrierMainLock.lock();
try{
counter.count();
forFirstCyclicBarrier.signal();
} catch (Exception e){
e.printStackTrace();
} finally{
cyclicBarrierMainLock.unlock();
}
};
А вот код в "главном потоке":
cyclicBarrierMainLock.lock();
try{
if(counter.getCount() == 0){
System.out.println("MAIN waits");
forFirstCyclicBarrier.await();
}
} catch(Exception exception){
exception.printStackTrace();
} finally{
cyclicBarrierMainLock.unlock();
}
Видим, что все синхронизировано - Lock
не даст случится ситуации, описанной в вопросе. Суть программы же заключается в том, что в ходе выполнения возможны только две ситуации:
1) Runnable
выполнится быстрее. Если "главный поток" доберется до cyclicBarrierMainLock.lock()
во время выполнения Runnable
, то он будет послушно ждать освобождения Lock'а
.
2) Главному потоку придется ждать Runnable
. Тогда главный поток сначала захватит Lock
, а потом, как дойдет до await()
, сразу же отпустит. Вот тогда то Runnable
и сможет пройти дальше. А как пройдет - даст команду signal()
, и уже Runnable
будет ждать, когда "главный поток" дойдет до unlock()
.
Если интересно, вот рабочий вариант, на котором можно проверить работоспособность предложенного способа:
Класс DoSmthingAndWait
, который является потоком, который "подписывается" на барьер, когда выполняет всю работу:
class DoSmthingAndWait extends Thread {
private String name;
CyclicBarrier cyclicBarrier;
DoSmthingAndWait(String name, CyclicBarrier cyclicBarrier) {
this.name = name;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println(name + " works");
try {
sleep(3000); // do work
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " waits");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " ends");
}
}
Первый сценарий:
ReentrantLock cyclicBarrierMainLock = new ReentrantLock();
Condition forFirstCyclicBarrier = cyclicBarrierMainLock.newCondition();
Counter counter = new Counter();
Runnable toRunWhenOpen = () -> {
cyclicBarrierMainLock.lock();
try{
counter.count();
forFirstCyclicBarrier.signal();
} catch (Exception e){
e.printStackTrace();
} finally{
cyclicBarrierMainLock.unlock();
}
};
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, toRunWhenOpen);
new DoSmthingAndWait("1", cyclicBarrier).start();
TimeUnit.SECONDS.sleep(1);
new DoSmthingAndWait("2", cyclicBarrier).start();
TimeUnit.SECONDS.sleep(1);
new DoSmthingAndWait("3", cyclicBarrier).start();
cyclicBarrierMainLock.lock();
try{
if(counter.getCount() == 0){
System.out.println("MAIN waits");
forFirstCyclicBarrier.await();
}
} catch(Exception exception){
exception.printStackTrace();
} finally{
cyclicBarrierMainLock.unlock();
}
System.out.println("MAIN ends");
Вывод:
1 works
2 works
3 works
MAIN waits
1 waits
2 waits
3 waits
3 ends
2 ends
1 ends
MAIN ends
Второй сценарий:
Добавьте следующую строку перед cyclicBarrierMainLock.lock()
:
TimeUnit.SECONDS.sleep(5);
Получите следующий вывод:
1 works
2 works
3 works
1 waits
2 waits
3 waits
3 ends
2 ends
1 ends
MAIN ends
Айфон мало держит заряд, разбираемся с проблемой вместе с AppLab
Подскажите, пожалуйста, каким(и) способом можно обработать готовое изображение что бы растянув/сжав придать ему форму разностороннего четырёхугольника...
Создал таблицу и пытаюсь занести в неё данные из формыДанные не заносятся
Хочу вытащить номера всех рефералов Пользователя, и номера их рефераловЧерез print_r все выводится, но надо вытащить массив