From 5c4fd8b6982f7be2a5fd2a8cbcfd90a3a0258e45 Mon Sep 17 00:00:00 2001 From: Sergii Leshchenko Date: Mon, 14 May 2018 12:36:07 +0300 Subject: [PATCH] CHE-9502 Add an ability to schedule servers checkers for STARTING K8s/OS runtime (#9685) --- .../kubernetes/KubernetesInfrastructure.java | 6 +- .../kubernetes/KubernetesInternalRuntime.java | 50 +++++++--- .../kubernetes/KubernetesRuntimeContext.java | 5 +- .../KubernetesInternalRuntimeTest.java | 48 ++++++++++ .../openshift/OpenShiftRuntimeContext.java | 5 +- .../server/hc/probe/ProbeScheduler.java | 91 ++++++++++++++++++- .../hc/probe/WorkspaceProbesFactory.java | 13 ++- 7 files changed, 190 insertions(+), 28 deletions(-) diff --git a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesInfrastructure.java b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesInfrastructure.java index 1ffe610bce..95512454af 100644 --- a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesInfrastructure.java +++ b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesInfrastructure.java @@ -16,7 +16,6 @@ import com.google.common.collect.ImmutableSet; import java.util.Set; import javax.inject.Inject; import javax.inject.Singleton; -import org.eclipse.che.api.core.ValidationException; import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity; import org.eclipse.che.api.core.notification.EventService; import org.eclipse.che.api.workspace.server.spi.InfrastructureException; @@ -66,8 +65,7 @@ public class KubernetesInfrastructure extends RuntimeInfrastructure { @Override protected KubernetesRuntimeContext internalPrepare( - RuntimeIdentity id, InternalEnvironment environment) - throws ValidationException, InfrastructureException { + RuntimeIdentity id, InternalEnvironment environment) throws InfrastructureException { final KubernetesEnvironment kubernetesEnvironment = asKubernetesEnv(environment); k8sEnvProvisioner.provision(kubernetesEnvironment, id); @@ -76,7 +74,7 @@ public class KubernetesInfrastructure extends RuntimeInfrastructure { } private KubernetesEnvironment asKubernetesEnv(InternalEnvironment source) - throws ValidationException, InfrastructureException { + throws InfrastructureException { if (source instanceof KubernetesEnvironment) { return (KubernetesEnvironment) source; } diff --git a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesInternalRuntime.java b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesInternalRuntime.java index e7afb65524..b827c92efd 100644 --- a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesInternalRuntime.java +++ b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesInternalRuntime.java @@ -43,7 +43,6 @@ import javax.inject.Inject; import javax.inject.Named; import org.eclipse.che.api.core.model.workspace.Warning; import org.eclipse.che.api.core.model.workspace.WorkspaceStatus; -import org.eclipse.che.api.core.model.workspace.runtime.Machine; import org.eclipse.che.api.core.model.workspace.runtime.MachineStatus; import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity; import org.eclipse.che.api.core.model.workspace.runtime.ServerStatus; @@ -53,6 +52,7 @@ import org.eclipse.che.api.workspace.server.hc.ServersCheckerFactory; import org.eclipse.che.api.workspace.server.hc.probe.ProbeResult; import org.eclipse.che.api.workspace.server.hc.probe.ProbeResult.ProbeStatus; import org.eclipse.che.api.workspace.server.hc.probe.ProbeScheduler; +import org.eclipse.che.api.workspace.server.hc.probe.WorkspaceProbes; import org.eclipse.che.api.workspace.server.hc.probe.WorkspaceProbesFactory; import org.eclipse.che.api.workspace.server.spi.InfrastructureException; import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException; @@ -548,14 +548,41 @@ public class KubernetesInternalRuntime< } } - public void startServersCheckers() throws InfrastructureException { - for (Entry machineEntry : getMachines().entrySet()) { + /** + * Schedules server checkers. + * + *

Note that if the runtime is {@link WorkspaceStatus#RUNNING} then checkers will be scheduled + * immediately. If the runtime is {@link WorkspaceStatus#STARTING} then checkers will be scheduled + * when it becomes {@link WorkspaceStatus#RUNNING}. If runtime has any another status then + * checkers won't be scheduled at all. + * + * @throws InfrastructureException when any exception occurred + */ + public void scheduleServersCheckers() throws InfrastructureException { + WorkspaceStatus status = getStatus(); + + if (status != WorkspaceStatus.RUNNING && status != WorkspaceStatus.STARTING) { + return; + } + + ServerLivenessHandler consumer = new ServerLivenessHandler(); + WorkspaceProbes probes = + probesFactory.getProbes(getContext().getIdentity(), getInternalMachines()); + + if (status == WorkspaceStatus.RUNNING) { + probeScheduler.schedule(probes, consumer); + } else { + // Workspace is starting it is needed to start servers checkers when it becomes RUNNING probeScheduler.schedule( - probesFactory.getProbes( - getContext().getIdentity(), - machineEntry.getKey(), - machineEntry.getValue().getServers()), - new ServerLivenessHandler()); + probes, + consumer, + () -> { + try { + return getStatus(); + } catch (InfrastructureException e) { + throw new RuntimeException(e.getMessage()); + } + }); } } @@ -661,9 +688,10 @@ public class KubernetesInternalRuntime< } /** - * @param ContainerEvent - * @return true if event reason or message matches one of the comma separated values defined in - * 'che.infra.kubernetes.workspace_unrecoverable_events',false otherwise + * Returns true if event reason or message matches one of the comma separated values defined in + * 'che.infra.kubernetes.workspace_unrecoverable_events',false otherwise + * + * @param event event to check */ private boolean isUnrecoverable(ContainerEvent event) { boolean isUnrecoverable = false; diff --git a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesRuntimeContext.java b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesRuntimeContext.java index 0a86614486..6018fb514a 100644 --- a/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesRuntimeContext.java +++ b/infrastructures/kubernetes/src/main/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesRuntimeContext.java @@ -16,7 +16,6 @@ import java.util.Optional; import javax.inject.Inject; import javax.inject.Named; import org.eclipse.che.api.core.ValidationException; -import org.eclipse.che.api.core.model.workspace.WorkspaceStatus; import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity; import org.eclipse.che.api.workspace.server.spi.InfrastructureException; import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException; @@ -81,9 +80,7 @@ public class KubernetesRuntimeContext extends R namespaceFactory.create(workspaceId, runtimeState.getNamespace()), getEnvironment().getWarnings()); - if (runtime.getStatus() == WorkspaceStatus.RUNNING) { - runtime.startServersCheckers(); - } + runtime.scheduleServersCheckers(); return runtime; } diff --git a/infrastructures/kubernetes/src/test/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesInternalRuntimeTest.java b/infrastructures/kubernetes/src/test/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesInternalRuntimeTest.java index b2d8ea6fa9..8a157b250f 100644 --- a/infrastructures/kubernetes/src/test/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesInternalRuntimeTest.java +++ b/infrastructures/kubernetes/src/test/java/org/eclipse/che/workspace/infrastructure/kubernetes/KubernetesInternalRuntimeTest.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -595,6 +596,53 @@ public class KubernetesInternalRuntimeTest { assertFalse(runtimeStatesCache.get(internalRuntime.getContext().getIdentity()).isPresent()); } + @Test + public void shouldScheduleServerCheckersForRunningRuntime() throws Exception { + // given + runtimeStatesCache.putIfAbsent( + new KubernetesRuntimeState( + internalRuntime.getContext().getIdentity(), "test", WorkspaceStatus.RUNNING)); + + // when + internalRuntime.scheduleServersCheckers(); + + // then + verify(probesScheduler).schedule(any(), any()); + } + + @Test + public void shouldScheduleServerCheckersForStartingRuntime() throws Exception { + // given + runtimeStatesCache.putIfAbsent( + new KubernetesRuntimeState( + internalRuntime.getContext().getIdentity(), "test", WorkspaceStatus.STARTING)); + + // when + internalRuntime.scheduleServersCheckers(); + + // then + verify(probesScheduler).schedule(any(), any(), any()); + } + + @Test(dataProvider = "nonStartingRunningStatuses") + public void shouldNotScheduleServerCheckersIfRuntimeIsNotStartingOrRunning(WorkspaceStatus status) + throws Exception { + // given + runtimeStatesCache.putIfAbsent( + new KubernetesRuntimeState(internalRuntime.getContext().getIdentity(), "test", status)); + + // when + internalRuntime.scheduleServersCheckers(); + + // then + verifyZeroInteractions(probesScheduler); + } + + @DataProvider(name = "nonStartingRunningStatuses") + public Object[][] nonStartingRunningStatuses() { + return new Object[][] {{WorkspaceStatus.STOPPED}, {WorkspaceStatus.STOPPING}}; + } + private static MachineStatusEvent newEvent(String machineName, MachineStatus status) { return newDto(MachineStatusEvent.class) .withIdentity(DtoConverter.asDto(IDENTITY)) diff --git a/infrastructures/openshift/src/main/java/org/eclipse/che/workspace/infrastructure/openshift/OpenShiftRuntimeContext.java b/infrastructures/openshift/src/main/java/org/eclipse/che/workspace/infrastructure/openshift/OpenShiftRuntimeContext.java index 61ffb50db7..5fa64e6608 100644 --- a/infrastructures/openshift/src/main/java/org/eclipse/che/workspace/infrastructure/openshift/OpenShiftRuntimeContext.java +++ b/infrastructures/openshift/src/main/java/org/eclipse/che/workspace/infrastructure/openshift/OpenShiftRuntimeContext.java @@ -15,7 +15,6 @@ import java.util.Optional; import javax.inject.Inject; import javax.inject.Named; import org.eclipse.che.api.core.ValidationException; -import org.eclipse.che.api.core.model.workspace.WorkspaceStatus; import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity; import org.eclipse.che.api.workspace.server.spi.InfrastructureException; import org.eclipse.che.api.workspace.server.spi.RuntimeInfrastructure; @@ -74,9 +73,7 @@ public class OpenShiftRuntimeContext extends KubernetesRuntimeContext schedule(probes.getWorkspaceId(), probeFactory, probeResultConsumer)); } + /** + * Schedules provided {@link WorkspaceProbes} when a workspace becomes {@link + * WorkspaceStatus#RUNNING}. + * + *

Note that probes scheduling will be canceled when {@link #cancel(String)} is called or when + * a workspace becomes {@link WorkspaceStatus#STOPPING} or {@link WorkspaceStatus#STOPPED}. + * + * @param probes probes to schedule + * @param probeResultConsumer consumer of {@link ProbeResult} instances produced on retrieving + * probe execution results + * @param statusSupplier supplier to retrieve workspace status. Scheduling will be delayed if + * {@link RuntimeException} is thrown + * @see #schedule(WorkspaceProbes, Consumer) + */ + public void schedule( + WorkspaceProbes probes, + Consumer probeResultConsumer, + Supplier statusSupplier) { + DelayedSchedulingTask task = + new DelayedSchedulingTask(statusSupplier, probes, probeResultConsumer); + + // scheduleWithFixedDelay is used in favor of scheduleAtFixedRate because in case of big amount + // of scheduled probes start time of tasks may shift and this may lead to a situation when + // another probeConfig is needed immediately after the previous one is finished which doesn't + // seem a good thing + ScheduledFuture scheduledFuture = + probesExecutor.scheduleWithFixedDelay(task, 10L, 10L, TimeUnit.SECONDS); + + probesFutures.compute( + probes.getWorkspaceId(), + (key, scheduledFutures) -> { + List target = scheduledFutures; + if (target == null) { + target = new ArrayList<>(); + } + target.add(scheduledFuture); + return target; + }); + } + /** * Dismisses following and if possible current executions of probes of a workspace with a * specified ID. */ public void cancel(String workspaceId) { List tasks = probesFutures.remove(workspaceId); - if (tasks == null) { - return; + if (tasks != null) { + tasks.forEach(task -> task.cancel(true)); } - - tasks.forEach(task -> task.cancel(true)); } /** Denies starting of new probes and terminates active one if scheduler not terminated yet. */ @@ -216,6 +256,49 @@ public class ProbeScheduler { } } + private class DelayedSchedulingTask implements Runnable { + private final String workspaceId; + private final Supplier statusSupplier; + private final WorkspaceProbes probes; + private final Consumer probeResultConsumer; + + DelayedSchedulingTask( + Supplier statusSupplier, + WorkspaceProbes probes, + Consumer probeResultConsumer) { + this.workspaceId = probes.getWorkspaceId(); + this.statusSupplier = statusSupplier; + this.probes = probes; + this.probeResultConsumer = probeResultConsumer; + } + + @Override + public void run() { + WorkspaceStatus status; + + try { + status = statusSupplier.get(); + } catch (RuntimeException e) { + // delay + return; + } + + switch (status) { + case STARTING: + // delay + return; + case RUNNING: + ProbeScheduler.this.cancel(workspaceId); + schedule(probes, probeResultConsumer); + return; + case STOPPED: + case STOPPING: + default: + ProbeScheduler.this.cancel(workspaceId); + } + } + } + private class TimeoutProbeTask extends TimerTask { private final Probe probe; diff --git a/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/hc/probe/WorkspaceProbesFactory.java b/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/hc/probe/WorkspaceProbesFactory.java index 4ccfd137e9..583740c586 100644 --- a/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/hc/probe/WorkspaceProbesFactory.java +++ b/wsmaster/che-core-api-workspace/src/main/java/org/eclipse/che/api/workspace/server/hc/probe/WorkspaceProbesFactory.java @@ -64,9 +64,20 @@ public class WorkspaceProbesFactory { */ public WorkspaceProbes getProbes(RuntimeIdentity runtimeId, Runtime runtime) throws InfrastructureException { + return getProbes(runtimeId, runtime.getMachines()); + } + + /** + * Get {@link WorkspaceProbes} for servers of the specified machines + * + * @throws InfrastructureException when the operation fails + */ + public WorkspaceProbes getProbes( + RuntimeIdentity runtimeId, Map machines) + throws InfrastructureException { List factories = new ArrayList<>(); try { - for (Entry entry : runtime.getMachines().entrySet()) { + for (Entry entry : machines.entrySet()) { fillProbes(runtimeId, entry.getKey(), factories, entry.getValue().getServers()); } } catch (MalformedURLException e) {