diff --git a/core/che-core-api-core/src/main/java/org/eclipse/che/api/core/notification/RemoteSubscriptionManager.java b/core/che-core-api-core/src/main/java/org/eclipse/che/api/core/notification/RemoteSubscriptionManager.java new file mode 100644 index 0000000000..cdb73d4d42 --- /dev/null +++ b/core/che-core-api-core/src/main/java/org/eclipse/che/api/core/notification/RemoteSubscriptionManager.java @@ -0,0 +1,93 @@ +/******************************************************************************* + * Copyright (c) 2012-2017 Codenvy, S.A. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Codenvy, S.A. - initial API and implementation + *******************************************************************************/ +package org.eclipse.che.api.core.notification; + +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator; +import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter; +import org.eclipse.che.api.core.notification.dto.EventSubscription; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiPredicate; + +import static java.util.Collections.emptySet; + +@Singleton +public class RemoteSubscriptionManager { + private final Map> subscriptionContexts = new HashMap<>(); + + private final EventService eventService; + private final RequestTransmitter requestTransmitter; + + @Inject + public RemoteSubscriptionManager(EventService eventService, + RequestTransmitter requestTransmitter) { + this.eventService = eventService; + this.requestTransmitter = requestTransmitter; + } + + @Inject + private void configureSubscription(RequestHandlerConfigurator requestHandlerConfigurator) { + requestHandlerConfigurator.newConfiguration() + .methodName("subscribe") + .paramsAsDto(EventSubscription.class) + .noResult() + .withBiConsumer(this::consumeSubscriptionRequest); + + requestHandlerConfigurator.newConfiguration() + .methodName("unSubscribe") + .paramsAsDto(EventSubscription.class) + .noResult() + .withBiConsumer(this::consumeUnSubscriptionRequest); + } + + public void register(String method, Class eventType, BiPredicate> biPredicate) { + eventService.subscribe(event -> subscriptionContexts.get(method) + .stream() + .filter(context -> biPredicate.test(event, context.scope)) + .forEach(context -> transmit(context.endpointId, method, event)), + eventType); + } + + private void consumeSubscriptionRequest(String endpointId, EventSubscription eventSubscription) { + subscriptionContexts.computeIfPresent(eventSubscription.getMethod(), (k, v) -> new HashSet<>()) + .add(new SubscriptionContext(endpointId, eventSubscription.getScope())); + } + + private void consumeUnSubscriptionRequest(String endpointId, EventSubscription eventSubscription) { + subscriptionContexts.getOrDefault(eventSubscription.getMethod(), emptySet()) + .removeIf(subscriptionContext -> Objects.equals(subscriptionContext.endpointId, endpointId)); + } + + private void transmit(String endpointId, String method, T event) { + requestTransmitter.newRequest() + .endpointId(endpointId) + .methodName(method) + .paramsAsDto(event) + .sendAndSkipResult(); + } + + private class SubscriptionContext { + private final String endpointId; + private final Map scope; + + private SubscriptionContext(String endpointId, Map scope) { + this.endpointId = endpointId; + this.scope = scope; + } + } +} diff --git a/core/che-core-api-core/src/main/java/org/eclipse/che/api/core/notification/dto/EventSubscription.java b/core/che-core-api-core/src/main/java/org/eclipse/che/api/core/notification/dto/EventSubscription.java new file mode 100644 index 0000000000..3a3a4185a8 --- /dev/null +++ b/core/che-core-api-core/src/main/java/org/eclipse/che/api/core/notification/dto/EventSubscription.java @@ -0,0 +1,26 @@ +/******************************************************************************* + * Copyright (c) 2012-2017 Codenvy, S.A. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Codenvy, S.A. - initial API and implementation + *******************************************************************************/ +package org.eclipse.che.api.core.notification.dto; + +import org.eclipse.che.dto.shared.DTO; + +import java.util.Map; + +@DTO +public interface EventSubscription { + String getMethod(); + + EventSubscription withMethod(String method); + + Map getScope(); + + EventSubscription withScope(Map scope); +} diff --git a/core/che-core-api-model/src/main/java/org/eclipse/che/api/core/model/workspace/runtime/RuntimeIdentity.java b/core/che-core-api-model/src/main/java/org/eclipse/che/api/core/model/workspace/runtime/RuntimeIdentity.java index 1ed798cd12..994f21a103 100644 --- a/core/che-core-api-model/src/main/java/org/eclipse/che/api/core/model/workspace/runtime/RuntimeIdentity.java +++ b/core/che-core-api-model/src/main/java/org/eclipse/che/api/core/model/workspace/runtime/RuntimeIdentity.java @@ -1,3 +1,13 @@ +/******************************************************************************* + * Copyright (c) 2012-2017 Codenvy, S.A. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Codenvy, S.A. - initial API and implementation + *******************************************************************************/ package org.eclipse.che.api.core.model.workspace.runtime; /** diff --git a/ide/che-core-ide-api/src/main/java/org/eclipse/che/ide/api/notification/SubscriptionManagerClient.java b/ide/che-core-ide-api/src/main/java/org/eclipse/che/ide/api/notification/SubscriptionManagerClient.java new file mode 100644 index 0000000000..b4000eb570 --- /dev/null +++ b/ide/che-core-ide-api/src/main/java/org/eclipse/che/ide/api/notification/SubscriptionManagerClient.java @@ -0,0 +1,40 @@ +/******************************************************************************* + * Copyright (c) 2012-2017 Codenvy, S.A. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Codenvy, S.A. - initial API and implementation + *******************************************************************************/ +package org.eclipse.che.ide.api.notification; + +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter; +import org.eclipse.che.api.core.notification.dto.EventSubscription; +import org.eclipse.che.ide.dto.DtoFactory; + +import java.util.Map; + +@Singleton +public class SubscriptionManagerClient { + private final RequestTransmitter requestTransmitter; + private final DtoFactory dtoFactory; + + @Inject + SubscriptionManagerClient(RequestTransmitter requestTransmitter, DtoFactory dtoFactory) { + this.requestTransmitter = requestTransmitter; + this.dtoFactory = dtoFactory; + } + + public void subscribe(String endpointId, String method, Map scope) { + requestTransmitter.newRequest() + .endpointId(endpointId) + .methodName("subscribe") + .paramsAsDto(dtoFactory.createDto(EventSubscription.class).withMethod(method).withScope(scope)) + .sendAndSkipResult(); + } +} diff --git a/ide/che-core-ide-api/src/main/resources/org/eclipse/che/api/core/Core.gwt.xml b/ide/che-core-ide-api/src/main/resources/org/eclipse/che/api/core/Core.gwt.xml index d4087dabb5..e4c0e0dd70 100644 --- a/ide/che-core-ide-api/src/main/resources/org/eclipse/che/api/core/Core.gwt.xml +++ b/ide/che-core-ide-api/src/main/resources/org/eclipse/che/api/core/Core.gwt.xml @@ -24,4 +24,5 @@ + diff --git a/ide/che-core-ide-app/src/main/java/org/eclipse/che/ide/bootstrap/CurrentWorkspaceManager.java b/ide/che-core-ide-app/src/main/java/org/eclipse/che/ide/bootstrap/CurrentWorkspaceManager.java index b4fd9135e3..4d16039fb7 100644 --- a/ide/che-core-ide-app/src/main/java/org/eclipse/che/ide/bootstrap/CurrentWorkspaceManager.java +++ b/ide/che-core-ide-app/src/main/java/org/eclipse/che/ide/bootstrap/CurrentWorkspaceManager.java @@ -17,18 +17,18 @@ import com.google.inject.Singleton; import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter; import org.eclipse.che.api.core.model.workspace.Workspace; import org.eclipse.che.api.core.model.workspace.WorkspaceStatus; -import org.eclipse.che.api.promises.client.Operation; -import org.eclipse.che.api.promises.client.PromiseError; -import org.eclipse.che.api.workspace.shared.dto.WorkspaceDto; import org.eclipse.che.ide.CoreLocalizationConstant; import org.eclipse.che.ide.api.app.AppContext; import org.eclipse.che.ide.api.notification.NotificationManager; +import org.eclipse.che.ide.api.notification.SubscriptionManagerClient; import org.eclipse.che.ide.context.BrowserAddress; import org.eclipse.che.ide.ui.loaders.LoaderPresenter; -import org.eclipse.che.ide.util.loging.Log; import org.eclipse.che.ide.workspace.WorkspaceServiceClient; import org.eclipse.che.ide.workspace.WorkspaceStatusHandler; +import java.util.Map; + +import static java.util.Collections.singletonMap; import static org.eclipse.che.api.core.model.workspace.WorkspaceStatus.RUNNING; import static org.eclipse.che.api.core.model.workspace.WorkspaceStatus.STOPPED; import static org.eclipse.che.api.core.model.workspace.WorkspaceStatus.STOPPING; @@ -55,6 +55,7 @@ public class CurrentWorkspaceManager { private final CoreLocalizationConstant messages; private final WorkspaceStatusHandler wsStatusHandler; private final AppContext appContext; + private final SubscriptionManagerClient subscriptionManagerClient; @Inject CurrentWorkspaceManager(WorkspaceServiceClient workspaceServiceClient, @@ -65,7 +66,8 @@ public class CurrentWorkspaceManager { IdeInitializer ideInitializer, CoreLocalizationConstant messages, WorkspaceStatusHandler wsStatusHandler, - AppContext appContext) { + AppContext appContext, + SubscriptionManagerClient subscriptionManagerClient) { this.workspaceServiceClient = workspaceServiceClient; this.browserAddress = browserAddress; this.transmitter = transmitter; @@ -75,6 +77,7 @@ public class CurrentWorkspaceManager { this.messages = messages; this.wsStatusHandler = wsStatusHandler; this.appContext = appContext; + this.subscriptionManagerClient = subscriptionManagerClient; } // TODO: handle errors while workspace starting (show message dialog) @@ -93,19 +96,22 @@ public class CurrentWorkspaceManager { } private void subscribeToEvents(String workspaceId) { - subscribe(WS_STATUS_ERROR_MSG, "event:workspace-status:subscribe", workspaceId); + workspaceServiceClient.getWorkspace(browserAddress.getWorkspaceKey()) + .then(skip -> { + String endpointId = "ws-master"; + String method = "workspace/statusChanged"; + Map scope = singletonMap("workspaceId", workspaceId); + subscriptionManagerClient.subscribe(endpointId, method, scope); + }); + + +// subscribe(WS_STATUS_ERROR_MSG, "event:workspace-status:subscribe", workspaceId); // subscribe(WS_AGENT_OUTPUT_ERROR_MSG, "event:ws-agent-output:subscribe", workspaceId); // subscribe(ENV_STATUS_ERROR_MSG, "event:environment-status:subscribe", workspaceId); } private void subscribe(String it, String methodName, String id) { - workspaceServiceClient.getWorkspace(browserAddress.getWorkspaceKey()) - .then((Operation)skip -> transmitter.newRequest() - .endpointId("ws-master") - .methodName(methodName) - .paramsAsString(id) - .sendAndSkipResult()) - .catchError((Operation)error -> Log.error(getClass(), it + ": " + error.getMessage())); + } /** Starts the workspace with the default environment. */ diff --git a/ide/che-core-ide-app/src/main/java/org/eclipse/che/ide/workspace/events/EnvironmentStatusEventHandler.java b/ide/che-core-ide-app/src/main/java/org/eclipse/che/ide/workspace/events/EnvironmentStatusEventHandler.java index 2367c77fda..4983a3102e 100644 --- a/ide/che-core-ide-app/src/main/java/org/eclipse/che/ide/workspace/events/EnvironmentStatusEventHandler.java +++ b/ide/che-core-ide-app/src/main/java/org/eclipse/che/ide/workspace/events/EnvironmentStatusEventHandler.java @@ -25,7 +25,7 @@ class EnvironmentStatusEventHandler { @Inject void configureEnvironmentStatusHandler(RequestHandlerConfigurator configurator, Provider handlerProvider) { configurator.newConfiguration() - .methodName("event:environment-status:changed") + .methodName("machine/statusChanged") .paramsAsDto(MachineStatusEvent.class) .noResult() .withBiConsumer((endpointId, event) -> { diff --git a/ide/che-core-ide-app/src/main/java/org/eclipse/che/ide/workspace/events/WorkspaceStatusEventHandler.java b/ide/che-core-ide-app/src/main/java/org/eclipse/che/ide/workspace/events/WorkspaceStatusEventHandler.java index 63b9be3ed6..8a078ad6ac 100644 --- a/ide/che-core-ide-app/src/main/java/org/eclipse/che/ide/workspace/events/WorkspaceStatusEventHandler.java +++ b/ide/che-core-ide-app/src/main/java/org/eclipse/che/ide/workspace/events/WorkspaceStatusEventHandler.java @@ -25,7 +25,7 @@ class WorkspaceStatusEventHandler { @Inject WorkspaceStatusEventHandler(RequestHandlerConfigurator configurator, Provider handlerProvider) { configurator.newConfiguration() - .methodName("event:workspace-status:changed") + .methodName("workspace/statusChanged") .paramsAsDto(WorkspaceStatusEvent.class) .noResult() .withBiConsumer((endpointId, event) -> { diff --git a/infrastructures/docker/src/main/java/org/eclipse/che/workspace/infrastructure/docker/snapshot/SnapshotException.java b/infrastructures/docker/src/main/java/org/eclipse/che/workspace/infrastructure/docker/snapshot/SnapshotException.java index b9318b1c9b..27ca61e55e 100644 --- a/infrastructures/docker/src/main/java/org/eclipse/che/workspace/infrastructure/docker/snapshot/SnapshotException.java +++ b/infrastructures/docker/src/main/java/org/eclipse/che/workspace/infrastructure/docker/snapshot/SnapshotException.java @@ -1,3 +1,13 @@ +/******************************************************************************* + * Copyright (c) 2012-2017 Codenvy, S.A. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Codenvy, S.A. - initial API and implementation + *******************************************************************************/ package org.eclipse.che.workspace.infrastructure.docker.snapshot; import org.eclipse.che.api.core.ServerException; diff --git a/wsmaster/che-core-api-workspace-shared/src/main/java/org/eclipse/che/api/workspace/shared/dto/RuntimeIdentityDto.java b/wsmaster/che-core-api-workspace-shared/src/main/java/org/eclipse/che/api/workspace/shared/dto/RuntimeIdentityDto.java index b1e2760528..e4bafea0ce 100644 --- a/wsmaster/che-core-api-workspace-shared/src/main/java/org/eclipse/che/api/workspace/shared/dto/RuntimeIdentityDto.java +++ b/wsmaster/che-core-api-workspace-shared/src/main/java/org/eclipse/che/api/workspace/shared/dto/RuntimeIdentityDto.java @@ -1,3 +1,13 @@ +/******************************************************************************* + * Copyright (c) 2012-2017 Codenvy, S.A. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Codenvy, S.A. - initial API and implementation + *******************************************************************************/ package org.eclipse.che.api.workspace.shared.dto; import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity; diff --git a/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/MachineStatusJsonRpcMessenger.java b/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/MachineStatusJsonRpcMessenger.java index a0658e797a..4000a75ba9 100644 --- a/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/MachineStatusJsonRpcMessenger.java +++ b/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/MachineStatusJsonRpcMessenger.java @@ -10,94 +10,34 @@ *******************************************************************************/ package org.eclipse.che.api.workspace.server.event; -import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator; -import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter; -import org.eclipse.che.api.core.notification.EventService; -import org.eclipse.che.api.core.notification.EventSubscriber; +import org.eclipse.che.api.core.notification.RemoteSubscriptionManager; import org.eclipse.che.api.workspace.shared.dto.event.MachineStatusEvent; import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.inject.Singleton; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import static com.google.common.collect.Sets.newConcurrentHashSet; - /** * Send workspace events using JSON RPC to the clients */ @Singleton -public class MachineStatusJsonRpcMessenger implements EventSubscriber { - private final RequestTransmitter transmitter; - private final EventService eventService; - - private final Map> endpointIds = new ConcurrentHashMap<>(); +public class MachineStatusJsonRpcMessenger { + private final RemoteSubscriptionManager remoteSubscriptionManager; @Inject - public MachineStatusJsonRpcMessenger(RequestTransmitter transmitter, EventService eventService) { - this.transmitter = transmitter; - this.eventService = eventService; - } - - @Override - public void onEvent(MachineStatusEvent event) { - send(event); - } - - public void send(MachineStatusEvent event) { - String id = event.getIdentity().getWorkspaceId(); - endpointIds.entrySet() - .stream() - .filter(it -> it.getValue().contains(id)) - .map(Map.Entry::getKey) - .forEach(it -> transmitter.newRequest() - .endpointId(it) - .methodName("machine/statusChanged") - .paramsAsDto(event) - .sendAndSkipResult()); - } - - @Inject - private void configureSubscribeHandler(RequestHandlerConfigurator configurator) { - - configurator.newConfiguration() - .methodName("machine/subscribe") - .paramsAsString() - .noResult() - .withBiConsumer((endpointId, workspaceId) -> { - endpointIds.putIfAbsent(endpointId, newConcurrentHashSet()); - endpointIds.get(endpointId).add(workspaceId); - }); - } - - @Inject - private void configureUnSubscribeHandler(RequestHandlerConfigurator configurator) { - configurator.newConfiguration() - .methodName("machine/unSubscribe") - .paramsAsString() - .noResult() - .withBiConsumer((endpointId, workspaceId) -> { - Set workspaceIds = endpointIds.get(endpointId); - if (workspaceIds != null) { - workspaceIds.remove(workspaceId); - - if (workspaceIds.isEmpty()) { - endpointIds.remove(endpointId); - } - } - }); + public MachineStatusJsonRpcMessenger(RemoteSubscriptionManager remoteSubscriptionManager) { + this.remoteSubscriptionManager = remoteSubscriptionManager; } @PostConstruct - private void subscribe() { - eventService.subscribe(this); + private void postConstruct() { + remoteSubscriptionManager.register("machine/statusChanged", MachineStatusEvent.class, this::predicate); } - @PreDestroy - private void unsubscribe() { - eventService.unsubscribe(this); + private boolean predicate(MachineStatusEvent event, Map scope) { + return event.getIdentity().getWorkspaceId().equals(scope.get("workspaceId")); } } diff --git a/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/RuntimeStatusJsonRpcMessenger.java b/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/RuntimeStatusJsonRpcMessenger.java index 00056add12..4386c3bb03 100644 --- a/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/RuntimeStatusJsonRpcMessenger.java +++ b/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/RuntimeStatusJsonRpcMessenger.java @@ -14,6 +14,8 @@ import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator; import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter; import org.eclipse.che.api.core.notification.EventService; import org.eclipse.che.api.core.notification.EventSubscriber; +import org.eclipse.che.api.core.notification.RemoteSubscriptionManager; +import org.eclipse.che.api.workspace.shared.dto.event.MachineStatusEvent; import org.eclipse.che.api.workspace.shared.dto.event.RuntimeStatusEvent; import javax.annotation.PostConstruct; @@ -30,74 +32,20 @@ import static com.google.common.collect.Sets.newConcurrentHashSet; * Send workspace events using JSON RPC to the clients */ @Singleton -public class RuntimeStatusJsonRpcMessenger implements EventSubscriber { - private final RequestTransmitter transmitter; - private final EventService eventService; - - private final Map> endpointIds = new ConcurrentHashMap<>(); +public class RuntimeStatusJsonRpcMessenger { + private final RemoteSubscriptionManager remoteSubscriptionManager; @Inject - public RuntimeStatusJsonRpcMessenger(RequestTransmitter transmitter, EventService eventService) { - this.transmitter = transmitter; - this.eventService = eventService; - } - - @Override - public void onEvent(RuntimeStatusEvent event) { - send(event); - } - - public void send(RuntimeStatusEvent event) { - String id = event.getIdentity().getWorkspaceId(); - endpointIds.entrySet() - .stream() - .filter(it -> it.getValue().contains(id)) - .map(Map.Entry::getKey) - .forEach(it -> transmitter.newRequest() - .endpointId(it) - .methodName("runtime/statusChanged") - .paramsAsDto(event) - .sendAndSkipResult()); - } - - @Inject - private void configureSubscribeHandler(RequestHandlerConfigurator configurator) { - - configurator.newConfiguration() - .methodName("runtime/subscribe") - .paramsAsString() - .noResult() - .withBiConsumer((endpointId, workspaceId) -> { - endpointIds.putIfAbsent(endpointId, newConcurrentHashSet()); - endpointIds.get(endpointId).add(workspaceId); - }); - } - - @Inject - private void configureUnSubscribeHandler(RequestHandlerConfigurator configurator) { - configurator.newConfiguration() - .methodName("runtime/unSubscribe") - .paramsAsString() - .noResult() - .withBiConsumer((endpointId, workspaceId) -> { - Set workspaceIds = endpointIds.get(endpointId); - if (workspaceIds != null) { - workspaceIds.remove(workspaceId); - - if (workspaceIds.isEmpty()) { - endpointIds.remove(endpointId); - } - } - }); + public RuntimeStatusJsonRpcMessenger(RemoteSubscriptionManager remoteSubscriptionManager) { + this.remoteSubscriptionManager = remoteSubscriptionManager; } @PostConstruct - private void subscribe() { - eventService.subscribe(this); + private void postConstruct() { + remoteSubscriptionManager.register("runtime/statusChanged", RuntimeStatusEvent.class, this::predicate); } - @PreDestroy - private void unsubscribe() { - eventService.unsubscribe(this); + private boolean predicate(RuntimeStatusEvent event, Map scope) { + return event.getIdentity().getWorkspaceId().equals(scope.get("workspaceId")); } } diff --git a/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/ServerStatusJsonRpcMessenger.java b/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/ServerStatusJsonRpcMessenger.java index b3096a9af2..b7c2010c4b 100644 --- a/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/ServerStatusJsonRpcMessenger.java +++ b/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/ServerStatusJsonRpcMessenger.java @@ -14,6 +14,8 @@ import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator; import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter; import org.eclipse.che.api.core.notification.EventService; import org.eclipse.che.api.core.notification.EventSubscriber; +import org.eclipse.che.api.core.notification.RemoteSubscriptionManager; +import org.eclipse.che.api.workspace.shared.dto.event.RuntimeStatusEvent; import org.eclipse.che.api.workspace.shared.dto.event.ServerStatusEvent; import javax.annotation.PostConstruct; @@ -30,74 +32,20 @@ import static com.google.common.collect.Sets.newConcurrentHashSet; * Send workspace events using JSON RPC to the clients */ @Singleton -public class ServerStatusJsonRpcMessenger implements EventSubscriber { - private final RequestTransmitter transmitter; - private final EventService eventService; - - private final Map> endpointIds = new ConcurrentHashMap<>(); +public class ServerStatusJsonRpcMessenger { + private final RemoteSubscriptionManager remoteSubscriptionManager; @Inject - public ServerStatusJsonRpcMessenger(RequestTransmitter transmitter, EventService eventService) { - this.transmitter = transmitter; - this.eventService = eventService; - } - - @Override - public void onEvent(ServerStatusEvent event) { - send(event); - } - - public void send(ServerStatusEvent event) { - String id = event.getIdentity().getWorkspaceId(); - endpointIds.entrySet() - .stream() - .filter(it -> it.getValue().contains(id)) - .map(Map.Entry::getKey) - .forEach(it -> transmitter.newRequest() - .endpointId(it) - .methodName("server/statusChanged") - .paramsAsDto(event) - .sendAndSkipResult()); - } - - @Inject - private void configureSubscribeHandler(RequestHandlerConfigurator configurator) { - - configurator.newConfiguration() - .methodName("server/subscribe") - .paramsAsString() - .noResult() - .withBiConsumer((endpointId, workspaceId) -> { - endpointIds.putIfAbsent(endpointId, newConcurrentHashSet()); - endpointIds.get(endpointId).add(workspaceId); - }); - } - - @Inject - private void configureUnSubscribeHandler(RequestHandlerConfigurator configurator) { - configurator.newConfiguration() - .methodName("server/unSubscribe") - .paramsAsString() - .noResult() - .withBiConsumer((endpointId, workspaceId) -> { - Set workspaceIds = endpointIds.get(endpointId); - if (workspaceIds != null) { - workspaceIds.remove(workspaceId); - - if (workspaceIds.isEmpty()) { - endpointIds.remove(endpointId); - } - } - }); + public ServerStatusJsonRpcMessenger(RemoteSubscriptionManager remoteSubscriptionManager) { + this.remoteSubscriptionManager = remoteSubscriptionManager; } @PostConstruct - private void subscribe() { - eventService.subscribe(this); + private void postConstruct() { + remoteSubscriptionManager.register("server/statusChanged", ServerStatusEvent.class, this::predicate); } - @PreDestroy - private void unsubscribe() { - eventService.unsubscribe(this); + private boolean predicate(ServerStatusEvent event, Map scope) { + return event.getIdentity().getWorkspaceId().equals(scope.get("workspaceId")); } } diff --git a/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/WorkspaceJsonRpcMessenger.java b/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/WorkspaceJsonRpcMessenger.java index 18ffca6cdc..37c7deac96 100644 --- a/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/WorkspaceJsonRpcMessenger.java +++ b/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/event/WorkspaceJsonRpcMessenger.java @@ -14,6 +14,8 @@ import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter; import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator; import org.eclipse.che.api.core.notification.EventService; import org.eclipse.che.api.core.notification.EventSubscriber; +import org.eclipse.che.api.core.notification.RemoteSubscriptionManager; +import org.eclipse.che.api.workspace.shared.dto.event.ServerStatusEvent; import org.eclipse.che.api.workspace.shared.dto.event.WorkspaceStatusEvent; import javax.annotation.PostConstruct; @@ -30,74 +32,20 @@ import static com.google.common.collect.Sets.newConcurrentHashSet; * Send workspace events using JSON RPC to the clients */ @Singleton -public class WorkspaceJsonRpcMessenger implements EventSubscriber { - private final RequestTransmitter transmitter; - private final EventService eventService; - - private final Map> endpointIds = new ConcurrentHashMap<>(); +public class WorkspaceJsonRpcMessenger { + private final RemoteSubscriptionManager remoteSubscriptionManager; @Inject - public WorkspaceJsonRpcMessenger(RequestTransmitter transmitter, EventService eventService) { - this.transmitter = transmitter; - this.eventService = eventService; - } - - @Override - public void onEvent(WorkspaceStatusEvent event) { - send(event); - } - - public void send(WorkspaceStatusEvent event) { - String id = event.getWorkspaceId(); - endpointIds.entrySet() - .stream() - .filter(it -> it.getValue().contains(id)) - .map(Map.Entry::getKey) - .forEach(it -> transmitter.newRequest() - .endpointId(it) - .methodName("event:workspace-status:changed") - .paramsAsDto(event) - .sendAndSkipResult()); - } - - @Inject - private void configureSubscribeHandler(RequestHandlerConfigurator configurator) { - - configurator.newConfiguration() - .methodName("event:workspace-status:subscribe") - .paramsAsString() - .noResult() - .withBiConsumer((endpointId, workspaceId) -> { - endpointIds.putIfAbsent(endpointId, newConcurrentHashSet()); - endpointIds.get(endpointId).add(workspaceId); - }); - } - - @Inject - private void configureUnSubscribeHandler(RequestHandlerConfigurator configurator) { - configurator.newConfiguration() - .methodName("event:workspace-status:un-subscribe") - .paramsAsString() - .noResult() - .withBiConsumer((endpointId, workspaceId) -> { - Set workspaceIds = endpointIds.get(endpointId); - if (workspaceIds != null) { - workspaceIds.remove(workspaceId); - - if (workspaceIds.isEmpty()) { - endpointIds.remove(endpointId); - } - } - }); + public WorkspaceJsonRpcMessenger(RemoteSubscriptionManager remoteSubscriptionManager) { + this.remoteSubscriptionManager = remoteSubscriptionManager; } @PostConstruct - private void subscribe() { - eventService.subscribe(this); + private void postConstruct() { + remoteSubscriptionManager.register("workspace/statusChanged", WorkspaceStatusEvent.class, this::predicate); } - @PreDestroy - private void unsubscribe() { - eventService.unsubscribe(this); + private boolean predicate(WorkspaceStatusEvent event, Map scope) { + return event.getWorkspaceId().equals(scope.get("workspaceId")); } } diff --git a/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/spi/InternalRuntime.java b/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/spi/InternalRuntime.java index 7aac52eb55..51c9d10084 100644 --- a/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/spi/InternalRuntime.java +++ b/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/spi/InternalRuntime.java @@ -4,9 +4,9 @@ * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html - *

+ * * Contributors: - * Codenvy, S.A. - initial API and implementation + * Codenvy, S.A. - initial API and implementation *******************************************************************************/ package org.eclipse.che.api.workspace.server.spi;