Синхронизация подписки на Observable

169
02 июля 2018, 17:10

друзья, нужна ваша помощь о работе с асинхронщиной знаю только теорию, на практике не работал работаю с mongodb-async-driver + mongodb-driver-rx у меня есть запрос на добавление транзакций, который может выполняться из множества потоков. Для выполнения этого запроса он должен получить данные из бд, обновить их. У меня получается так, что два потока в одно время получают одинаковые данные из бд и выполняют операции по их обновлению, хотя должны делать это по очереди.

вот метод, который вызывается асинхронно

@Override
public void addTransaction(Transaction tr) {
    final Bson inputFilter = Filters.eq("_id", tr.getInputId());
    System.out.println(Thread.currentThread() + " начал искать инпут");
    Observer<Input> observer = new Observer<>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            subscription.request(1);
        }
        @Override
        public void onNext(Input input) {
           выполняем кучу работы по обновлению данных в бд
            только по её заверешению нужно дать доступ другому потоку на добавление транзакций
           });
        }
        @Override
        public void onError(Throwable e) {
            System.out.println("Ошибка при получении Input'ов " + e.getMessage());
        }
        @Override
        public void onComplete() {
            System.out.println("Добавление завершено");
        }
    };
  //подписка не должна происходить, пока происходит работа с бд
    Observables.observe(collectionInput.find(inputFilter)
            .map(QiwiInput::fromDocument)).subscribe(observer);
}

Пытался синхронизировать следующим образом

   private boolean isUpdate = false;
   @Override
    public void addTransaction(Transaction tr) {
        final Bson inputFilter = Filters.eq("_id", tr.getInputId());
        System.out.println("Начинаем искать инпуты");
        System.out.println(Thread.currentThread());
        synchronized(MongDB.this){
        if(isUpdate)
            wait();
        }
        Observables.observe(collectionInput.find(inputFilter)
                .map(QiwiInput::fromDocument)).subscribe(new Observer<>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                isUpdate = true;
                subscription.request(1);
            }
            @Override
            public void onNext(Input input) {
                System.out.println(input.getCommissionBalance().getNumber());
                System.out.println("ща буим добавлять");
                //как закончили операцию
                isUpdate = false;
                MongoDB.this.notifyAll();
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("Ошибка при получении Input'ов " + e.getMessage());
            }
            @Override
            public void onComplete() {
                System.out.println("Добавление завершено");
            }
        });
    }

но этот код выдает IllegalMonitorStateException на строке, где я вызываю notify. Подскажите, как быть

Answer 1

Вы используете синхронизацию на мониторе MongDB.this, в момент когда вы хотите послать сообщение другим потокам, ваш метод вызывающий notify() уже находится за пределами синхронизации, то есть монитор уже отсутствует.

Вы можете пользоваться notify() когда у вас есть монитор. В противном случае попытка вызвать notify() на несуществующий монитор будет вызывать IllegalMonitorStateException.

READ ALSO
Не получается сверстать элемент

Не получается сверстать элемент

Должно получится как на скрине

164
подключение ts файла в html

подключение ts файла в html

как подключить ts файл в htmlКак подключить js файл я знаю но ts нет

226
Как добавить сортировку к меткам карты?

Как добавить сортировку к меткам карты?

Необходимо после карты добавить сортировку - такого типа: отображать все, отобразить аптеки, отобразить гостиницы(Только определенный набор...

213
Выделение текущего пункта меню на CSS

Выделение текущего пункта меню на CSS

Начал делать меню на сайте, и столкнулся с проблемой, не могу понять, как посредством CSS можно выделить категорию меню в которой я сейчас нахожусь,...

181