Управление сессиями Jetty Webscoket Server

349
04 марта 2017, 02:54

В общем задача, при получении одним из webscoket'ов сообщения, переслать это сообщение остальным подключенным websocket'ам. Использую jetty-9.2.20.v20161216.

Вот так инициализирую сервер.

JettyWSServer websocketServer = new JettyWSServer("localhost", 8000, new MySocketHandler(), new QueuedThreadPool(128));
public <T extends WebSocketHandler> JettyWSServer(String hostName, int port, T webscoketHandler, QueuedThreadPool threadPool) {
    this.hostName = hostName;
    this.port = port;
    this.handler = webscoketHandler;
    this.threadPool = threadPool;
    this.socket = null;
    //create server
    this.server = new Server(this.threadPool);
    //set connector
    ServerConnector connector = new ServerConnector(server);
    connector.setHost(this.hostName);
    connector.setPort(this.port);
    this.server.addConnector(connector);
    //set handler
    this.server.setHandler(this.handler);
    //set listener
    setLifecycleListener();
}

MySocketHandler.java

public class MySocketHandler extends WebSocketHandler {
    private final String TAG = MySocketHandler.class.getSimpleName();
    private MySocketCreator creator;
    @Override
    public void configure(WebSocketServletFactory webSocketServletFactory) {
        this.creator = new MySocketCreator();
        webSocketServletFactory.setCreator(this.creator);
    }

    public Set<ServerSocket> getSockets(){
        return this.creator.getSockets();
    }
}

MySocketCreator.java

public class MySocketCreator implements WebSocketCreator {
    private static final String TAG = MySocketCreator.class.getSimpleName();
    private static Log log = new Log(TAG, true);
    private Set<ServerSocket> sockets = new HashSet<>();
    private Set<Session> guests = new HashSet<>();
    private ConcurrentHashMap<ServiceUser, ArrayList<WSDeviceSessionWrapper>> users = new ConcurrentHashMap<>();

    @Override
    public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
        ServerSocket socket = new ServerSocket(statusCallback);
        sockets.add(socket);
        return socket;
    }

    private OnSessionStatusListener statusCallback = new OnSessionStatusListener() {
        @Override
        public void onGuestIn(Session session) {

            synchronized (this) {
                guests.add(session);
                Integer totalAgeReduce = users.values()
                        .stream()
                        .map(wsDeviceSessionWrappers -> {
                            return 1;
                        })
                        .reduce(
                                0,
                                (a, b) -> a + b);
                log.debug("onGuestIn() " + "Guests: " + guests.size() + " Registered: " + totalAgeReduce);
            }
        }
        @Override
        public void onUserIn(Session session, ServiceUser user, Device device) {

            synchronized (this) {
                if (guests.contains(session)) guests.remove(session);
                if (!users.containsKey(user)) {
                    users.put(user, new ArrayList<WSDeviceSessionWrapper>());
                }
                users.get(user).add(new WSDeviceSessionWrapper(session, device));
                log.debug("onUserIn() " + "Guests: " + guests.size() + " Registered: " + users.size());
            }
        }
        @Override
        public void sendResponse(ArrayList<ServiceUser> users, WSResponse response) {
        log.debug("Send message to [" + (users != null ? users.size() : null) + "] current users " + MySocketCreator.this.users.size());
         MySocketCreator.this.users.keySet().forEach(user -> {
               users.forEach(u -> {
                   if (user.equals(u)) {
                       ArrayList<WSDeviceSessionWrapper> wsDeviceSessionWrappers = MySocketCreator.this.users.get(user);
                       new ArrayList<>(wsDeviceSessionWrappers).forEach(wrapper -> {
                               wrapper.getSession().getRemote().sendStringByFuture(response.toJSON());
                           }
                       });
                   }
               });
           });

        }
        @Override
        public void sendResponse(ServiceUser user, WSResponse response, Device excludeDevice) {
                MySocketCreator.this.users.get(user).forEach(wrapper -> {
                    wrapper.getSession().getRemote().sendStringByFuture(response.toJSON());
                });
        }
        @Override
        public void onExit(Session session, ServiceUser user, Device device) {
            synchronized (this) {
                //remove from guest sessions
                if (session != null && guests.contains(session)) guests.remove(session);

                if (user != null && device != null && users.containsKey(user)) {
                    ArrayList<WSDeviceSessionWrapper> wrappers = users.get(user);
                    Iterator<WSDeviceSessionWrapper> iterator = wrappers.iterator();
                    while (iterator.hasNext()) {
                        WSDeviceSessionWrapper wrapper = iterator.next();
                        if (wrapper.getSession() == session || wrapper.getSession().equals(session) && wrapper.getDevice() == device || wrapper.getDevice().equals(device)) {
                            //remove session for current device
                            iterator.remove();
                            //if user does not have session on server
                            //remove him from current server users
                            if (wrappers.size() == 0) {
                                users.remove(user);
                            }
                        }
                    }
                }
                Integer totalRegisteredDevices = users.values()
                        .stream()
                        .map(wsDeviceSessionWrappers -> {
                            return 1;
                        })
                        .reduce(
                                0,
                                (a, b) -> a + b);
                log.debug("onExit() " + "Guests: " + guests.size() + " Registered: " + totalRegisteredDevices);
            }

        }
    };
    public Set<ServerSocket> getSockets() {
        return sockets;
    }
}

