diff --git a/core/che-core-api-core/pom.xml b/core/che-core-api-core/pom.xml index fad26fc8b6..58e6603d59 100644 --- a/core/che-core-api-core/pom.xml +++ b/core/che-core-api-core/pom.xml @@ -86,6 +86,10 @@ org.eclipse.che.core che-core-commons-lang + + org.eclipse.che.core + che-core-commons-schedule + org.everrest everrest-core diff --git a/core/che-core-api-core/src/main/java/org/eclipse/che/api/core/websocket/impl/MessagesReSender.java b/core/che-core-api-core/src/main/java/org/eclipse/che/api/core/websocket/impl/MessagesReSender.java index 358ddae899..f10190f48a 100644 --- a/core/che-core-api-core/src/main/java/org/eclipse/che/api/core/websocket/impl/MessagesReSender.java +++ b/core/che-core-api-core/src/main/java/org/eclipse/che/api/core/websocket/impl/MessagesReSender.java @@ -10,15 +10,15 @@ */ package org.eclipse.che.api.core.websocket.impl; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; +import com.google.common.collect.EvictingQueue; import java.util.Map; import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; import javax.inject.Inject; import javax.inject.Singleton; import javax.websocket.Session; +import org.eclipse.che.commons.schedule.ScheduleDelay; /** * Instance is responsible for re-sending messages that were not sent during the period when WEB @@ -29,57 +29,77 @@ import javax.websocket.Session; */ @Singleton public class MessagesReSender { + private static final int MAX_MESSAGES = 100; private final WebSocketSessionRegistry registry; - private final Map> messagesMap = new HashMap<>(); + private final Map> delayedMessageRegistry = + new ConcurrentHashMap<>(); @Inject public MessagesReSender(WebSocketSessionRegistry registry) { this.registry = registry; } + @ScheduleDelay(initialDelay = 60, delay = 60) + void cleanStaleMessages() { + long currentTimeMillis = System.currentTimeMillis(); + + delayedMessageRegistry + .values() + .forEach(it -> it.removeIf(m -> currentTimeMillis - m.timeMillis > 60_000)); + + delayedMessageRegistry.values().removeIf(Queue::isEmpty); + } + public void add(String endpointId, String message) { - List messages = messagesMap.get(endpointId); - if (messages == null) { - messages = new LinkedList<>(); - messagesMap.put(endpointId, messages); - } - - if (messages.size() <= MAX_MESSAGES) { - messages.add(message); - } + delayedMessageRegistry + .computeIfAbsent(endpointId, k -> EvictingQueue.create(MAX_MESSAGES)) + .offer(new DelayedMessage(message)); } public void resend(String endpointId) { - final List messages = messagesMap.remove(endpointId); + Queue delayedMessages = delayedMessageRegistry.remove(endpointId); - if (messages == null || messages.isEmpty()) { + if (delayedMessages == null || delayedMessages.isEmpty()) { return; } - final Optional sessionOptional = registry.get(endpointId); + Optional sessionOptional = registry.get(endpointId); if (!sessionOptional.isPresent()) { return; } - final Session session = sessionOptional.get(); - - final List backing = new ArrayList<>(messages); - messages.clear(); - - for (String message : backing) { + Queue backingQueue = EvictingQueue.create(delayedMessages.size()); + while (!delayedMessages.isEmpty()) { + backingQueue.offer(delayedMessages.poll()); + } + Session session = sessionOptional.get(); + for (DelayedMessage delayedMessage : backingQueue) { if (session.isOpen()) { - session.getAsyncRemote().sendText(message); + session.getAsyncRemote().sendText(delayedMessage.message); } else { - messages.add(message); + delayedMessages.add(delayedMessage); } } - messagesMap.put(endpointId, messages); + if (!delayedMessages.isEmpty()) { + delayedMessageRegistry.put(endpointId, delayedMessages); + } + } + + private static class DelayedMessage { + + private final long timeMillis; + private final String message; + + private DelayedMessage(String message) { + this.message = message; + this.timeMillis = System.currentTimeMillis(); + } } }