CHE-9528 Refactor terminations mechanism to support suspend and dependencies

6.19.x
Max Shaposhnik 2018-04-23 17:32:09 +03:00 committed by Sergii Leshchenko
parent d227e297b6
commit 67d27e9eff
22 changed files with 769 additions and 50 deletions

View File

@ -86,6 +86,10 @@
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-api-model</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-api-system</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-api-workspace</artifactId>

View File

@ -22,7 +22,6 @@ import io.fabric8.kubernetes.client.utils.Utils;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
@ -192,7 +191,11 @@ public class KubernetesClientFactory {
};
}
protected void doCleanup() {
/**
* Shuts down the {@link KubernetesClient} by closing it's connection pool. Typically should be
* called on application tear down.
*/
public void shutdownClient() {
ConnectionPool connectionPool = httpClient.connectionPool();
Dispatcher dispatcher = httpClient.dispatcher();
ExecutorService executorService =
@ -226,15 +229,6 @@ public class KubernetesClientFactory {
return new UnclosableKubernetesClient(clientHttpClient, config);
}
@PreDestroy
private void cleanup() {
try {
doCleanup();
} catch (RuntimeException ex) {
LOG.error(ex.getMessage());
}
}
/**
* Decorates the {@link DefaultKubernetesClient} so that it can not be closed from the outside.
*/

View File

@ -0,0 +1,60 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.workspace.infrastructure.kubernetes;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import javax.inject.Inject;
import org.eclipse.che.api.system.server.ServiceTermination;
import org.eclipse.che.api.workspace.server.WorkspaceServiceTermination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Termination for Kubernetes HTTP client.
*
* @author Max Shaposhnik (mshaposh@redhat.com)
*/
public class KubernetesClientTermination implements ServiceTermination {
private static final Logger LOG = LoggerFactory.getLogger(KubernetesClientTermination.class);
private KubernetesClientFactory kubernetesClientFactory;
@Inject
public KubernetesClientTermination(KubernetesClientFactory factory) {
this.kubernetesClientFactory = factory;
}
@Override
public void terminate() throws InterruptedException {
suspend();
}
@Override
public void suspend() throws InterruptedException {
try {
kubernetesClientFactory.shutdownClient();
} catch (RuntimeException e) {
LOG.error(e.getMessage());
}
}
@Override
public String getServiceName() {
return "KubernetesClient";
}
@Override
public Set<String> getDependencies() {
return ImmutableSet.of(WorkspaceServiceTermination.SERVICE_NAME);
}
}

View File

@ -22,6 +22,7 @@ import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.multibindings.Multibinder;
import java.util.Map;
import org.eclipse.che.api.system.server.ServiceTermination;
import org.eclipse.che.api.workspace.server.spi.RuntimeInfrastructure;
import org.eclipse.che.api.workspace.server.spi.environment.InternalEnvironmentFactory;
import org.eclipse.che.api.workspace.server.spi.provision.env.CheApiExternalEnvVarProvider;
@ -81,6 +82,10 @@ public class KubernetesInfraModule extends AbstractModule {
volumesStrategies.addBinding(UNIQUE_STRATEGY).to(UniqueWorkspacePVCStrategy.class);
bind(WorkspaceVolumesStrategy.class).toProvider(WorkspaceVolumeStrategyProvider.class);
Multibinder.newSetBinder(binder(), ServiceTermination.class)
.addBinding()
.to(KubernetesClientTermination.class);
MapBinder<String, ExternalServerExposerStrategy<KubernetesEnvironment>> ingressStrategies =
MapBinder.newMapBinder(
binder(),

View File

@ -54,10 +54,6 @@
<groupId>io.fabric8</groupId>
<artifactId>openshift-client</artifactId>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
@ -78,6 +74,10 @@
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-api-model</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-api-system</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-api-workspace</artifactId>

View File

@ -24,7 +24,6 @@ import io.fabric8.openshift.client.OpenShiftConfigBuilder;
import io.fabric8.openshift.client.internal.OpenShiftOAuthInterceptor;
import java.io.IOException;
import java.net.URL;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
@ -226,15 +225,6 @@ public class OpenShiftClientFactory extends KubernetesClientFactory {
return createOC(buildConfig(getDefaultConfig(), null));
}
@PreDestroy
private void cleanup() {
try {
doCleanup();
} catch (RuntimeException ex) {
LOG.error(ex.getMessage());
}
}
/** Decorates the {@link DefaultOpenShiftClient} so that it can not be closed from the outside. */
private static class UnclosableOpenShiftClient extends DefaultOpenShiftClient {

View File

@ -18,6 +18,7 @@ import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.multibindings.Multibinder;
import org.eclipse.che.api.system.server.ServiceTermination;
import org.eclipse.che.api.workspace.server.spi.RuntimeInfrastructure;
import org.eclipse.che.api.workspace.server.spi.environment.InternalEnvironmentFactory;
import org.eclipse.che.api.workspace.server.spi.provision.env.CheApiExternalEnvVarProvider;
@ -25,6 +26,7 @@ import org.eclipse.che.api.workspace.server.spi.provision.env.CheApiInternalEnvV
import org.eclipse.che.api.workspace.server.spi.provision.env.EnvVarProvider;
import org.eclipse.che.workspace.infrastructure.docker.environment.dockerimage.DockerImageEnvironment;
import org.eclipse.che.workspace.infrastructure.docker.environment.dockerimage.DockerImageEnvironmentFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.KubernetesClientTermination;
import org.eclipse.che.workspace.infrastructure.kubernetes.bootstrapper.KubernetesBootstrapperFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.cache.KubernetesMachineCache;
import org.eclipse.che.workspace.infrastructure.kubernetes.cache.KubernetesRuntimeStateCache;
@ -87,5 +89,9 @@ public class OpenShiftInfraModule extends AbstractModule {
bind(KubernetesRuntimeStateCache.class).to(JpaKubernetesRuntimeStateCache.class);
bind(KubernetesMachineCache.class).to(JpaKubernetesMachineCache.class);
Multibinder.newSetBinder(binder(), ServiceTermination.class)
.addBinding()
.to(KubernetesClientTermination.class);
}
}

View File

@ -16,6 +16,7 @@ import static java.util.Arrays.asList;
import static org.everrest.assured.JettyHttpServer.ADMIN_USER_NAME;
import static org.everrest.assured.JettyHttpServer.ADMIN_USER_PASSWORD;
import static org.everrest.assured.JettyHttpServer.SECURE_PATH;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
@ -92,7 +93,7 @@ public class SystemServicePermissionsFilterTest {
.then()
.statusCode(204);
verify(systemService).stop();
verify(systemService).stop(anyBoolean());
}
@Test
@ -107,7 +108,7 @@ public class SystemServicePermissionsFilterTest {
.then()
.statusCode(403);
verify(systemService, never()).stop();
verify(systemService, never()).stop(anyBoolean());
}
@Test

View File

@ -30,15 +30,37 @@ public enum EventType {
*/
STOPPING_SERVICE,
/**
* Published when system is starting to suspend a service. This is the first event published for a
* certain service.
*
* <pre>
* SUSPENDING_SERVICE -> (0..N)SERVICE_ITEM_SUSPENDED -> SERVICE_SUSPENDED
* </pre>
*/
SUSPENDING_SERVICE,
/**
* Published after service item is stopped. Events of such type are published between {@link
* #STOPPING_SERVICE} and {@link #SERVICE_STOPPED} events.
*/
SERVICE_ITEM_STOPPED,
/**
* Published after service item is suspended. Events of such type are published between {@link
* #SUSPENDING_SERVICE} and {@link #SERVICE_SUSPENDED} events.
*/
SERVICE_ITEM_SUSPENDED,
/**
* Published when shutting down of a service is finished. The last event in the chain for a
* certain service.
*/
SERVICE_STOPPED
SERVICE_STOPPED,
/**
* Published when suspending a service is finished. The last event in the chain for a certain
* service.
*/
SERVICE_SUSPENDED
}

View File

@ -0,0 +1,30 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.api.system.shared.event.service;
import org.eclipse.che.api.system.shared.event.EventType;
/**
* See {@link EventType#SUSPENDING_SERVICE} description.
*
* @author Max Shaposhnyk
*/
public class SuspendingSystemServiceEvent extends SystemServiceEvent {
public SuspendingSystemServiceEvent(String serviceName) {
super(serviceName);
}
@Override
public EventType getType() {
return EventType.SUSPENDING_SERVICE;
}
}

View File

@ -0,0 +1,102 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.api.system.shared.event.service;
import java.util.Objects;
import org.eclipse.che.api.system.shared.event.EventType;
import org.eclipse.che.commons.annotation.Nullable;
/**
* See {@link EventType#SERVICE_ITEM_SUSPENDED} description.
*
* @author Max Shaposhnyk
*/
public class SystemServiceItemSuspendedEvent extends SystemServiceEvent {
private final String item;
private Integer total;
private Integer current;
public SystemServiceItemSuspendedEvent(String serviceName, String item) {
super(serviceName);
this.item = Objects.requireNonNull(item, "Item required");
}
public SystemServiceItemSuspendedEvent(
String serviceName, String item, @Nullable Integer current, @Nullable Integer total) {
this(serviceName, item);
this.current = current;
this.total = total;
}
@Override
public EventType getType() {
return EventType.SERVICE_ITEM_SUSPENDED;
}
public String getItem() {
return item;
}
@Nullable
public Integer getTotal() {
return total;
}
@Nullable
public Integer getCurrent() {
return current;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof SystemServiceItemSuspendedEvent)) {
return false;
}
final SystemServiceItemSuspendedEvent that = (SystemServiceItemSuspendedEvent) obj;
return super.equals(that)
&& item.equals(that.item)
&& Objects.equals(total, that.total)
&& Objects.equals(current, that.current);
}
@Override
public int hashCode() {
int hash = 7;
hash = 31 * hash + super.hashCode();
hash = 31 * hash + item.hashCode();
hash = 31 * hash + Objects.hashCode(total);
hash = 31 * hash + Objects.hashCode(current);
return hash;
}
@Override
public String toString() {
return "SystemServiceItemSuspendedEvent{"
+ "item='"
+ item
+ '\''
+ ", total="
+ total
+ ", current="
+ current
+ ", eventType='"
+ getType()
+ '\''
+ ", service='"
+ getServiceName()
+ "\'}";
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.api.system.shared.event.service;
import org.eclipse.che.api.system.shared.event.EventType;
/**
* See {@link EventType#SERVICE_SUSPENDED} description.
*
* @author Max Shaposhnyk
*/
public class SystemServiceSuspendedEvent extends SystemServiceEvent {
public SystemServiceSuspendedEvent(String serviceName) {
super(serviceName);
}
@Override
public EventType getType() {
return EventType.SERVICE_SUSPENDED;
}
}

View File

@ -74,6 +74,10 @@
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-inject</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-lang</artifactId>

View File

@ -10,12 +10,13 @@
*/
package org.eclipse.che.api.system.server;
import java.util.Set;
import org.eclipse.che.api.system.shared.event.service.StoppingSystemServiceEvent;
import org.eclipse.che.api.system.shared.event.service.SystemServiceItemStoppedEvent;
import org.eclipse.che.api.system.shared.event.service.SystemServiceStoppedEvent;
/**
* Defines an interface for implementing termination process for a certain service.
* Defines an interface for implementing termination or suspend process for a certain service.
*
* @author Yevhenii Voevodin
*/
@ -30,10 +31,31 @@ public interface ServiceTermination {
*/
void terminate() throws InterruptedException;
/**
* Suspends a certain service. Means that no more new service entities should be created and/or
* executed etc.
*
* @throws UnsupportedOperationException if this operation is not supported
* @throws InterruptedException as suspend is synchronous some of the implementations may need to
* wait for asynchronous jobs to finish their execution, so if suspend is interrupted and
* implementation supports suspending it should throw an interrupted exception
*/
default void suspend() throws InterruptedException, UnsupportedOperationException {
throw new UnsupportedOperationException("This operation is not supported.");
}
/**
* Returns the name of the service which is terminated by this termination. The name is used for
* logging/sending events like {@link StoppingSystemServiceEvent}, {@link
* SystemServiceItemStoppedEvent} or {@link SystemServiceStoppedEvent}.
*/
String getServiceName();
/**
* Returns set of terminations service names on which the given termination depends, i.e. it MUST
* be executed after them.
*
* @return list of dependencies is any, or empty list otherwise.
*/
Set<String> getDependencies();
}

View File

@ -10,16 +10,25 @@
*/
package org.eclipse.che.api.system.server;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSortedSet;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.eclipse.che.api.core.notification.EventService;
import org.eclipse.che.api.system.shared.event.service.StoppingSystemServiceEvent;
import org.eclipse.che.api.system.shared.event.service.SuspendingSystemServiceEvent;
import org.eclipse.che.api.system.shared.event.service.SystemServiceStoppedEvent;
import org.eclipse.che.api.system.shared.event.service.SystemServiceSuspendedEvent;
import org.eclipse.che.inject.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Terminates system services.
* Terminates or suspends system services.
*
* @author Yevhenii Voevodin
*/
@ -33,27 +42,122 @@ class ServiceTerminator {
@Inject
ServiceTerminator(EventService eventService, Set<ServiceTermination> terminations) {
this.eventService = eventService;
this.terminations = terminations;
checkNamesAndDependencies(terminations);
this.terminations =
ImmutableSortedSet.copyOf(new ServiceTerminationComparator(terminations), terminations);
}
/**
* Terminates system services.
* Terminates system services in a order satisfying termination dependencies.
*
* @throws InterruptedException when termination is interrupted
*/
void terminateAll() throws InterruptedException {
for (ServiceTermination termination : terminations) {
LOG.info("Shutting down '{}' service", termination.getServiceName());
eventService.publish(new StoppingSystemServiceEvent(termination.getServiceName()));
doTerminate(termination);
}
}
/**
* Suspends system services in a order satisfying termination dependencies.
*
* @throws InterruptedException when suspending is interrupted
*/
void suspendAll() throws InterruptedException {
for (ServiceTermination termination : terminations) {
LOG.info("Suspending down '{}' service", termination.getServiceName());
eventService.publish(new SuspendingSystemServiceEvent(termination.getServiceName()));
try {
termination.terminate();
termination.suspend();
eventService.publish(new SystemServiceSuspendedEvent(termination.getServiceName()));
LOG.info("Service '{}' is suspended", termination.getServiceName());
} catch (UnsupportedOperationException e) {
LOG.info(
"Suspending down '{}' service isn't supported, terminating it",
termination.getServiceName());
doTerminate(termination);
} catch (InterruptedException x) {
LOG.error(
"Interrupted while waiting for '{}' service to shutdown", termination.getServiceName());
"Interrupted while waiting for '{}' service to suspend", termination.getServiceName());
throw x;
}
LOG.info("Service '{}' is shut down", termination.getServiceName());
eventService.publish(new SystemServiceStoppedEvent(termination.getServiceName()));
}
}
@VisibleForTesting
void doTerminate(ServiceTermination termination) throws InterruptedException {
eventService.publish(new StoppingSystemServiceEvent(termination.getServiceName()));
try {
termination.terminate();
} catch (InterruptedException x) {
LOG.error(
"Interrupted while waiting for '{}' service to shutdown", termination.getServiceName());
throw x;
}
LOG.info("Service '{}' is shut down", termination.getServiceName());
eventService.publish(new SystemServiceStoppedEvent(termination.getServiceName()));
}
private void checkNamesAndDependencies(Set<ServiceTermination> terminationSet) {
Set<String> uniqueNamesSet = new HashSet<>();
terminationSet.forEach(
t -> {
if (!uniqueNamesSet.add(t.getServiceName())) {
throw new ConfigurationException(
String.format(
"Duplicate termination found with service name %s", t.getServiceName()));
}
});
terminationSet.forEach(
t -> {
if (!uniqueNamesSet.containsAll(t.getDependencies())) {
throw new RuntimeException(
String.format("Unknown dependency found in termination %s", t.getServiceName()));
}
});
}
public static class ServiceTerminationComparator implements Comparator<ServiceTermination> {
private final Map<String, Set<String>> dependencies;
public ServiceTerminationComparator(Set<ServiceTermination> terminations) {
this.dependencies =
terminations
.stream()
.collect(
Collectors.toMap(
ServiceTermination::getServiceName, ServiceTermination::getDependencies));
}
@Override
public int compare(ServiceTermination o1, ServiceTermination o2) {
return checkTransitiveDependency(o1.getServiceName(), o2.getServiceName(), new HashSet<>());
}
// Recursively dig into dependencies and sort them out
private int checkTransitiveDependency(String o1, String o2, Set<String> loopList) {
if (loopList.contains(o1)) {
throw new RuntimeException("Circular dependency found between terminations " + loopList);
}
Set<String> directDependencies = dependencies.get(o1);
if (directDependencies.isEmpty()) {
return -1;
} else {
if (directDependencies.contains(o2)) {
return 1;
} else {
loopList.add(o1);
for (String dependency : directDependencies) {
if (checkTransitiveDependency(dependency, o2, loopList) > 0) {
return 1;
}
}
return -1;
}
}
}
}
}

View File

@ -57,8 +57,8 @@ public class SystemManager {
}
/**
* Stops some of the system services preparing system to lighter shutdown. System status is
* changed from {@link SystemStatus#RUNNING} to {@link SystemStatus#PREPARING_TO_SHUTDOWN}.
* Stops some of the system services preparing system to full shutdown. System status is changed
* from {@link SystemStatus#RUNNING} to {@link SystemStatus#PREPARING_TO_SHUTDOWN}.
*
* @throws ConflictException when system status is different from running
*/
@ -78,6 +78,28 @@ public class SystemManager {
exec.shutdown();
}
/**
* Suspends some of the system services preparing system to lighter shutdown. System status is
* changed from {@link SystemStatus#RUNNING} to {@link SystemStatus#PREPARING_TO_SHUTDOWN}.
*
* @throws ConflictException when system status is different from running
*/
public void suspendServices() throws ConflictException {
if (!statusRef.compareAndSet(RUNNING, PREPARING_TO_SHUTDOWN)) {
throw new ConflictException(
"System shutdown has been already called, system status: " + statusRef.get());
}
ExecutorService exec =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(false)
.setNameFormat("SuspendSystemServicesPool")
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.build());
exec.execute(ThreadLocalPropagateContext.wrap(this::doSuspendServices));
exec.shutdown();
}
/**
* Gets current system status.
*
@ -105,13 +127,31 @@ public class SystemManager {
}
}
/** Synchronously stops corresponding services. */
private void doSuspendServices() {
LOG.info("Preparing system to shutdown");
eventService.publish(asDto(new SystemStatusChangedEvent(RUNNING, PREPARING_TO_SHUTDOWN)));
try {
terminator.suspendAll();
statusRef.set(READY_TO_SHUTDOWN);
eventService.publish(
asDto(new SystemStatusChangedEvent(PREPARING_TO_SHUTDOWN, READY_TO_SHUTDOWN)));
LOG.info("System is ready to shutdown");
} catch (InterruptedException x) {
LOG.error("Interrupted while waiting for system service to shutdown components");
Thread.currentThread().interrupt();
} finally {
shutdownLatch.countDown();
}
}
@PreDestroy
@VisibleForTesting
void shutdown() throws InterruptedException {
if (!statusRef.compareAndSet(RUNNING, PREPARING_TO_SHUTDOWN)) {
shutdownLatch.await();
} else {
doStopServices();
doSuspendServices();
}
}
}

View File

@ -19,10 +19,12 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import javax.inject.Inject;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import org.eclipse.che.api.core.ConflictException;
import org.eclipse.che.api.core.rest.Service;
import org.eclipse.che.api.core.rest.shared.dto.Link;
@ -53,8 +55,13 @@ public class SystemService extends Service {
@ApiResponse(code = 204, message = "The system is preparing to stop"),
@ApiResponse(code = 409, message = "Stop has been already called")
})
public void stop() throws ConflictException {
manager.stopServices();
public void stop(@QueryParam("shutdown") @DefaultValue("false") boolean shutdown)
throws ConflictException {
if (shutdown) {
manager.stopServices();
} else {
manager.suspendServices();
}
}
@GET

View File

@ -83,7 +83,10 @@ public class DtoConverterTest {
EnumSet.of(
EventType.STATUS_CHANGED,
EventType.STOPPING_SERVICE,
EventType.SUSPENDING_SERVICE,
EventType.SERVICE_ITEM_STOPPED,
EventType.SERVICE_ITEM_SUSPENDED,
EventType.SERVICE_SUSPENDED,
EventType.SERVICE_STOPPED);
assertEquals(handled, EnumSet.allOf(EventType.class));
}

View File

@ -59,6 +59,13 @@ public class SystemManagerTest {
assertEquals(systemManager.getSystemStatus(), RUNNING);
}
@Test
public void servicesAreSuspended() throws Exception {
systemManager.suspendServices();
verifySuspendCompleted();
}
@Test
public void servicesAreStopped() throws Exception {
systemManager.stopServices();
@ -84,11 +91,20 @@ public class SystemManagerTest {
public void shutdownStopsServicesIfNotStopped() throws Exception {
systemManager.shutdown();
verifyShutdownCompleted();
verifySuspendCompleted();
}
private void verifyShutdownCompleted() throws InterruptedException {
verify(terminator, timeout(2000)).terminateAll();
verifyEvents();
}
private void verifySuspendCompleted() throws InterruptedException {
verify(terminator, timeout(2000)).suspendAll();
verifyEvents();
}
private void verifyEvents() {
verify(eventService, times(2)).publish(eventsCaptor.capture());
Iterator<SystemStatusChangedEventDto> eventsIt = eventsCaptor.getAllValues().iterator();
assertEquals(

View File

@ -10,17 +10,27 @@
*/
package org.eclipse.che.api.system.server;
import static java.util.stream.Collectors.toSet;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import com.google.common.collect.ImmutableSet;
import java.util.Collections;
import java.util.Set;
import org.eclipse.che.api.core.notification.EventService;
import org.eclipse.che.api.system.shared.event.service.StoppingSystemServiceEvent;
import org.eclipse.che.api.system.shared.event.service.SuspendingSystemServiceEvent;
import org.eclipse.che.api.system.shared.event.service.SystemServiceStoppedEvent;
import org.eclipse.che.api.system.shared.event.service.SystemServiceSuspendedEvent;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.testng.MockitoTestNGListener;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Listeners;
import org.testng.annotations.Test;
@ -57,6 +67,34 @@ public class SystemTerminatorTest {
verify(eventService).publish(new SystemServiceStoppedEvent("service2"));
}
@Test
public void executesSuspendals() throws Exception {
terminator.suspendAll();
verify(termination1).suspend();
verify(termination2).suspend();
verify(eventService).publish(new SuspendingSystemServiceEvent("service1"));
verify(eventService).publish(new SystemServiceSuspendedEvent("service1"));
verify(eventService).publish(new SuspendingSystemServiceEvent("service2"));
verify(eventService).publish(new SystemServiceSuspendedEvent("service2"));
}
@Test
public void executesTermitationsWhenSuspendalsNotSupported() throws Exception {
doThrow(UnsupportedOperationException.class).when(termination1).suspend();
terminator.suspendAll();
verify(termination1).terminate();
verify(termination2).suspend();
verify(eventService).publish(new SuspendingSystemServiceEvent("service1"));
verify(eventService).publish(new StoppingSystemServiceEvent("service1"));
verify(eventService).publish(new SystemServiceStoppedEvent("service1"));
verify(eventService).publish(new SuspendingSystemServiceEvent("service2"));
verify(eventService).publish(new SystemServiceSuspendedEvent("service2"));
}
@Test(
expectedExceptions = InterruptedException.class,
expectedExceptionsMessageRegExp = "interrupt!"
@ -66,4 +104,118 @@ public class SystemTerminatorTest {
terminator.terminateAll();
}
@Test(dataProvider = "dependableTerminations")
public void shouldOrderTerminationsByDependency(
Set<ServiceTermination> terminations, Set<String> expectedOrder) throws Exception {
ServiceTerminator localTerminator = spy(new ServiceTerminator(eventService, terminations));
localTerminator.suspendAll();
ArgumentCaptor<ServiceTermination> captor = ArgumentCaptor.forClass(ServiceTermination.class);
verify(localTerminator, times(terminations.size())).doTerminate(captor.capture());
assertEquals(
captor.getAllValues().stream().map(ServiceTermination::getServiceName).collect(toSet()),
expectedOrder);
}
@Test(
dataProvider = "loopableTerminations",
expectedExceptions = RuntimeException.class,
expectedExceptionsMessageRegExp = "Circular dependency found between terminations \\[B, D\\]"
)
public void shouldFailOnCyclicDependency(Set<ServiceTermination> terminations) throws Exception {
new ServiceTerminator(eventService, terminations);
}
@Test(
dataProvider = "sameNameTerminations",
expectedExceptions = RuntimeException.class,
expectedExceptionsMessageRegExp = "Duplicate termination found with service name .+"
)
public void shouldFailOnTerminationsWithSameServiceName(Set<ServiceTermination> terminations)
throws Exception {
new ServiceTerminator(eventService, terminations);
}
@Test(
dataProvider = "wrongDependencyTerminations",
expectedExceptions = RuntimeException.class,
expectedExceptionsMessageRegExp = "Unknown dependency found in termination .+"
)
public void shouldFailOnTerminationsWithUnexistingDeps(Set<ServiceTermination> terminations)
throws Exception {
new ServiceTerminator(eventService, terminations);
}
@DataProvider(name = "dependableTerminations")
public Object[][] dependableTerminations() {
return new Object[][] {
{
ImmutableSet.of(
getServiceTerminationWithDependency("A", Collections.emptySet()),
getServiceTerminationWithDependency("B", ImmutableSet.of("C", "D", "G")),
getServiceTerminationWithDependency("C", Collections.emptySet()),
getServiceTerminationWithDependency("D", ImmutableSet.of("C")),
getServiceTerminationWithDependency("E", ImmutableSet.of("B")),
getServiceTerminationWithDependency("F", ImmutableSet.of("C")),
getServiceTerminationWithDependency("G", ImmutableSet.of("C"))),
ImmutableSet.of("A", "C", "D", "F", "G", "B", "E")
}
};
}
@DataProvider(name = "loopableTerminations")
public Object[][] loopableTerminations() {
return new Object[][] {
{
ImmutableSet.of(
getServiceTerminationWithDependency("A", Collections.emptySet()),
getServiceTerminationWithDependency("B", ImmutableSet.of("C", "D")),
getServiceTerminationWithDependency("C", Collections.emptySet()),
getServiceTerminationWithDependency("D", ImmutableSet.of("B")) // loop here
)
}
};
}
@DataProvider(name = "sameNameTerminations")
public Object[][] sameNameTerminations() {
return new Object[][] {
{
ImmutableSet.of(
getServiceTerminationWithDependency("A", Collections.emptySet()),
getServiceTerminationWithDependency("C", Collections.emptySet()),
getServiceTerminationWithDependency("C", Collections.emptySet()))
}
};
}
@DataProvider(name = "wrongDependencyTerminations")
public Object[][] wrongDependencyTerminations() {
return new Object[][] {
{
ImmutableSet.of(
getServiceTerminationWithDependency("A", Collections.emptySet()),
// no such termination
getServiceTerminationWithDependency("C", ImmutableSet.of("B")))
}
};
}
private ServiceTermination getServiceTerminationWithDependency(
String name, Set<String> depencencies) {
return new ServiceTermination() {
@Override
public void terminate() throws InterruptedException {}
@Override
public String getServiceName() {
return name;
}
@Override
public Set<String> getDependencies() {
return depencencies;
}
};
}
}

View File

@ -14,6 +14,7 @@ import static org.eclipse.che.api.system.server.DtoConverter.asDto;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
@ -28,15 +29,23 @@ import org.eclipse.che.api.core.notification.EventService;
import org.eclipse.che.api.core.notification.EventSubscriber;
import org.eclipse.che.api.system.server.ServiceTermination;
import org.eclipse.che.api.system.shared.event.service.SystemServiceItemStoppedEvent;
import org.eclipse.che.api.system.shared.event.service.SystemServiceItemSuspendedEvent;
import org.eclipse.che.api.system.shared.event.service.SystemServiceStoppedEvent;
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.api.workspace.server.spi.RuntimeInfrastructure;
import org.eclipse.che.api.workspace.shared.dto.event.WorkspaceStatusEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Terminates workspace service.
* Terminates workspace service. In case of full system shutdown or if current infra doesn't support
* workspaces recovery, it blocks starting new workspaces and stops all that already running. In
* case of suspend and recovery support, blocks starting new workspaces and waits until all
* workspaces that are currently in a starting/stopping state to finish this process and become
* stable running or stopped state.
*
* @author Yevhenii Voevodin
* @author Max Shaposhnyk
*/
public class WorkspaceServiceTermination implements ServiceTermination {
@ -45,9 +54,12 @@ public class WorkspaceServiceTermination implements ServiceTermination {
/** Delay in MS between runtimes stopped checks. The value is experimental. */
private static final long DEFAULT_PULL_RUNTIMES_PERIOD_MS = TimeUnit.SECONDS.toMillis(1);
public static final String SERVICE_NAME = "workspace";
private final WorkspaceManager manager;
private final WorkspaceSharedPool sharedPool;
private final WorkspaceRuntimes runtimes;
private final RuntimeInfrastructure runtimeInfrastructure;
private final EventService eventService;
@Inject
@ -55,18 +67,30 @@ public class WorkspaceServiceTermination implements ServiceTermination {
WorkspaceManager manager,
WorkspaceSharedPool sharedPool,
WorkspaceRuntimes runtimes,
RuntimeInfrastructure runtimeInfrastructure,
EventService eventService) {
this.manager = manager;
this.sharedPool = sharedPool;
this.runtimes = runtimes;
this.runtimeInfrastructure = runtimeInfrastructure;
this.eventService = eventService;
}
@Override
public String getServiceName() {
return "workspace";
return SERVICE_NAME;
}
@Override
public Set<String> getDependencies() {
return Collections.emptySet();
}
/**
* Blocks starting new workspaces and stops all that already running
*
* @throws InterruptedException
*/
@Override
public void terminate() throws InterruptedException {
Preconditions.checkState(runtimes.refuseStart());
@ -82,8 +106,33 @@ public class WorkspaceServiceTermination implements ServiceTermination {
}
}
/**
* Blocks starting new workspaces and waits until all workspaces that are currently in a
* starting/stopping state to finish this process
*
* @throws InterruptedException
* @throws UnsupportedOperationException
*/
@Override
public void suspend() throws InterruptedException, UnsupportedOperationException {
Preconditions.checkState(runtimes.refuseStart());
try {
runtimeInfrastructure.getIdentities();
} catch (UnsupportedOperationException | InfrastructureException e) {
throw new UnsupportedOperationException("Current infrastructure does not support suspend.");
}
WorkspaceSuspendedEventsPropagator propagator = new WorkspaceSuspendedEventsPropagator();
eventService.subscribe(propagator);
try {
waitAllWorkspacesRunningOrStopped();
sharedPool.shutdown();
} finally {
eventService.unsubscribe(propagator);
}
}
private void stopRunningAndStartingWorkspacesAsync() {
for (String workspaceId : runtimes.getRuntimesIds()) {
for (String workspaceId : runtimes.getRunning()) {
WorkspaceStatus status = runtimes.getStatus(workspaceId);
if (status == WorkspaceStatus.RUNNING || status == WorkspaceStatus.STARTING) {
try {
@ -107,7 +156,7 @@ public class WorkspaceServiceTermination implements ServiceTermination {
private final AtomicInteger currentlyStopped;
private WorkspaceStoppedEventsPropagator() {
this.totalRunning = runtimes.getRuntimesIds().size();
this.totalRunning = runtimes.getRunning().size();
this.currentlyStopped = new AtomicInteger(0);
}
@ -125,6 +174,33 @@ public class WorkspaceServiceTermination implements ServiceTermination {
}
}
/** Propagates workspace suspended events as {@link SystemServiceItemSuspendedEvent} events. */
private class WorkspaceSuspendedEventsPropagator
implements EventSubscriber<WorkspaceStatusEvent> {
private final int totalRunning;
private final AtomicInteger currentlyStopped;
private WorkspaceSuspendedEventsPropagator() {
this.totalRunning = runtimes.getInProgress().size();
this.currentlyStopped = new AtomicInteger(0);
}
@Override
public void onEvent(WorkspaceStatusEvent event) {
if (event.getStatus() == WorkspaceStatus.STOPPED
|| event.getStatus() == WorkspaceStatus.RUNNING) {
eventService.publish(
asDto(
new SystemServiceItemSuspendedEvent(
getServiceName(),
event.getWorkspaceId(),
currentlyStopped.incrementAndGet(),
totalRunning)));
}
}
}
private void waitAllWorkspacesStopped() throws InterruptedException {
Timer timer = new Timer("RuntimesStoppedTracker", false);
CountDownLatch latch = new CountDownLatch(1);
@ -142,4 +218,22 @@ public class WorkspaceServiceTermination implements ServiceTermination {
DEFAULT_PULL_RUNTIMES_PERIOD_MS);
latch.await();
}
private void waitAllWorkspacesRunningOrStopped() throws InterruptedException {
Timer timer = new Timer("RuntimesStoppedTracker", false);
CountDownLatch latch = new CountDownLatch(1);
timer.schedule(
new TimerTask() {
@Override
public void run() {
if (!runtimes.isAnyInProgress()) {
timer.cancel();
latch.countDown();
}
}
},
0,
DEFAULT_PULL_RUNTIMES_PERIOD_MS);
latch.await();
}
}

View File

@ -12,7 +12,11 @@ package org.eclipse.che.api.workspace.server;
import static org.eclipse.che.dto.server.DtoFactory.newDto;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -23,6 +27,7 @@ import org.eclipse.che.api.core.model.workspace.WorkspaceStatus;
import org.eclipse.che.api.core.notification.EventService;
import org.eclipse.che.api.core.notification.EventSubscriber;
import org.eclipse.che.api.system.shared.dto.SystemServiceItemStoppedEventDto;
import org.eclipse.che.api.workspace.server.spi.RuntimeInfrastructure;
import org.eclipse.che.api.workspace.shared.dto.event.WorkspaceStatusEvent;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@ -48,6 +53,8 @@ public class WorkspaceServiceTerminationTest {
@Mock private WorkspaceRuntimes workspaceRuntimes;
@Mock private RuntimeInfrastructure runtimeInfrastructure;
@InjectMocks private WorkspaceServiceTermination termination;
@BeforeMethod
@ -56,16 +63,19 @@ public class WorkspaceServiceTerminationTest {
}
@Test(dataProvider = "workspaceStoppedOnTerminationStatuses", timeOut = 1000L)
public void shutsDownWorkspaceService(WorkspaceStatus status) throws Exception {
public void shutsDownWorkspaceServiceIfFullShutdownRequested(WorkspaceStatus status)
throws Exception {
String workspaceId = "workspace123";
AtomicBoolean isAnyRunning = new AtomicBoolean(true);
when(workspaceRuntimes.isAnyRunning()).thenAnswer(inv -> isAnyRunning.get());
// one workspace is running
when(workspaceRuntimes.getRuntimesIds()).thenReturn(Collections.singleton(workspaceId));
when(workspaceRuntimes.getRunning()).thenReturn(Collections.singleton(workspaceId));
when(workspaceRuntimes.getStatus(workspaceId)).thenReturn(status);
when(runtimeInfrastructure.getIdentities()).thenReturn(Collections.emptySet());
// once stopped change the flag
doAnswer(
inv -> {
@ -79,9 +89,32 @@ public class WorkspaceServiceTerminationTest {
termination.terminate();
}
@Test
public void suspendsWorkspaceService() throws Exception {
AtomicBoolean isAnyRunning = new AtomicBoolean(true);
String workspaceId = "workspace123";
// one workspace is running
when(workspaceRuntimes.getRunning()).thenReturn(Collections.singleton(workspaceId));
when(workspaceRuntimes.getStatus(workspaceId)).thenReturn(WorkspaceStatus.RUNNING);
when(workspaceRuntimes.isAnyInProgress())
.thenAnswer(
inv -> {
boolean prev = isAnyRunning.get();
isAnyRunning.set(false);
return prev;
});
// do the actual termination
termination.suspend();
// verify checked progress workspaces
verify(workspaceRuntimes, atLeastOnce()).isAnyInProgress();
verify(workspaceManager, never()).stopWorkspace(anyString(), anyMap());
}
@Test
public void publishesStoppedWorkspaceStoppedEventsAsServiceItemStoppedEvents() throws Exception {
when(workspaceRuntimes.getRuntimesIds()).thenReturn(ImmutableSet.of("id1", "id2", "id3"));
when(workspaceRuntimes.getRunning()).thenReturn(ImmutableSet.of("id1", "id2", "id3"));
doAnswer(
inv -> {
@SuppressWarnings("unchecked")