Нагрузка:
Алгоритм:
Есть Map
у которого ключ - это ID клиента, а значение - время истечения ожидания. Клиенты сдвигают это время при каждом обращении к серверу.
Есть таймер, который срабатывает раз в минуту, пробегает по списку клиентов, находит тех, у которых истекло время ожидания, удаляет их из списка и меняет статус в базе.
Реализация:
public class ActiveClientManager implements Consumer<Integer> {
private final Map<Integer, Long> clients;
private final PoolDataSource pds;
private final long timeout;
private final Object dbMonitor;
public ActiveClientManager(PoolDataSource pds, long timeout) {
clients = new ConcurrentHashMap<>();
this.pds = pds;
this.timeout = timeout;
dbMonitor = new Object();
}
// Действие клиента
@Override
public void accept(Integer clientID) {
// Время после которого, клиент уходит в оффлайн
long offlineTime = System.currentTimeMillis() + timeout;
// Вставляем нового клиента, или обновляем существующего
if (clients.put(clientID, offlineTime) == null) {
// Если была произведена вставка, то меняем статус в базе
try {
try (Connection con = pds.getConnection()) {
try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?")) {
stmt.setInt(1, clientID);
// Перекрываем кислород таймеру
synchronized (dbMonitor) {
stmt.executeUpdate();
}
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
// Обработка события таймера
public void testTimeout() throws SQLException {
long currentTime = System.currentTimeMillis();
// Сюда занесем всех клиентов, которые уже отвалились
List<Integer> removeClients = new ArrayList<>();
clients.replaceAll((key, val) -> {
if (val <= currentTime) {
// если время уже прошло, то сохраняем нашего клиента
removeClients.add(key);
// и удаляем его из общего списка
return null;
}
// иначе ничего не трогаем
return val;
});
// если есть кого удалять
if (removeClients.size() > 0) {
try (Connection con = pds.getConnection()) {
try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = NULL WHERE id = ?")) {
// блокируем добавление нового клиента
synchronized (dbMonitor) {
for (Integer client : removeClients) {
// если клиент не появился опять в общем списке
if (!clients.containsKey(client)) {
// то сбрасываем ему статус
stmt.setInt(1, client);
stmt.executeUpdate();
}
}
}
}
}
}
}
}
Собственно вопрос - не упустил ли я, что-либо в синхронизации?
Update
Выяснилось, что конструкция
clients.replaceAll((key, val) -> {
if (val <= currentTime) {
// если время уже прошло, то сохраняем нашего клиента
removeClients.add(key);
// и удаляем его из общего списка
return null;
}
// иначе ничего не трогаем
return val;
});
не работает. Если из лямбды вернуть null
, то метод бросает NullPointerException. Переписал так
clients.entrySet().removeIf((entry) -> {
if (entry.getValue() <= currentTime) {
removeClients.add(entry.getKey());
return true;
}
return false;
});
Синхронизируясь по одному объекту dbMonitor
вы ограничиваете паралелльность вашего решения. То есть, клиенты будут "ждать" базу данных, даже если они с разным clientId
. Если использовать встроенную синхронизацию в ConcurrentHashMap
, то получится большая "паралленость" ActiveClientManager
'а.
Но конечно же лучше проверить производительность тестами. Попробуйте создать менеджер, и вызвать методы accept
, testTimeout
из разных потоков.
Не претендую на абсолютную истинность решения, указал лишь на то, что бросилось в глаза.
Пример:
public class ActiveClientManager implements Consumer<Integer> {
private final Map<Integer, Long> clients = new ConcurrentHashMap<>();
private final PoolDataSource pds;
private final long timeout;
public ActiveClientManager(PoolDataSource pds, long timeout) {
this.pds = pds;
this.timeout = timeout;
}
// Действие клиента
@Override
public void accept(Integer clientID) {
// Время после которого, клиент уходит в оффлайн
long offlineTime = System.currentTimeMillis() + timeout;
// Вставляем нового клиента, или обновляем существующего
// синхронизируемся только если один и тот же clientID
clients.compute(clientID, (oldVal, newVal) -> {
if (oldVal == null) {
try {
try (Connection con = pds.getConnection()) {
try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?")) {
stmt.setInt(1, clientID);
stmt.executeUpdate();
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
return offlineTime;
});
}
// Обработка события таймера
public void testTimeout() throws SQLException {
long currentTime = System.currentTimeMillis();
// Сюда занесем всех клиентов, которые уже отвалились
// синхронизируемся на конерктном client
clients.replaceAll((client, val) -> {
if (val <= currentTime) {
// если время уже прошло, то сохраняем нашего клиента
try (Connection con = pds.getConnection()) {
try (PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = NULL WHERE id = ?")) {
// блокируем добавление нового клиента
stmt.setInt(1, client);
stmt.executeUpdate();
}
} catch (SQLException e) {
e.printStackTrace();
}
// и удаляем его из общего списка
return null;
}
// иначе ничего не трогаем
return val;
});
}
}
Update
По второй реализации бросается к глаза, что Вы не определились, что делать если база не работает(или возвращает ошибки). В первом случае - accept
вы собираете список исключений, и пытаетесь обновлять "хоть каких-то" клиентов, даже если база начинает "сбоить". А в втором случае - testTimeout
просто логируете сообщение в консоль.
На вашем месте, я бы в обоих случаях бросал RuntimeException, потому что продолжать работать при не работающей базе кажется не логично. Но это зависит от того как Вы работаете с исключениями выше по коду.
Насчет двух циклов, логично не создавать коннект к базе, если он не нужен. Но два идентичных условия проверки - лишь усложняют код. И в будущем если Вам понадобится изменить условие придется менять в двух местах. Я бы оставил создание коннекта внутри.
Так же кажется, что код по работе с базой логично вынести в два отдельных метода или даже объединить в один.
Есть ощущение, что интерфейс Consumer здесь лишний. Напрашивается интерфейс с двумя методами get(), put(). Так же из кода не ясно кто будет вызывать методtestTimeout
. Кажется, что этот метод должен вызывать сам менеджер, через какой-то промежуток времени, ведь это его зона отвественности - поддержка в актуальном состоянии статусов клиентов. Тогда следует завести например Executors.newScheduledThreadPool(1)
, и в нем поток который будет вызывать приватный метод testTimeout
.
Обновленная версия
public class ActiveClientManager implements Consumer<Integer> {
private final Map<Integer, Long> clients = new ConcurrentHashMap<>();;
private final PoolDataSource pds;
private final long timeoutMSec;
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
public ActiveClientManager(PoolDataSource pds, long timeoutMSec) throws SQLException {
this.pds = pds;
this.timeoutMSec = timeoutMSec;
executorService.scheduleWithFixedDelay(this::testTimeout, 5 ,5, TimeUnit.MINUTES);
}
private void testTimeout() {
long currentTime = System.currentTimeMillis();
clients.entrySet().removeIf((entry) -> {
// если время уже прошло
if (entry.getValue() < currentTime) {
// обновляем БД
updateState(entry.getKey(), State.notactive);
// удаляем этот элемент
return true;
}
return false;
});
}
private void updateState(Integer clientId, State state) {
try (Connection con = pds.getConnection();
PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = " + state.getSqlState() + " WHERE id = ?");) {
stmt.setInt(1, clientId);
stmt.executeUpdate();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public void accept(Integer clientID) {
long offlineTime = System.currentTimeMillis() + timeoutMSec;
clients.compute(clientID, (key, oldval) -> {
// Если запись новая - обновляем БД
if (oldval == null) {
updateState(clientID, State.active);
}
return offlineTime;
});
}
public enum State {
active("1"), notactive("NULL");
private final String sqlState;
State(String sqlState) {
this.sqlState = sqlState;
}
public String getSqlState() {
return sqlState;
}
}
}
С учетом замечания @volyx получился такой класс
public class ActiveClientManager implements Consumer<Integer> {
private final Map<Integer, Long> clients;
private final PoolDataSource pds;
private final long timeoutMSec;
public ActiveClientManager(PoolDataSource pds, long timeoutMSec) throws SQLException {
clients = new ConcurrentHashMap<>();
this.pds = pds;
this.timeoutMSec = timeoutMSec;
}
private boolean isOffline(long currentTime, long limitTime) {
return limitTime <= currentTime;
}
// Ищем, есть ли хоть один клиент с просроченным таймаутом
private boolean searchTimeout(long currentTime) {
for (Long val : clients.values()) {
if (isOffline(currentTime, val))
return true;
};
return false;
}
protected void updateState(Integer clientID, PreparedStatement stmt) throws SQLException {
stmt.setInt(1, clientID);
stmt.executeUpdate();
}
public void testTimeout() throws SQLException {
long currentTime = System.currentTimeMillis();
if (!searchTimeout(currentTime))
return;
// Есть ли хоть один клиент с просроченным таймаутом. Создаем соединение
List<Pair<Integer, SQLException>> exceptions = new ArrayList<>();
try (
Connection con = pds.getConnection();
PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = NULL WHERE id = ?");
) {
clients.entrySet().removeIf((entry) -> {
// если время уже прошло
if (isOffline(currentTime, entry.getValue())) {
// обновляем БД
try {
updateState(entry.getKey(), stmt);
} catch (SQLException e) {
exceptions.add(new Pair<>(entry.getKey(), e));
}
// удаляем этот элемент
return true;
}
return false;
});
}
// если в лямбде были исключения, обрабатываем
if (exceptions.size() > 0)
throw exceptions.get(0).getVal();
}
@Override
public void accept(Integer clientID) {
long offlineTime = System.currentTimeMillis() + timeoutMSec;
clients.compute(clientID, (key, oldval) -> {
// Если запись новая - обновляем БД
if (oldval == null) {
try (
Connection con = pds.getConnection();
PreparedStatement stmt = con.prepareStatement("UPDATE clients SET state = 1 WHERE id = ?");
) {
updateState(entry.getKey(), stmt);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
return offlineTime;
});
}
}
Кофе для программистов: как напиток влияет на продуктивность кодеров?
Рекламные вывески: как привлечь внимание и увеличить продажи
Стратегії та тренди в SMM - Технології, що формують майбутнє сьогодні
Выделенный сервер, что это, для чего нужен и какие характеристики важны?
Современные решения для бизнеса: как облачные и виртуальные технологии меняют рынок
Как вытащить свой профиль firefox и запустить тест, используя этот профиль, при условии, что тест запускается на разных машинах и везде имя профиля...