Make BrokerEvent suitable to be used for STARTED status event
parent
b5b187e182
commit
d9c210bd2a
|
|
@ -54,6 +54,7 @@ public class DeployBroker extends BrokerPhase {
|
|||
for (ConfigMap configMap : brokerEnvironment.getConfigMaps().values()) {
|
||||
namespace.configMaps().create(configMap);
|
||||
}
|
||||
|
||||
for (Pod toCreate : brokerEnvironment.getPods().values()) {
|
||||
deployments.deploy(toCreate);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,11 +18,11 @@ import org.eclipse.che.api.core.notification.EventService;
|
|||
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
|
||||
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;
|
||||
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.events.BrokerEvent;
|
||||
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.events.BrokerResultListener;
|
||||
import org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.events.BrokerStatusListener;
|
||||
|
||||
/**
|
||||
* Subscribes to Che plugin broker events, passes future that should be completed upon broker result
|
||||
* received to {@link BrokerResultListener} and calls next {@link BrokerPhase}.
|
||||
* received to {@link BrokerStatusListener} and calls next {@link BrokerPhase}.
|
||||
*
|
||||
* <p>This API is in <b>Beta</b> and is subject to changes or removal.
|
||||
*
|
||||
|
|
@ -45,14 +45,14 @@ public class ListenBrokerEvents extends BrokerPhase {
|
|||
}
|
||||
|
||||
public List<ChePlugin> execute() throws InfrastructureException {
|
||||
BrokerResultListener brokerResultListener =
|
||||
new BrokerResultListener(workspaceId, toolingFuture);
|
||||
BrokerStatusListener brokerStatusListener =
|
||||
new BrokerStatusListener(workspaceId, toolingFuture);
|
||||
try {
|
||||
eventService.subscribe(brokerResultListener, BrokerEvent.class);
|
||||
eventService.subscribe(brokerStatusListener, BrokerEvent.class);
|
||||
|
||||
return nextPhase.execute();
|
||||
} finally {
|
||||
eventService.unsubscribe(brokerResultListener);
|
||||
eventService.unsubscribe(brokerStatusListener);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,18 +14,18 @@ package org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.events;
|
|||
import com.google.common.annotations.Beta;
|
||||
import java.util.List;
|
||||
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;
|
||||
import org.eclipse.che.api.workspace.shared.dto.BrokerResultEvent;
|
||||
import org.eclipse.che.api.workspace.shared.dto.BrokerStatus;
|
||||
import org.eclipse.che.api.workspace.shared.dto.BrokerStatusChangedEvent;
|
||||
|
||||
/**
|
||||
* Event sent by a plugin broker with results of broker invocation.
|
||||
*
|
||||
* <p>This class differs from {@link BrokerResultEvent} it is version of latter with a prettier
|
||||
* format. It has workspace tooling in a POJO representation instead of stringified JSON.
|
||||
* <p>This class differs from {@link BrokerStatusChangedEvent} it is version of latter with a
|
||||
* prettier format. It has workspace tooling in a POJO representation instead of stringified JSON.
|
||||
*
|
||||
* <p>This API is in <b>Beta</b> and is subject to changes or removal.
|
||||
*
|
||||
* @see BrokerResultEvent
|
||||
* @see BrokerStatusChangedEvent
|
||||
*/
|
||||
@Beta
|
||||
public class BrokerEvent {
|
||||
|
|
@ -37,7 +37,7 @@ public class BrokerEvent {
|
|||
@SuppressWarnings("unused")
|
||||
public BrokerEvent() {}
|
||||
|
||||
public BrokerEvent(BrokerResultEvent resultEvent, List<ChePlugin> tooling) {
|
||||
public BrokerEvent(BrokerStatusChangedEvent resultEvent, List<ChePlugin> tooling) {
|
||||
this.error = resultEvent.getError();
|
||||
this.status = resultEvent.getStatus();
|
||||
this.workspaceId = resultEvent.getWorkspaceId();
|
||||
|
|
|
|||
|
|
@ -24,13 +24,13 @@ import javax.inject.Singleton;
|
|||
import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator;
|
||||
import org.eclipse.che.api.core.notification.EventService;
|
||||
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;
|
||||
import org.eclipse.che.api.workspace.shared.dto.BrokerResultEvent;
|
||||
import org.eclipse.che.api.workspace.shared.dto.BrokerStatusChangedEvent;
|
||||
import org.eclipse.che.commons.annotation.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
/**
|
||||
* Configure JSON_RPC consumers of Che plugin broker events. Also converts {@link BrokerResultEvent}
|
||||
* to {@link BrokerEvent}.
|
||||
* Configure JSON_RPC consumers of Che plugin broker events. Also converts {@link
|
||||
* BrokerStatusChangedEvent} to {@link BrokerEvent}.
|
||||
*
|
||||
* <p>This API is in <b>Beta</b> and is subject to changes or removal.
|
||||
*
|
||||
|
|
@ -58,24 +58,22 @@ public class BrokerService {
|
|||
requestHandler
|
||||
.newConfiguration()
|
||||
.methodName(BROKER_STATUS_CHANGED_METHOD)
|
||||
.paramsAsDto(BrokerResultEvent.class)
|
||||
.paramsAsDto(BrokerStatusChangedEvent.class)
|
||||
.noResult()
|
||||
.withConsumer(this::handle);
|
||||
|
||||
requestHandler
|
||||
.newConfiguration()
|
||||
.methodName(BROKER_RESULT_METHOD)
|
||||
.paramsAsDto(BrokerResultEvent.class)
|
||||
.paramsAsDto(BrokerStatusChangedEvent.class)
|
||||
.noResult()
|
||||
.withConsumer(this::handle);
|
||||
}
|
||||
|
||||
private void handle(BrokerResultEvent event) {
|
||||
private void handle(BrokerStatusChangedEvent event) {
|
||||
// Tooling has fields that can't be parsed by DTO and JSON_RPC framework works with DTO only
|
||||
String encodedTooling = event.getTooling();
|
||||
if (event.getStatus() == null
|
||||
|| event.getWorkspaceId() == null
|
||||
|| (event.getError() == null && event.getTooling() == null)) {
|
||||
if (event.getStatus() == null || event.getWorkspaceId() == null) {
|
||||
LOG.error("Broker event skipped due to illegal content: {}", event);
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,28 +11,31 @@
|
|||
*/
|
||||
package org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.events;
|
||||
|
||||
import static java.lang.String.format;
|
||||
|
||||
import com.google.common.annotations.Beta;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.eclipse.che.api.core.notification.EventSubscriber;
|
||||
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
|
||||
import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException;
|
||||
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;
|
||||
|
||||
/**
|
||||
* Listens for {@link BrokerEvent} and completes or exceptionally completes a future depending on
|
||||
* the event state.
|
||||
* Listens for {@link BrokerEvent} and completes or exceptionally completes a start and done futures
|
||||
* depending on the event state.
|
||||
*
|
||||
* <p>This API is in <b>Beta</b> and is subject to changes or removal.
|
||||
*
|
||||
* @author Oleksandr Garagatyi
|
||||
*/
|
||||
@Beta
|
||||
public class BrokerResultListener implements EventSubscriber<BrokerEvent> {
|
||||
public class BrokerStatusListener implements EventSubscriber<BrokerEvent> {
|
||||
|
||||
private final String workspaceId;
|
||||
private final CompletableFuture<List<ChePlugin>> finishFuture;
|
||||
|
||||
public BrokerResultListener(String workspaceId, CompletableFuture<List<ChePlugin>> finishFuture) {
|
||||
public BrokerStatusListener(String workspaceId, CompletableFuture<List<ChePlugin>> finishFuture) {
|
||||
this.workspaceId = workspaceId;
|
||||
this.finishFuture = finishFuture;
|
||||
}
|
||||
|
|
@ -45,12 +48,25 @@ public class BrokerResultListener implements EventSubscriber<BrokerEvent> {
|
|||
|
||||
switch (event.getStatus()) {
|
||||
case DONE:
|
||||
finishFuture.complete(event.getTooling());
|
||||
List<ChePlugin> tooling = event.getTooling();
|
||||
if (tooling != null) {
|
||||
finishFuture.complete(tooling);
|
||||
} else {
|
||||
finishFuture.completeExceptionally(
|
||||
new InternalInfrastructureException(
|
||||
format(
|
||||
"Plugin brokering process for workspace `%s` is finished but plugins list is missing",
|
||||
workspaceId)));
|
||||
}
|
||||
break;
|
||||
case FAILED:
|
||||
finishFuture.completeExceptionally(
|
||||
new InfrastructureException("Broker process failed with error: " + event.getError()));
|
||||
new InfrastructureException(
|
||||
format(
|
||||
"Plugin broking process for workspace %s failed with error: %s",
|
||||
workspaceId, event.getError())));
|
||||
break;
|
||||
case STARTED:
|
||||
default:
|
||||
// do nothing
|
||||
}
|
||||
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Copyright (c) 2012-2018 Red Hat, Inc.
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*
|
||||
* Contributors:
|
||||
* Red Hat, Inc. - initial API and implementation
|
||||
*/
|
||||
package org.eclipse.che.workspace.infrastructure.kubernetes.wsplugins.events;
|
||||
|
||||
import static java.util.Collections.*;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
|
||||
import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException;
|
||||
import org.eclipse.che.api.workspace.server.wsplugins.model.ChePlugin;
|
||||
import org.eclipse.che.api.workspace.shared.dto.BrokerStatus;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.testng.MockitoTestNGListener;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Listeners;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
/**
|
||||
* Tests {@link BrokerStatusListener}.
|
||||
*
|
||||
* @author Sergii Leshchenko
|
||||
*/
|
||||
@Listeners(MockitoTestNGListener.class)
|
||||
public class BrokerStatusListenerTest {
|
||||
|
||||
public static final String WORKSPACE_ID = "workspace123";
|
||||
@Mock private CompletableFuture<List<ChePlugin>> finishFuture;
|
||||
|
||||
private BrokerStatusListener brokerStatusListener;
|
||||
|
||||
@BeforeMethod
|
||||
public void setUp() {
|
||||
brokerStatusListener = new BrokerStatusListener(WORKSPACE_ID, finishFuture);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldDoNothingIfEventWithForeignWorkspaceIdIsReceived() {
|
||||
// given
|
||||
BrokerEvent event = new BrokerEvent().withWorkspaceId("foreignWorkspace");
|
||||
|
||||
// when
|
||||
brokerStatusListener.onEvent(event);
|
||||
|
||||
// then
|
||||
verifyNoMoreInteractions(finishFuture);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldDoNothingWhenStartedEventIsReceived() {
|
||||
// given
|
||||
BrokerEvent event =
|
||||
new BrokerEvent().withWorkspaceId(WORKSPACE_ID).withStatus(BrokerStatus.STARTED);
|
||||
|
||||
// when
|
||||
brokerStatusListener.onEvent(event);
|
||||
|
||||
// then
|
||||
verifyNoMoreInteractions(finishFuture);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCompleteFinishFutureWhenDoneEventIsReceivedAndToolingIsNotNull() {
|
||||
// given
|
||||
BrokerEvent event =
|
||||
new BrokerEvent()
|
||||
.withWorkspaceId(WORKSPACE_ID)
|
||||
.withStatus(BrokerStatus.DONE)
|
||||
.withTooling(emptyList());
|
||||
|
||||
// when
|
||||
brokerStatusListener.onEvent(event);
|
||||
|
||||
// then
|
||||
verify(finishFuture).complete(emptyList());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCompleteExceptionallyFinishFutureWhenDoneEventIsReceivedButToolingIsNull() {
|
||||
// given
|
||||
BrokerEvent event =
|
||||
new BrokerEvent()
|
||||
.withWorkspaceId(WORKSPACE_ID)
|
||||
.withStatus(BrokerStatus.DONE)
|
||||
.withTooling(null);
|
||||
|
||||
// when
|
||||
brokerStatusListener.onEvent(event);
|
||||
|
||||
// then
|
||||
verify(finishFuture).completeExceptionally(any(InternalInfrastructureException.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCompleteExceptionallyFinishFutureWhenFailedEventIsReceived() {
|
||||
// given
|
||||
BrokerEvent event =
|
||||
new BrokerEvent()
|
||||
.withWorkspaceId(WORKSPACE_ID)
|
||||
.withStatus(BrokerStatus.FAILED)
|
||||
.withError("error");
|
||||
|
||||
// when
|
||||
brokerStatusListener.onEvent(event);
|
||||
|
||||
// then
|
||||
verify(finishFuture).completeExceptionally(any(InfrastructureException.class));
|
||||
}
|
||||
}
|
||||
|
|
@ -18,7 +18,7 @@ import javax.inject.Inject;
|
|||
import javax.inject.Singleton;
|
||||
import org.eclipse.che.api.core.ForbiddenException;
|
||||
import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerManager;
|
||||
import org.eclipse.che.api.workspace.shared.dto.BrokerResultEvent;
|
||||
import org.eclipse.che.api.workspace.shared.dto.BrokerStatusChangedEvent;
|
||||
import org.eclipse.che.commons.env.EnvironmentContext;
|
||||
import org.eclipse.che.commons.subject.Subject;
|
||||
import org.eclipse.che.multiuser.api.permission.server.jsonrpc.JsonRpcPermissionsFilterAdapter;
|
||||
|
|
@ -44,7 +44,7 @@ public class BrokerServicePermissionFilter extends JsonRpcPermissionsFilterAdapt
|
|||
switch (method) {
|
||||
case BROKER_STATUS_CHANGED_METHOD:
|
||||
case BROKER_RESULT_METHOD:
|
||||
workspaceId = ((BrokerResultEvent) params[0]).getWorkspaceId();
|
||||
workspaceId = ((BrokerStatusChangedEvent) params[0]).getWorkspaceId();
|
||||
break;
|
||||
default:
|
||||
throw new ForbiddenException("Unknown method is configured to be filtered.");
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import org.eclipse.che.api.core.ForbiddenException;
|
||||
import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerManager;
|
||||
import org.eclipse.che.api.workspace.shared.dto.BrokerResultEvent;
|
||||
import org.eclipse.che.api.workspace.shared.dto.BrokerStatusChangedEvent;
|
||||
import org.eclipse.che.commons.env.EnvironmentContext;
|
||||
import org.eclipse.che.commons.subject.Subject;
|
||||
import org.eclipse.che.dto.server.DtoFactory;
|
||||
|
|
@ -78,7 +78,7 @@ public class BrokerServicePermissionFilterTest {
|
|||
|
||||
// when
|
||||
permissionFilter.doAccept(
|
||||
method, DtoFactory.newDto(BrokerResultEvent.class).withWorkspaceId("ws123"));
|
||||
method, DtoFactory.newDto(BrokerStatusChangedEvent.class).withWorkspaceId("ws123"));
|
||||
}
|
||||
|
||||
@Test(dataProvider = "coveredMethods")
|
||||
|
|
@ -89,7 +89,7 @@ public class BrokerServicePermissionFilterTest {
|
|||
|
||||
// when
|
||||
permissionFilter.doAccept(
|
||||
method, DtoFactory.newDto(BrokerResultEvent.class).withWorkspaceId("ws123"));
|
||||
method, DtoFactory.newDto(BrokerStatusChangedEvent.class).withWorkspaceId("ws123"));
|
||||
}
|
||||
|
||||
@Test(
|
||||
|
|
@ -98,7 +98,7 @@ public class BrokerServicePermissionFilterTest {
|
|||
public void shouldThrowExceptionIfUnknownMethodIsInvoking() throws Exception {
|
||||
// when
|
||||
permissionFilter.doAccept(
|
||||
"unknown", DtoFactory.newDto(BrokerResultEvent.class).withWorkspaceId("ws123"));
|
||||
"unknown", DtoFactory.newDto(BrokerStatusChangedEvent.class).withWorkspaceId("ws123"));
|
||||
}
|
||||
|
||||
@DataProvider
|
||||
|
|
|
|||
|
|
@ -22,26 +22,27 @@ import org.eclipse.che.dto.shared.DTO;
|
|||
* @author Oleksandr Garagatyi
|
||||
*/
|
||||
@DTO
|
||||
public interface BrokerResultEvent {
|
||||
public interface BrokerStatusChangedEvent {
|
||||
|
||||
/** Status of execution of a broker process. */
|
||||
BrokerStatus getStatus();
|
||||
|
||||
BrokerResultEvent withStatus(BrokerStatus status);
|
||||
BrokerStatusChangedEvent withStatus(BrokerStatus status);
|
||||
|
||||
/** ID of a workspace this event is related to. */
|
||||
String getWorkspaceId();
|
||||
|
||||
BrokerResultEvent withWorkspaceId(String workspaceId);
|
||||
BrokerStatusChangedEvent withWorkspaceId(String workspaceId);
|
||||
|
||||
/**
|
||||
* Error message that explains the reason of the broker process failure.
|
||||
*
|
||||
* <p>When this method returns non-null value method {@link #getTooling()} must return null.
|
||||
* <p>This method must return non-null value if {@link #getStatus() status} is {@link
|
||||
* BrokerStatus#FAILED}.
|
||||
*/
|
||||
String getError();
|
||||
|
||||
BrokerResultEvent withError(String error);
|
||||
BrokerStatusChangedEvent withError(String error);
|
||||
|
||||
/**
|
||||
* Stringified workspace tooling in JSON format.
|
||||
|
|
@ -50,9 +51,10 @@ public interface BrokerResultEvent {
|
|||
* framework (dashes in field name, field name not matching POJO getter), so we have to stringify
|
||||
* it to pass over Che JSON_RPC framework.
|
||||
*
|
||||
* <p>When this method returns non-null value method {@link #getError()} must return null.
|
||||
* <p>This method must return non-null value if {@link #getStatus() status} is {@link
|
||||
* BrokerStatus#DONE}.
|
||||
*/
|
||||
String getTooling();
|
||||
|
||||
BrokerResultEvent withTooling(String tooling);
|
||||
BrokerStatusChangedEvent withTooling(String tooling);
|
||||
}
|
||||
Loading…
Reference in New Issue