Логика работы такая:

В классе MySocketCreator , при создании нового сокета я передаю туда callback. Далее в самом сокете в событии onOpen я вызываю callback и передаю в него сессию, и эта сессия сохраняется в классе MySocketCreator, после я эту сессию связываю с юзером и девайсом.

Проблема заключается в том что когда я пытаюсь отправить сообщени все пользователям из какого либо websocket'a вызвать через callback метод

    @Override
    public void sendResponse(ArrayList<ServiceUser> users, WSResponse response) {
    log.debug("Send message to [" + (users != null ? users.size() : null) + "] current users " + MySocketCreator.this.users.size());
       MySocketCreator.this.users.keySet().forEach(user -> {
           users.forEach(u -> {
               if (user.equals(u)) {
                   ArrayList<WSDeviceSessionWrapper> wsDeviceSessionWrappers = MySocketCreator.this.users.get(user);
                   new ArrayList<>(wsDeviceSessionWrappers).forEach(wrapper -> {
                           wrapper.getSession().getRemote().sendStringByFuture(response.toJSON());
                       }
                   });
               }
           });
       });
    }

строка wrapper.getSession().getRemote().sendStringByFuture(response.toJSON());

лочит поток и сервер виснет. Попытка заменить ее на

wrapper.getSession().getRemote().sendString(response.toJSON());

генерирует исключения

java.lang.IllegalStateException: Blocking message pending 10000 for BLOCKING
org.eclipse.jetty.websocket.api.WebSocketException: RemoteEndpoint unavailable, outgoing connection not open

Получаеться что оба варианта не работают и зависанию и некорректно работе сервера. И это все всего лишь при 300 подключениях.

Вопрос:каким образом я могу выполнить задачу, отправив всем юзерам сообщения, в чем может быть моя ошибка?

READ ALSO
java Условие прохождения 24 часов

java Условие прохождения 24 часов

Здравствуйте, есть следующая задачка, оповестить пользователя о прохождения 24 часов с момента нажатия на кнопкуТекущее время получаю:

255
backendless 3 или 4

backendless 3 или 4

С какой версии лучше начать использование сервиса backendless?

242
Изменение цвета при нажатии на кнопку со своим стилем

Изменение цвета при нажатии на кнопку со своим стилем

Мне нужно при нажатии на кнопку, менять ее цвет, но при этом моя кнопка имеет закругленные углы, то есть уже имеется файл xml:

385
JComboBox выравнивание текста

JComboBox выравнивание текста

есть кнопка-выпадающий список,в ней есть строки,что прописать чтобы текст был по центру,а не слева? нашел вот такое,но не знаю как применить

321