ConcurrentModificationException в forEach цикле

219
10 апреля 2018, 06:24

Имеется многопоточная среда обработки ивент-событий. Сам поток обработчик:

private class EventCallable implements Runnable {
    @Override
    public void run() {
        EventContainer event;
        while ((event = events.poll()) != null) {
            synchronized (event) {
                try {
                    if (event.future.isCancelled()) {
                        continue;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Dispatching event {}", event.event);
                    }
                    event.future.running = true;
                    event.future.complete = false;
                    doDispatch(event.event);
                    event.future.set(event.event);
                } catch (Throwable t) {
                    event.future.setException(t);
                    log.warn("Exception in WorldEventDispatcher thread", t);
                }
            }
        }
    }
}

функция обработки:

private void doDispatch(WorldEvent event) {
    final ObjectID<?>[] objects = event.getDispatchableObjects();
    for (ObjectID<?> obj : objects) {
        if (obj == null) {
            continue;
        }
        if (!globalListeners.isEmpty()) {
            readLock.lock();
            try {
                final Set<WorldListener> notDispatched = CollectionFactory.newSet();
                for (WorldListener listener : globalListeners) {
                    if (!listener.dispatch(event)) {
                        notDispatched.add(listener);
                    }
                }
                notDispatched.forEach(globalListeners::remove);
            } finally {
                readLock.unlock();
            }
        }
        Set<WorldListener> worldListeners = getListeners(obj);
        if (!worldListeners.isEmpty()) {
            readLock.lock();
            try {
                final Set<WorldListener> notDispatched = CollectionFactory.newSet();
                for (WorldListener listener : worldListeners) {
                    if (!listener.dispatch(event)) {
                        notDispatched.add(listener);
                    }
                }
            } finally {
                readLock.unlock();
            }
        }
    }
}

Конструкция коллекции

private Set<WorldListener> globalListeners = CollectionFactory.newSet();
private ConcurrentHashMap<ObjectID<?>, Set<WorldListener>> listeners = CollectionFactory.newConcurrentHashMap();

функция вызова слушателя:

private Set<WorldListener> getListeners(ObjectID<?> id) {
    Preconditions.checkNotNull(id, "id");
    Set<WorldListener> set = listeners.get(id);
    writeLock.lock();
    try {
        if (set == null) {
            set = CollectionFactory.newSet();
            listeners.put(id, set);
        }
        return set;
    } finally {
        writeLock.unlock();
    }
}

Ексепшн летит при итерировании как listeners так и globalListeners коллекциях. Пробовал как синхронизацию так и как есть сейчас вариант на блокировках, итераторы. Собственно говоря, в чем может быть проблема? Сама ошибка :

[event-dispatcher-2] [22:07:25] [WARN] WorldEventDispatcherServiceImpl:     Exception in WorldEventDispatcher thread
java.util.ConcurrentModificationException: null
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[?:1.8.0_60]
at java.util.HashMap$KeyIterator.next(HashMap.java:1453) ~[?:1.8.0_60]
at org.genfork.game.services.world.event.WorldEventDispatcherServiceImpl.doDispatch(WorldEventDispatcherServiceImpl.java:93) ~[classes/:?]
at org.genfork.game.services.world.event.WorldEventDispatcherServiceImpl.access$800(WorldEventDispatcherServiceImpl.java:34) ~[classes/:?]
at org.genfork.game.services.world.event.WorldEventDispatcherServiceImpl$EventCallable.run(WorldEventDispatcherServiceImpl.java:233) [classes/:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]

На строках - for (WorldListener listener : globalListeners) и for (WorldListener listener : listeners)

Answer 1

Ошибку ConcurrentModificationException можно получить при изменении коллекции во время итерирования, даже в однопоточном приложении.

Пример:

Set<String> st2 = new HashSet<>();
st2.add("a");
st2.add("b");
st2.add("c");
for (String s : st2) {
    st2.remove(s);
    //or st2.add("xxx");
}

С учетом приведённого кода, ошибка происходит скорее всего в строке

for (WorldListener listener : globalListeners)

т.к. globalListeners у нас простой сет, а не потокозащищенный (для сета в стектрейсе также присутствует at java.util.HashMap$HashIterator.nextNode)

Можно сделать globalListeners потокозащищенным сетом ConcurrentSkipListSet, перебирать через итератор используя remove + убрать notDispatched и локи. Если оставите локи на чтение/запись, то notDispatched.forEach(globalListeners::remove) лучше вынести за readLock и "завернуть" в writeLock

Для listeners немного сложнее, т.к. там вложенные сеты

READ ALSO
Уменьшить ширину Line Chart в библиотеке MPAndroidChart

Уменьшить ширину Line Chart в библиотеке MPAndroidChart

Создаю график с одной точкой 0

192
Преобразование Xml файла с помощью xslt. Java

Преобразование Xml файла с помощью xslt. Java

Доброго времени суток дамы и господаИмеется xml файл следующего вида :

168
как удалить Connector/NET 6.9.9

как удалить Connector/NET 6.9.9

Привет всем! Вот уже целый день пытаюсь установить MySql community 57

471