added subscription manager (#5226)
parent
ad3bc4d739
commit
1379fbf602
|
|
@ -0,0 +1,93 @@
|
|||
/*******************************************************************************
|
||||
* Copyright (c) 2012-2017 Codenvy, S.A.
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*
|
||||
* Contributors:
|
||||
* Codenvy, S.A. - initial API and implementation
|
||||
*******************************************************************************/
|
||||
package org.eclipse.che.api.core.notification;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator;
|
||||
import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter;
|
||||
import org.eclipse.che.api.core.notification.dto.EventSubscription;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiPredicate;
|
||||
|
||||
import static java.util.Collections.emptySet;
|
||||
|
||||
@Singleton
|
||||
public class RemoteSubscriptionManager {
|
||||
private final Map<String, Set<SubscriptionContext>> subscriptionContexts = new HashMap<>();
|
||||
|
||||
private final EventService eventService;
|
||||
private final RequestTransmitter requestTransmitter;
|
||||
|
||||
@Inject
|
||||
public RemoteSubscriptionManager(EventService eventService,
|
||||
RequestTransmitter requestTransmitter) {
|
||||
this.eventService = eventService;
|
||||
this.requestTransmitter = requestTransmitter;
|
||||
}
|
||||
|
||||
@Inject
|
||||
private void configureSubscription(RequestHandlerConfigurator requestHandlerConfigurator) {
|
||||
requestHandlerConfigurator.newConfiguration()
|
||||
.methodName("subscribe")
|
||||
.paramsAsDto(EventSubscription.class)
|
||||
.noResult()
|
||||
.withBiConsumer(this::consumeSubscriptionRequest);
|
||||
|
||||
requestHandlerConfigurator.newConfiguration()
|
||||
.methodName("unSubscribe")
|
||||
.paramsAsDto(EventSubscription.class)
|
||||
.noResult()
|
||||
.withBiConsumer(this::consumeUnSubscriptionRequest);
|
||||
}
|
||||
|
||||
public <T> void register(String method, Class<T> eventType, BiPredicate<T, Map<String, String>> biPredicate) {
|
||||
eventService.subscribe(event -> subscriptionContexts.get(method)
|
||||
.stream()
|
||||
.filter(context -> biPredicate.test(event, context.scope))
|
||||
.forEach(context -> transmit(context.endpointId, method, event)),
|
||||
eventType);
|
||||
}
|
||||
|
||||
private void consumeSubscriptionRequest(String endpointId, EventSubscription eventSubscription) {
|
||||
subscriptionContexts.computeIfPresent(eventSubscription.getMethod(), (k, v) -> new HashSet<>())
|
||||
.add(new SubscriptionContext(endpointId, eventSubscription.getScope()));
|
||||
}
|
||||
|
||||
private void consumeUnSubscriptionRequest(String endpointId, EventSubscription eventSubscription) {
|
||||
subscriptionContexts.getOrDefault(eventSubscription.getMethod(), emptySet())
|
||||
.removeIf(subscriptionContext -> Objects.equals(subscriptionContext.endpointId, endpointId));
|
||||
}
|
||||
|
||||
private <T> void transmit(String endpointId, String method, T event) {
|
||||
requestTransmitter.newRequest()
|
||||
.endpointId(endpointId)
|
||||
.methodName(method)
|
||||
.paramsAsDto(event)
|
||||
.sendAndSkipResult();
|
||||
}
|
||||
|
||||
private class SubscriptionContext {
|
||||
private final String endpointId;
|
||||
private final Map<String, String> scope;
|
||||
|
||||
private SubscriptionContext(String endpointId, Map<String, String> scope) {
|
||||
this.endpointId = endpointId;
|
||||
this.scope = scope;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
/*******************************************************************************
|
||||
* Copyright (c) 2012-2017 Codenvy, S.A.
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*
|
||||
* Contributors:
|
||||
* Codenvy, S.A. - initial API and implementation
|
||||
*******************************************************************************/
|
||||
package org.eclipse.che.api.core.notification.dto;
|
||||
|
||||
import org.eclipse.che.dto.shared.DTO;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@DTO
|
||||
public interface EventSubscription {
|
||||
String getMethod();
|
||||
|
||||
EventSubscription withMethod(String method);
|
||||
|
||||
Map<String, String> getScope();
|
||||
|
||||
EventSubscription withScope(Map<String, String> scope);
|
||||
}
|
||||
|
|
@ -1,3 +1,13 @@
|
|||
/*******************************************************************************
|
||||
* Copyright (c) 2012-2017 Codenvy, S.A.
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*
|
||||
* Contributors:
|
||||
* Codenvy, S.A. - initial API and implementation
|
||||
*******************************************************************************/
|
||||
package org.eclipse.che.api.core.model.workspace.runtime;
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -0,0 +1,40 @@
|
|||
/*******************************************************************************
|
||||
* Copyright (c) 2012-2017 Codenvy, S.A.
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*
|
||||
* Contributors:
|
||||
* Codenvy, S.A. - initial API and implementation
|
||||
*******************************************************************************/
|
||||
package org.eclipse.che.ide.api.notification;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter;
|
||||
import org.eclipse.che.api.core.notification.dto.EventSubscription;
|
||||
import org.eclipse.che.ide.dto.DtoFactory;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Singleton
|
||||
public class SubscriptionManagerClient {
|
||||
private final RequestTransmitter requestTransmitter;
|
||||
private final DtoFactory dtoFactory;
|
||||
|
||||
@Inject
|
||||
SubscriptionManagerClient(RequestTransmitter requestTransmitter, DtoFactory dtoFactory) {
|
||||
this.requestTransmitter = requestTransmitter;
|
||||
this.dtoFactory = dtoFactory;
|
||||
}
|
||||
|
||||
public void subscribe(String endpointId, String method, Map<String, String> scope) {
|
||||
requestTransmitter.newRequest()
|
||||
.endpointId(endpointId)
|
||||
.methodName("subscribe")
|
||||
.paramsAsDto(dtoFactory.createDto(EventSubscription.class).withMethod(method).withScope(scope))
|
||||
.sendAndSkipResult();
|
||||
}
|
||||
}
|
||||
|
|
@ -24,4 +24,5 @@
|
|||
<source path='jsonrpc/json'/>
|
||||
<source path='jsonrpc/commons'/>
|
||||
<source path='logger/commons'/>
|
||||
<source path='notification/dto'/>
|
||||
</module>
|
||||
|
|
|
|||
|
|
@ -17,18 +17,18 @@ import com.google.inject.Singleton;
|
|||
import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter;
|
||||
import org.eclipse.che.api.core.model.workspace.Workspace;
|
||||
import org.eclipse.che.api.core.model.workspace.WorkspaceStatus;
|
||||
import org.eclipse.che.api.promises.client.Operation;
|
||||
import org.eclipse.che.api.promises.client.PromiseError;
|
||||
import org.eclipse.che.api.workspace.shared.dto.WorkspaceDto;
|
||||
import org.eclipse.che.ide.CoreLocalizationConstant;
|
||||
import org.eclipse.che.ide.api.app.AppContext;
|
||||
import org.eclipse.che.ide.api.notification.NotificationManager;
|
||||
import org.eclipse.che.ide.api.notification.SubscriptionManagerClient;
|
||||
import org.eclipse.che.ide.context.BrowserAddress;
|
||||
import org.eclipse.che.ide.ui.loaders.LoaderPresenter;
|
||||
import org.eclipse.che.ide.util.loging.Log;
|
||||
import org.eclipse.che.ide.workspace.WorkspaceServiceClient;
|
||||
import org.eclipse.che.ide.workspace.WorkspaceStatusHandler;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.eclipse.che.api.core.model.workspace.WorkspaceStatus.RUNNING;
|
||||
import static org.eclipse.che.api.core.model.workspace.WorkspaceStatus.STOPPED;
|
||||
import static org.eclipse.che.api.core.model.workspace.WorkspaceStatus.STOPPING;
|
||||
|
|
@ -55,6 +55,7 @@ public class CurrentWorkspaceManager {
|
|||
private final CoreLocalizationConstant messages;
|
||||
private final WorkspaceStatusHandler wsStatusHandler;
|
||||
private final AppContext appContext;
|
||||
private final SubscriptionManagerClient subscriptionManagerClient;
|
||||
|
||||
@Inject
|
||||
CurrentWorkspaceManager(WorkspaceServiceClient workspaceServiceClient,
|
||||
|
|
@ -65,7 +66,8 @@ public class CurrentWorkspaceManager {
|
|||
IdeInitializer ideInitializer,
|
||||
CoreLocalizationConstant messages,
|
||||
WorkspaceStatusHandler wsStatusHandler,
|
||||
AppContext appContext) {
|
||||
AppContext appContext,
|
||||
SubscriptionManagerClient subscriptionManagerClient) {
|
||||
this.workspaceServiceClient = workspaceServiceClient;
|
||||
this.browserAddress = browserAddress;
|
||||
this.transmitter = transmitter;
|
||||
|
|
@ -75,6 +77,7 @@ public class CurrentWorkspaceManager {
|
|||
this.messages = messages;
|
||||
this.wsStatusHandler = wsStatusHandler;
|
||||
this.appContext = appContext;
|
||||
this.subscriptionManagerClient = subscriptionManagerClient;
|
||||
}
|
||||
|
||||
// TODO: handle errors while workspace starting (show message dialog)
|
||||
|
|
@ -93,19 +96,22 @@ public class CurrentWorkspaceManager {
|
|||
}
|
||||
|
||||
private void subscribeToEvents(String workspaceId) {
|
||||
subscribe(WS_STATUS_ERROR_MSG, "event:workspace-status:subscribe", workspaceId);
|
||||
workspaceServiceClient.getWorkspace(browserAddress.getWorkspaceKey())
|
||||
.then(skip -> {
|
||||
String endpointId = "ws-master";
|
||||
String method = "workspace/statusChanged";
|
||||
Map<String, String> scope = singletonMap("workspaceId", workspaceId);
|
||||
subscriptionManagerClient.subscribe(endpointId, method, scope);
|
||||
});
|
||||
|
||||
|
||||
// subscribe(WS_STATUS_ERROR_MSG, "event:workspace-status:subscribe", workspaceId);
|
||||
// subscribe(WS_AGENT_OUTPUT_ERROR_MSG, "event:ws-agent-output:subscribe", workspaceId);
|
||||
// subscribe(ENV_STATUS_ERROR_MSG, "event:environment-status:subscribe", workspaceId);
|
||||
}
|
||||
|
||||
private void subscribe(String it, String methodName, String id) {
|
||||
workspaceServiceClient.getWorkspace(browserAddress.getWorkspaceKey())
|
||||
.then((Operation<WorkspaceDto>)skip -> transmitter.newRequest()
|
||||
.endpointId("ws-master")
|
||||
.methodName(methodName)
|
||||
.paramsAsString(id)
|
||||
.sendAndSkipResult())
|
||||
.catchError((Operation<PromiseError>)error -> Log.error(getClass(), it + ": " + error.getMessage()));
|
||||
|
||||
}
|
||||
|
||||
/** Starts the workspace with the default environment. */
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ class EnvironmentStatusEventHandler {
|
|||
@Inject
|
||||
void configureEnvironmentStatusHandler(RequestHandlerConfigurator configurator, Provider<EnvironmentStatusHandler> handlerProvider) {
|
||||
configurator.newConfiguration()
|
||||
.methodName("event:environment-status:changed")
|
||||
.methodName("machine/statusChanged")
|
||||
.paramsAsDto(MachineStatusEvent.class)
|
||||
.noResult()
|
||||
.withBiConsumer((endpointId, event) -> {
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ class WorkspaceStatusEventHandler {
|
|||
@Inject
|
||||
WorkspaceStatusEventHandler(RequestHandlerConfigurator configurator, Provider<WorkspaceStatusHandler> handlerProvider) {
|
||||
configurator.newConfiguration()
|
||||
.methodName("event:workspace-status:changed")
|
||||
.methodName("workspace/statusChanged")
|
||||
.paramsAsDto(WorkspaceStatusEvent.class)
|
||||
.noResult()
|
||||
.withBiConsumer((endpointId, event) -> {
|
||||
|
|
|
|||
|
|
@ -1,3 +1,13 @@
|
|||
/*******************************************************************************
|
||||
* Copyright (c) 2012-2017 Codenvy, S.A.
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*
|
||||
* Contributors:
|
||||
* Codenvy, S.A. - initial API and implementation
|
||||
*******************************************************************************/
|
||||
package org.eclipse.che.workspace.infrastructure.docker.snapshot;
|
||||
|
||||
import org.eclipse.che.api.core.ServerException;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,13 @@
|
|||
/*******************************************************************************
|
||||
* Copyright (c) 2012-2017 Codenvy, S.A.
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*
|
||||
* Contributors:
|
||||
* Codenvy, S.A. - initial API and implementation
|
||||
*******************************************************************************/
|
||||
package org.eclipse.che.api.workspace.shared.dto;
|
||||
|
||||
import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity;
|
||||
|
|
|
|||
|
|
@ -10,94 +10,34 @@
|
|||
*******************************************************************************/
|
||||
package org.eclipse.che.api.workspace.server.event;
|
||||
|
||||
import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator;
|
||||
import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter;
|
||||
import org.eclipse.che.api.core.notification.EventService;
|
||||
import org.eclipse.che.api.core.notification.EventSubscriber;
|
||||
import org.eclipse.che.api.core.notification.RemoteSubscriptionManager;
|
||||
import org.eclipse.che.api.workspace.shared.dto.event.MachineStatusEvent;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Singleton;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import static com.google.common.collect.Sets.newConcurrentHashSet;
|
||||
|
||||
/**
|
||||
* Send workspace events using JSON RPC to the clients
|
||||
*/
|
||||
@Singleton
|
||||
public class MachineStatusJsonRpcMessenger implements EventSubscriber<MachineStatusEvent> {
|
||||
private final RequestTransmitter transmitter;
|
||||
private final EventService eventService;
|
||||
|
||||
private final Map<String, Set<String>> endpointIds = new ConcurrentHashMap<>();
|
||||
public class MachineStatusJsonRpcMessenger {
|
||||
private final RemoteSubscriptionManager remoteSubscriptionManager;
|
||||
|
||||
@Inject
|
||||
public MachineStatusJsonRpcMessenger(RequestTransmitter transmitter, EventService eventService) {
|
||||
this.transmitter = transmitter;
|
||||
this.eventService = eventService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(MachineStatusEvent event) {
|
||||
send(event);
|
||||
}
|
||||
|
||||
public void send(MachineStatusEvent event) {
|
||||
String id = event.getIdentity().getWorkspaceId();
|
||||
endpointIds.entrySet()
|
||||
.stream()
|
||||
.filter(it -> it.getValue().contains(id))
|
||||
.map(Map.Entry::getKey)
|
||||
.forEach(it -> transmitter.newRequest()
|
||||
.endpointId(it)
|
||||
.methodName("machine/statusChanged")
|
||||
.paramsAsDto(event)
|
||||
.sendAndSkipResult());
|
||||
}
|
||||
|
||||
@Inject
|
||||
private void configureSubscribeHandler(RequestHandlerConfigurator configurator) {
|
||||
|
||||
configurator.newConfiguration()
|
||||
.methodName("machine/subscribe")
|
||||
.paramsAsString()
|
||||
.noResult()
|
||||
.withBiConsumer((endpointId, workspaceId) -> {
|
||||
endpointIds.putIfAbsent(endpointId, newConcurrentHashSet());
|
||||
endpointIds.get(endpointId).add(workspaceId);
|
||||
});
|
||||
}
|
||||
|
||||
@Inject
|
||||
private void configureUnSubscribeHandler(RequestHandlerConfigurator configurator) {
|
||||
configurator.newConfiguration()
|
||||
.methodName("machine/unSubscribe")
|
||||
.paramsAsString()
|
||||
.noResult()
|
||||
.withBiConsumer((endpointId, workspaceId) -> {
|
||||
Set<String> workspaceIds = endpointIds.get(endpointId);
|
||||
if (workspaceIds != null) {
|
||||
workspaceIds.remove(workspaceId);
|
||||
|
||||
if (workspaceIds.isEmpty()) {
|
||||
endpointIds.remove(endpointId);
|
||||
}
|
||||
}
|
||||
});
|
||||
public MachineStatusJsonRpcMessenger(RemoteSubscriptionManager remoteSubscriptionManager) {
|
||||
this.remoteSubscriptionManager = remoteSubscriptionManager;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
private void subscribe() {
|
||||
eventService.subscribe(this);
|
||||
private void postConstruct() {
|
||||
remoteSubscriptionManager.register("machine/statusChanged", MachineStatusEvent.class, this::predicate);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
private void unsubscribe() {
|
||||
eventService.unsubscribe(this);
|
||||
private boolean predicate(MachineStatusEvent event, Map<String, String> scope) {
|
||||
return event.getIdentity().getWorkspaceId().equals(scope.get("workspaceId"));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@ import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator;
|
|||
import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter;
|
||||
import org.eclipse.che.api.core.notification.EventService;
|
||||
import org.eclipse.che.api.core.notification.EventSubscriber;
|
||||
import org.eclipse.che.api.core.notification.RemoteSubscriptionManager;
|
||||
import org.eclipse.che.api.workspace.shared.dto.event.MachineStatusEvent;
|
||||
import org.eclipse.che.api.workspace.shared.dto.event.RuntimeStatusEvent;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
|
@ -30,74 +32,20 @@ import static com.google.common.collect.Sets.newConcurrentHashSet;
|
|||
* Send workspace events using JSON RPC to the clients
|
||||
*/
|
||||
@Singleton
|
||||
public class RuntimeStatusJsonRpcMessenger implements EventSubscriber<RuntimeStatusEvent> {
|
||||
private final RequestTransmitter transmitter;
|
||||
private final EventService eventService;
|
||||
|
||||
private final Map<String, Set<String>> endpointIds = new ConcurrentHashMap<>();
|
||||
public class RuntimeStatusJsonRpcMessenger {
|
||||
private final RemoteSubscriptionManager remoteSubscriptionManager;
|
||||
|
||||
@Inject
|
||||
public RuntimeStatusJsonRpcMessenger(RequestTransmitter transmitter, EventService eventService) {
|
||||
this.transmitter = transmitter;
|
||||
this.eventService = eventService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(RuntimeStatusEvent event) {
|
||||
send(event);
|
||||
}
|
||||
|
||||
public void send(RuntimeStatusEvent event) {
|
||||
String id = event.getIdentity().getWorkspaceId();
|
||||
endpointIds.entrySet()
|
||||
.stream()
|
||||
.filter(it -> it.getValue().contains(id))
|
||||
.map(Map.Entry::getKey)
|
||||
.forEach(it -> transmitter.newRequest()
|
||||
.endpointId(it)
|
||||
.methodName("runtime/statusChanged")
|
||||
.paramsAsDto(event)
|
||||
.sendAndSkipResult());
|
||||
}
|
||||
|
||||
@Inject
|
||||
private void configureSubscribeHandler(RequestHandlerConfigurator configurator) {
|
||||
|
||||
configurator.newConfiguration()
|
||||
.methodName("runtime/subscribe")
|
||||
.paramsAsString()
|
||||
.noResult()
|
||||
.withBiConsumer((endpointId, workspaceId) -> {
|
||||
endpointIds.putIfAbsent(endpointId, newConcurrentHashSet());
|
||||
endpointIds.get(endpointId).add(workspaceId);
|
||||
});
|
||||
}
|
||||
|
||||
@Inject
|
||||
private void configureUnSubscribeHandler(RequestHandlerConfigurator configurator) {
|
||||
configurator.newConfiguration()
|
||||
.methodName("runtime/unSubscribe")
|
||||
.paramsAsString()
|
||||
.noResult()
|
||||
.withBiConsumer((endpointId, workspaceId) -> {
|
||||
Set<String> workspaceIds = endpointIds.get(endpointId);
|
||||
if (workspaceIds != null) {
|
||||
workspaceIds.remove(workspaceId);
|
||||
|
||||
if (workspaceIds.isEmpty()) {
|
||||
endpointIds.remove(endpointId);
|
||||
}
|
||||
}
|
||||
});
|
||||
public RuntimeStatusJsonRpcMessenger(RemoteSubscriptionManager remoteSubscriptionManager) {
|
||||
this.remoteSubscriptionManager = remoteSubscriptionManager;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
private void subscribe() {
|
||||
eventService.subscribe(this);
|
||||
private void postConstruct() {
|
||||
remoteSubscriptionManager.register("runtime/statusChanged", RuntimeStatusEvent.class, this::predicate);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
private void unsubscribe() {
|
||||
eventService.unsubscribe(this);
|
||||
private boolean predicate(RuntimeStatusEvent event, Map<String, String> scope) {
|
||||
return event.getIdentity().getWorkspaceId().equals(scope.get("workspaceId"));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@ import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator;
|
|||
import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter;
|
||||
import org.eclipse.che.api.core.notification.EventService;
|
||||
import org.eclipse.che.api.core.notification.EventSubscriber;
|
||||
import org.eclipse.che.api.core.notification.RemoteSubscriptionManager;
|
||||
import org.eclipse.che.api.workspace.shared.dto.event.RuntimeStatusEvent;
|
||||
import org.eclipse.che.api.workspace.shared.dto.event.ServerStatusEvent;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
|
@ -30,74 +32,20 @@ import static com.google.common.collect.Sets.newConcurrentHashSet;
|
|||
* Send workspace events using JSON RPC to the clients
|
||||
*/
|
||||
@Singleton
|
||||
public class ServerStatusJsonRpcMessenger implements EventSubscriber<ServerStatusEvent> {
|
||||
private final RequestTransmitter transmitter;
|
||||
private final EventService eventService;
|
||||
|
||||
private final Map<String, Set<String>> endpointIds = new ConcurrentHashMap<>();
|
||||
public class ServerStatusJsonRpcMessenger {
|
||||
private final RemoteSubscriptionManager remoteSubscriptionManager;
|
||||
|
||||
@Inject
|
||||
public ServerStatusJsonRpcMessenger(RequestTransmitter transmitter, EventService eventService) {
|
||||
this.transmitter = transmitter;
|
||||
this.eventService = eventService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(ServerStatusEvent event) {
|
||||
send(event);
|
||||
}
|
||||
|
||||
public void send(ServerStatusEvent event) {
|
||||
String id = event.getIdentity().getWorkspaceId();
|
||||
endpointIds.entrySet()
|
||||
.stream()
|
||||
.filter(it -> it.getValue().contains(id))
|
||||
.map(Map.Entry::getKey)
|
||||
.forEach(it -> transmitter.newRequest()
|
||||
.endpointId(it)
|
||||
.methodName("server/statusChanged")
|
||||
.paramsAsDto(event)
|
||||
.sendAndSkipResult());
|
||||
}
|
||||
|
||||
@Inject
|
||||
private void configureSubscribeHandler(RequestHandlerConfigurator configurator) {
|
||||
|
||||
configurator.newConfiguration()
|
||||
.methodName("server/subscribe")
|
||||
.paramsAsString()
|
||||
.noResult()
|
||||
.withBiConsumer((endpointId, workspaceId) -> {
|
||||
endpointIds.putIfAbsent(endpointId, newConcurrentHashSet());
|
||||
endpointIds.get(endpointId).add(workspaceId);
|
||||
});
|
||||
}
|
||||
|
||||
@Inject
|
||||
private void configureUnSubscribeHandler(RequestHandlerConfigurator configurator) {
|
||||
configurator.newConfiguration()
|
||||
.methodName("server/unSubscribe")
|
||||
.paramsAsString()
|
||||
.noResult()
|
||||
.withBiConsumer((endpointId, workspaceId) -> {
|
||||
Set<String> workspaceIds = endpointIds.get(endpointId);
|
||||
if (workspaceIds != null) {
|
||||
workspaceIds.remove(workspaceId);
|
||||
|
||||
if (workspaceIds.isEmpty()) {
|
||||
endpointIds.remove(endpointId);
|
||||
}
|
||||
}
|
||||
});
|
||||
public ServerStatusJsonRpcMessenger(RemoteSubscriptionManager remoteSubscriptionManager) {
|
||||
this.remoteSubscriptionManager = remoteSubscriptionManager;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
private void subscribe() {
|
||||
eventService.subscribe(this);
|
||||
private void postConstruct() {
|
||||
remoteSubscriptionManager.register("server/statusChanged", ServerStatusEvent.class, this::predicate);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
private void unsubscribe() {
|
||||
eventService.unsubscribe(this);
|
||||
private boolean predicate(ServerStatusEvent event, Map<String, String> scope) {
|
||||
return event.getIdentity().getWorkspaceId().equals(scope.get("workspaceId"));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@ import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter;
|
|||
import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator;
|
||||
import org.eclipse.che.api.core.notification.EventService;
|
||||
import org.eclipse.che.api.core.notification.EventSubscriber;
|
||||
import org.eclipse.che.api.core.notification.RemoteSubscriptionManager;
|
||||
import org.eclipse.che.api.workspace.shared.dto.event.ServerStatusEvent;
|
||||
import org.eclipse.che.api.workspace.shared.dto.event.WorkspaceStatusEvent;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
|
@ -30,74 +32,20 @@ import static com.google.common.collect.Sets.newConcurrentHashSet;
|
|||
* Send workspace events using JSON RPC to the clients
|
||||
*/
|
||||
@Singleton
|
||||
public class WorkspaceJsonRpcMessenger implements EventSubscriber<WorkspaceStatusEvent> {
|
||||
private final RequestTransmitter transmitter;
|
||||
private final EventService eventService;
|
||||
|
||||
private final Map<String, Set<String>> endpointIds = new ConcurrentHashMap<>();
|
||||
public class WorkspaceJsonRpcMessenger {
|
||||
private final RemoteSubscriptionManager remoteSubscriptionManager;
|
||||
|
||||
@Inject
|
||||
public WorkspaceJsonRpcMessenger(RequestTransmitter transmitter, EventService eventService) {
|
||||
this.transmitter = transmitter;
|
||||
this.eventService = eventService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(WorkspaceStatusEvent event) {
|
||||
send(event);
|
||||
}
|
||||
|
||||
public void send(WorkspaceStatusEvent event) {
|
||||
String id = event.getWorkspaceId();
|
||||
endpointIds.entrySet()
|
||||
.stream()
|
||||
.filter(it -> it.getValue().contains(id))
|
||||
.map(Map.Entry::getKey)
|
||||
.forEach(it -> transmitter.newRequest()
|
||||
.endpointId(it)
|
||||
.methodName("event:workspace-status:changed")
|
||||
.paramsAsDto(event)
|
||||
.sendAndSkipResult());
|
||||
}
|
||||
|
||||
@Inject
|
||||
private void configureSubscribeHandler(RequestHandlerConfigurator configurator) {
|
||||
|
||||
configurator.newConfiguration()
|
||||
.methodName("event:workspace-status:subscribe")
|
||||
.paramsAsString()
|
||||
.noResult()
|
||||
.withBiConsumer((endpointId, workspaceId) -> {
|
||||
endpointIds.putIfAbsent(endpointId, newConcurrentHashSet());
|
||||
endpointIds.get(endpointId).add(workspaceId);
|
||||
});
|
||||
}
|
||||
|
||||
@Inject
|
||||
private void configureUnSubscribeHandler(RequestHandlerConfigurator configurator) {
|
||||
configurator.newConfiguration()
|
||||
.methodName("event:workspace-status:un-subscribe")
|
||||
.paramsAsString()
|
||||
.noResult()
|
||||
.withBiConsumer((endpointId, workspaceId) -> {
|
||||
Set<String> workspaceIds = endpointIds.get(endpointId);
|
||||
if (workspaceIds != null) {
|
||||
workspaceIds.remove(workspaceId);
|
||||
|
||||
if (workspaceIds.isEmpty()) {
|
||||
endpointIds.remove(endpointId);
|
||||
}
|
||||
}
|
||||
});
|
||||
public WorkspaceJsonRpcMessenger(RemoteSubscriptionManager remoteSubscriptionManager) {
|
||||
this.remoteSubscriptionManager = remoteSubscriptionManager;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
private void subscribe() {
|
||||
eventService.subscribe(this);
|
||||
private void postConstruct() {
|
||||
remoteSubscriptionManager.register("workspace/statusChanged", WorkspaceStatusEvent.class, this::predicate);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
private void unsubscribe() {
|
||||
eventService.unsubscribe(this);
|
||||
private boolean predicate(WorkspaceStatusEvent event, Map<String, String> scope) {
|
||||
return event.getWorkspaceId().equals(scope.get("workspaceId"));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,9 +4,9 @@
|
|||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
* <p>
|
||||
*
|
||||
* Contributors:
|
||||
* Codenvy, S.A. - initial API and implementation
|
||||
* Codenvy, S.A. - initial API and implementation
|
||||
*******************************************************************************/
|
||||
package org.eclipse.che.api.workspace.server.spi;
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue