Асинхронное чтение/запись в Realm используя RXJava 2

154
01 ноября 2017, 01:11

Моя первая реализация асинхронной работы с помощью RXJava 2.

Цель:

Получить json данные с сервера библиотекой Retrofit2. Если успешно, то записать в Realm и сразу после записи получить обратно данные и отправить адаптеру RecyclerView.

Так вот, я все это реализовал таким образом:

private void fetchChatsFromNetwork(int count, AccessDataModel accessDataModel) {
    String accessToken = accessDataModel.getAccessToken();
    MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableSubscriber<ChatsModel>() {
                @Override
                public void onNext(ChatsModel chatsModel) {
                    if (chatsRepository.hasData()) {
                        chatsRepository.updateChatsData(chatsModel)
                                .subscribe(new DisposableObserver<ChatsModel>() {
                                    @Override
                                    public void onNext(ChatsModel localChatsModel) {
                                        Log.d(TAG, "DO, onSuccess updated!");
                                        iGetChatsCallback.onGetChatsSuccess(localChatsModel);
                                    }
                                    @Override
                                    public void onError(Throwable e) {
                                        Log.d(TAG, "DO, onError when update!");
                                        iGetChatsCallback.onGetChatsError(e.getMessage());
                                    }
                                    @Override
                                    public void onComplete() {
                                        dispose();
                                        Log.d(TAG, "DO, onComplete!");
                                    }
                                });
                    } else {
                        chatsRepository.insertChatsData(chatsModel)
                                .subscribe(new DisposableObserver<ChatsModel>() {
                                    @Override
                                    public void onNext(ChatsModel localChatsModel) {
                                        iGetChatsCallback.onGetChatsSuccess(localChatsModel);
                                        Log.d(TAG, "DO, onSuccess inserted!");
                                    }
                                    @Override
                                    public void onError(Throwable e) {
                                        iGetChatsCallback.onGetChatsError(e.getMessage());
                                        Log.d(TAG, "DO, onError when inserting!");
                                    }
                                    @Override
                                    public void onComplete() {
                                        dispose();
                                        Log.d(TAG, "DO, onComplete!");
                                    }
                                });
                    }
                }
                @Override
                public void onError(Throwable t) {
                    Log.d(TAG, "onError" + t.getMessage());
                }
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}

Я записываю данные в Realm в методе onNext() подписчика MyApplication.getRestApi().getChats().

Вот код записи:

public Observable<ChatsModel> updateChatsData(final ChatsModel chatsModel) {
    return Observable.create(new ObservableOnSubscribe<ChatsModel>() {
        @Override
        public void subscribe(ObservableEmitter<ChatsModel> e) throws Exception {
            if (chatsModel != null) {
                realm.executeTransactionAsync(
                        realm -> realm.copyToRealmOrUpdate(chatsModel),
                        () -> {
                            Log.d(LOG_TAG, "Data success updated!");
                            ChatsModel localChatsModel = getAllChatsData();
                            e.onNext(localChatsModel);
                            e.onComplete();
                        },
                        error -> {
                            Log.d(LOG_TAG, "Update data failed!");
                            e.onError(error);
                        });
            }
        }
    });
}

Метод updateChatsData() записывает асинхронно и объявлен в другом классе.

Как видите мой метод fetchChatsFromNetwork() написан громоздко или мне так кажется.

Вопрос:

Правильно ли я делаю или нет, если нет, то как было бы правильнее?

Answer 1

Можно полностью отвязать запись в БД от уведомления адаптера о новых данных.

  1. Подпишитесь на Observable, выдающий выборку из БД и уведомляющий о ней адаптер.
  2. При сетевом запросе полученные данные пишите в БД.

При таком способе Observable из первого пункта уведомит адаптер сразу после записи/обновлении данных в БД.

Саму запись в БД макже можно проще сделать через flatMap как-то так:

MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version)
        .flatMap(data -> (chatsRepository.hasData() ? chatsRepository.updateChatsData(data) : chatsRepository.insertChatsData(data)).flatMap(data -> Observable.just(true)))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(
            aBoolean -> System.out.println("data in DB updated"),
            error -> System.out.println("error: " + e.getMessage())
        );

Тут, возможно, придётся поиграться с заменой транзакций записей в БД с синхронных на асинхронные (скорее наоборот) из-за того, как работают асинхронные в потоках без Looper.

READ ALSO
Не могу понять почему актинвость сама меняет жизненный цикл

Не могу понять почему актинвость сама меняет жизненный цикл

Есть приложение которое в потоке Asnytask в методе doİnBackground качает данные и парсит их в Map передает его в onPostExecute метод в котором в İntent кладется...

182
как можно организовать логаут после timeout в jsf?

как можно организовать логаут после timeout в jsf?

У меня есть обычная страница авторизации на сервере

216
почему функция работает только в методе onCreate(), хотя должна работать везде где её вызовут?

почему функция работает только в методе onCreate(), хотя должна работать везде где её вызовут?

Использую библиотеку GraphView для построения графика, для обновления данных построения используется функция seriesresetData(getDataPoint()); но по каким то причинам...

247