CHE-9502 Add an ability to schedule servers checkers for STARTING K8s/OS runtime (#9685)

6.19.x
Sergii Leshchenko 2018-05-14 12:36:07 +03:00 committed by GitHub
parent a7f86b3c1b
commit 5c4fd8b698
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 190 additions and 28 deletions

View File

@ -16,7 +16,6 @@ import com.google.common.collect.ImmutableSet;
import java.util.Set;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.eclipse.che.api.core.ValidationException;
import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity;
import org.eclipse.che.api.core.notification.EventService;
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
@ -66,8 +65,7 @@ public class KubernetesInfrastructure extends RuntimeInfrastructure {
@Override
protected KubernetesRuntimeContext internalPrepare(
RuntimeIdentity id, InternalEnvironment environment)
throws ValidationException, InfrastructureException {
RuntimeIdentity id, InternalEnvironment environment) throws InfrastructureException {
final KubernetesEnvironment kubernetesEnvironment = asKubernetesEnv(environment);
k8sEnvProvisioner.provision(kubernetesEnvironment, id);
@ -76,7 +74,7 @@ public class KubernetesInfrastructure extends RuntimeInfrastructure {
}
private KubernetesEnvironment asKubernetesEnv(InternalEnvironment source)
throws ValidationException, InfrastructureException {
throws InfrastructureException {
if (source instanceof KubernetesEnvironment) {
return (KubernetesEnvironment) source;
}

View File

@ -43,7 +43,6 @@ import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.che.api.core.model.workspace.Warning;
import org.eclipse.che.api.core.model.workspace.WorkspaceStatus;
import org.eclipse.che.api.core.model.workspace.runtime.Machine;
import org.eclipse.che.api.core.model.workspace.runtime.MachineStatus;
import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity;
import org.eclipse.che.api.core.model.workspace.runtime.ServerStatus;
@ -53,6 +52,7 @@ import org.eclipse.che.api.workspace.server.hc.ServersCheckerFactory;
import org.eclipse.che.api.workspace.server.hc.probe.ProbeResult;
import org.eclipse.che.api.workspace.server.hc.probe.ProbeResult.ProbeStatus;
import org.eclipse.che.api.workspace.server.hc.probe.ProbeScheduler;
import org.eclipse.che.api.workspace.server.hc.probe.WorkspaceProbes;
import org.eclipse.che.api.workspace.server.hc.probe.WorkspaceProbesFactory;
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException;
@ -548,14 +548,41 @@ public class KubernetesInternalRuntime<
}
}
public void startServersCheckers() throws InfrastructureException {
for (Entry<String, ? extends Machine> machineEntry : getMachines().entrySet()) {
/**
* Schedules server checkers.
*
* <p>Note that if the runtime is {@link WorkspaceStatus#RUNNING} then checkers will be scheduled
* immediately. If the runtime is {@link WorkspaceStatus#STARTING} then checkers will be scheduled
* when it becomes {@link WorkspaceStatus#RUNNING}. If runtime has any another status then
* checkers won't be scheduled at all.
*
* @throws InfrastructureException when any exception occurred
*/
public void scheduleServersCheckers() throws InfrastructureException {
WorkspaceStatus status = getStatus();
if (status != WorkspaceStatus.RUNNING && status != WorkspaceStatus.STARTING) {
return;
}
ServerLivenessHandler consumer = new ServerLivenessHandler();
WorkspaceProbes probes =
probesFactory.getProbes(getContext().getIdentity(), getInternalMachines());
if (status == WorkspaceStatus.RUNNING) {
probeScheduler.schedule(probes, consumer);
} else {
// Workspace is starting it is needed to start servers checkers when it becomes RUNNING
probeScheduler.schedule(
probesFactory.getProbes(
getContext().getIdentity(),
machineEntry.getKey(),
machineEntry.getValue().getServers()),
new ServerLivenessHandler());
probes,
consumer,
() -> {
try {
return getStatus();
} catch (InfrastructureException e) {
throw new RuntimeException(e.getMessage());
}
});
}
}
@ -661,9 +688,10 @@ public class KubernetesInternalRuntime<
}
/**
* @param ContainerEvent
* @return true if event reason or message matches one of the comma separated values defined in
* 'che.infra.kubernetes.workspace_unrecoverable_events',false otherwise
* Returns true if event reason or message matches one of the comma separated values defined in
* 'che.infra.kubernetes.workspace_unrecoverable_events',false otherwise
*
* @param event event to check
*/
private boolean isUnrecoverable(ContainerEvent event) {
boolean isUnrecoverable = false;

View File

@ -16,7 +16,6 @@ import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.che.api.core.ValidationException;
import org.eclipse.che.api.core.model.workspace.WorkspaceStatus;
import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity;
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.api.workspace.server.spi.InternalInfrastructureException;
@ -81,9 +80,7 @@ public class KubernetesRuntimeContext<T extends KubernetesEnvironment> extends R
namespaceFactory.create(workspaceId, runtimeState.getNamespace()),
getEnvironment().getWarnings());
if (runtime.getStatus() == WorkspaceStatus.RUNNING) {
runtime.startServersCheckers();
}
runtime.scheduleServersCheckers();
return runtime;
}

View File

@ -31,6 +31,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@ -595,6 +596,53 @@ public class KubernetesInternalRuntimeTest {
assertFalse(runtimeStatesCache.get(internalRuntime.getContext().getIdentity()).isPresent());
}
@Test
public void shouldScheduleServerCheckersForRunningRuntime() throws Exception {
// given
runtimeStatesCache.putIfAbsent(
new KubernetesRuntimeState(
internalRuntime.getContext().getIdentity(), "test", WorkspaceStatus.RUNNING));
// when
internalRuntime.scheduleServersCheckers();
// then
verify(probesScheduler).schedule(any(), any());
}
@Test
public void shouldScheduleServerCheckersForStartingRuntime() throws Exception {
// given
runtimeStatesCache.putIfAbsent(
new KubernetesRuntimeState(
internalRuntime.getContext().getIdentity(), "test", WorkspaceStatus.STARTING));
// when
internalRuntime.scheduleServersCheckers();
// then
verify(probesScheduler).schedule(any(), any(), any());
}
@Test(dataProvider = "nonStartingRunningStatuses")
public void shouldNotScheduleServerCheckersIfRuntimeIsNotStartingOrRunning(WorkspaceStatus status)
throws Exception {
// given
runtimeStatesCache.putIfAbsent(
new KubernetesRuntimeState(internalRuntime.getContext().getIdentity(), "test", status));
// when
internalRuntime.scheduleServersCheckers();
// then
verifyZeroInteractions(probesScheduler);
}
@DataProvider(name = "nonStartingRunningStatuses")
public Object[][] nonStartingRunningStatuses() {
return new Object[][] {{WorkspaceStatus.STOPPED}, {WorkspaceStatus.STOPPING}};
}
private static MachineStatusEvent newEvent(String machineName, MachineStatus status) {
return newDto(MachineStatusEvent.class)
.withIdentity(DtoConverter.asDto(IDENTITY))

View File

@ -15,7 +15,6 @@ import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.che.api.core.ValidationException;
import org.eclipse.che.api.core.model.workspace.WorkspaceStatus;
import org.eclipse.che.api.core.model.workspace.runtime.RuntimeIdentity;
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.api.workspace.server.spi.RuntimeInfrastructure;
@ -74,9 +73,7 @@ public class OpenShiftRuntimeContext extends KubernetesRuntimeContext<OpenShiftE
projectFactory.create(workspaceId, runtimeState.getNamespace()),
getEnvironment().getWarnings());
if (runtime.getStatus() == WorkspaceStatus.RUNNING) {
runtime.startServersCheckers();
}
runtime.scheduleServersCheckers();
return runtime;
}

View File

@ -23,9 +23,11 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.eclipse.che.api.core.model.workspace.WorkspaceStatus;
import org.eclipse.che.api.workspace.server.hc.probe.ProbeResult.ProbeStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -80,17 +82,55 @@ public class ProbeScheduler {
probeFactory -> schedule(probes.getWorkspaceId(), probeFactory, probeResultConsumer));
}
/**
* Schedules provided {@link WorkspaceProbes} when a workspace becomes {@link
* WorkspaceStatus#RUNNING}.
*
* <p>Note that probes scheduling will be canceled when {@link #cancel(String)} is called or when
* a workspace becomes {@link WorkspaceStatus#STOPPING} or {@link WorkspaceStatus#STOPPED}.
*
* @param probes probes to schedule
* @param probeResultConsumer consumer of {@link ProbeResult} instances produced on retrieving
* probe execution results
* @param statusSupplier supplier to retrieve workspace status. Scheduling will be delayed if
* {@link RuntimeException} is thrown
* @see #schedule(WorkspaceProbes, Consumer)
*/
public void schedule(
WorkspaceProbes probes,
Consumer<ProbeResult> probeResultConsumer,
Supplier<WorkspaceStatus> statusSupplier) {
DelayedSchedulingTask task =
new DelayedSchedulingTask(statusSupplier, probes, probeResultConsumer);
// scheduleWithFixedDelay is used in favor of scheduleAtFixedRate because in case of big amount
// of scheduled probes start time of tasks may shift and this may lead to a situation when
// another probeConfig is needed immediately after the previous one is finished which doesn't
// seem a good thing
ScheduledFuture scheduledFuture =
probesExecutor.scheduleWithFixedDelay(task, 10L, 10L, TimeUnit.SECONDS);
probesFutures.compute(
probes.getWorkspaceId(),
(key, scheduledFutures) -> {
List<ScheduledFuture> target = scheduledFutures;
if (target == null) {
target = new ArrayList<>();
}
target.add(scheduledFuture);
return target;
});
}
/**
* Dismisses following and if possible current executions of probes of a workspace with a
* specified ID.
*/
public void cancel(String workspaceId) {
List<ScheduledFuture> tasks = probesFutures.remove(workspaceId);
if (tasks == null) {
return;
if (tasks != null) {
tasks.forEach(task -> task.cancel(true));
}
tasks.forEach(task -> task.cancel(true));
}
/** Denies starting of new probes and terminates active one if scheduler not terminated yet. */
@ -216,6 +256,49 @@ public class ProbeScheduler {
}
}
private class DelayedSchedulingTask implements Runnable {
private final String workspaceId;
private final Supplier<WorkspaceStatus> statusSupplier;
private final WorkspaceProbes probes;
private final Consumer<ProbeResult> probeResultConsumer;
DelayedSchedulingTask(
Supplier<WorkspaceStatus> statusSupplier,
WorkspaceProbes probes,
Consumer<ProbeResult> probeResultConsumer) {
this.workspaceId = probes.getWorkspaceId();
this.statusSupplier = statusSupplier;
this.probes = probes;
this.probeResultConsumer = probeResultConsumer;
}
@Override
public void run() {
WorkspaceStatus status;
try {
status = statusSupplier.get();
} catch (RuntimeException e) {
// delay
return;
}
switch (status) {
case STARTING:
// delay
return;
case RUNNING:
ProbeScheduler.this.cancel(workspaceId);
schedule(probes, probeResultConsumer);
return;
case STOPPED:
case STOPPING:
default:
ProbeScheduler.this.cancel(workspaceId);
}
}
}
private class TimeoutProbeTask extends TimerTask {
private final Probe probe;

View File

@ -64,9 +64,20 @@ public class WorkspaceProbesFactory {
*/
public WorkspaceProbes getProbes(RuntimeIdentity runtimeId, Runtime runtime)
throws InfrastructureException {
return getProbes(runtimeId, runtime.getMachines());
}
/**
* Get {@link WorkspaceProbes} for servers of the specified machines
*
* @throws InfrastructureException when the operation fails
*/
public WorkspaceProbes getProbes(
RuntimeIdentity runtimeId, Map<String, ? extends Machine> machines)
throws InfrastructureException {
List<ProbeFactory> factories = new ArrayList<>();
try {
for (Entry<String, ? extends Machine> entry : runtime.getMachines().entrySet()) {
for (Entry<String, ? extends Machine> entry : machines.entrySet()) {
fillProbes(runtimeId, entry.getKey(), factories, entry.getValue().getServers());
}
} catch (MalformedURLException e) {