Monitoring & Tracing capability for multiple Thread pools of Che Server (#14727)

* Monitoring & Tracing capability for multiple Thread pools of Che Server
Signed-off-by: Sergii Kabashniuk <skabashniuk@redhat.com>
7.20.x
Sergii Kabashniuk 2019-10-30 14:11:02 +01:00 committed by GitHub
parent 09c9de959b
commit 3de4e7ffac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 4848 additions and 1131 deletions

View File

@ -75,14 +75,6 @@
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-tracerresolver</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-core</artifactId>
@ -187,6 +179,10 @@
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-mail</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-observability</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-schedule</artifactId>

View File

@ -1,107 +0,0 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.api.deploy;
import com.google.inject.Binder;
import com.google.inject.Module;
import io.micrometer.core.instrument.Tags;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.util.concurrent.ExecutorService;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.eclipse.che.api.deploy.jsonrpc.CheMajorWebSocketEndpointConfiguration;
import org.eclipse.che.api.deploy.jsonrpc.CheMajorWebSocketEndpointExecutorServiceProvider;
import org.eclipse.che.api.deploy.jsonrpc.CheMinorWebSocketEndpointConfiguration;
import org.eclipse.che.api.deploy.jsonrpc.CheMinorWebSocketEndpointExecutorServiceProvider;
import org.eclipse.che.core.metrics.ExecutorServiceMetrics;
/**
* {@link Module} that provides metered implementation for different classes. Metrics will be
* published to {@link PrometheusMeterRegistry}.
*/
public class MetricsOverrideBinding implements Module {
@Override
public void configure(Binder binder) {
binder
.bind(CheMajorWebSocketEndpointExecutorServiceProvider.class)
.to(MeteredCheMajorWebSocketEndpointExecutorServiceProvider.class);
binder
.bind(CheMinorWebSocketEndpointExecutorServiceProvider.class)
.to(MeteredCheMinorWebSocketEndpointExecutorServiceProvider.class);
}
@Singleton
public static class MeteredCheMajorWebSocketEndpointExecutorServiceProvider
extends CheMajorWebSocketEndpointExecutorServiceProvider {
private final PrometheusMeterRegistry meterRegistry;
private ExecutorService executorService;
@Inject
public MeteredCheMajorWebSocketEndpointExecutorServiceProvider(
@Named(JSON_RPC_MAJOR_CORE_POOL_SIZE_PARAMETER_NAME) int corePoolSize,
@Named(JSON_RPC_MAJOR_MAX_POOL_SIZE_PARAMETER_NAME) int maxPoolSize,
@Named(JSON_RPC_MAJOR_QUEUE_CAPACITY_PARAMETER_NAME) int queueCapacity,
PrometheusMeterRegistry meterRegistry) {
super(corePoolSize, maxPoolSize, queueCapacity);
this.meterRegistry = meterRegistry;
}
@Override
public synchronized ExecutorService get() {
if (executorService == null) {
executorService =
ExecutorServiceMetrics.monitor(
meterRegistry,
super.get(),
CheMajorWebSocketEndpointConfiguration.EXECUTOR_NAME,
Tags.empty());
}
return executorService;
}
}
@Singleton
public static class MeteredCheMinorWebSocketEndpointExecutorServiceProvider
extends CheMinorWebSocketEndpointExecutorServiceProvider {
private final PrometheusMeterRegistry meterRegistry;
private ExecutorService executorService;
@Inject
public MeteredCheMinorWebSocketEndpointExecutorServiceProvider(
@Named(JSON_RPC_MINOR_CORE_POOL_SIZE_PARAMETER_NAME) int corePoolSize,
@Named(JSON_RPC_MINOR_MAX_POOL_SIZE_PARAMETER_NAME) int maxPoolSize,
@Named(JSON_RPC_MINOR_QUEUE_CAPACITY_PARAMETER_NAME) int queueCapacity,
PrometheusMeterRegistry meterRegistry) {
super(corePoolSize, maxPoolSize, queueCapacity);
this.meterRegistry = meterRegistry;
}
@Override
public synchronized ExecutorService get() {
if (executorService == null) {
executorService =
ExecutorServiceMetrics.monitor(
meterRegistry,
super.get(),
CheMinorWebSocketEndpointConfiguration.EXECUTOR_NAME,
Tags.empty());
}
return executorService;
}
}
}

View File

@ -68,6 +68,7 @@ import org.eclipse.che.api.workspace.server.token.MachineTokenProvider;
import org.eclipse.che.api.workspace.server.wsplugins.ChePluginsApplier;
import org.eclipse.che.commons.auth.token.ChainedTokenExtractor;
import org.eclipse.che.commons.auth.token.RequestTokenExtractor;
import org.eclipse.che.commons.observability.deploy.ExecutorWrapperModule;
import org.eclipse.che.core.db.DBTermination;
import org.eclipse.che.core.db.schema.SchemaInitializer;
import org.eclipse.che.core.tracing.metrics.TracingMetricsModule;
@ -260,12 +261,12 @@ public class WsMasterModule extends AbstractModule {
if (Boolean.valueOf(System.getenv("CHE_METRICS_ENABLED"))) {
install(new org.eclipse.che.core.metrics.MetricsModule());
install(new WsMasterMetricsModule());
install(new MetricsOverrideBinding());
}
if (Boolean.valueOf(System.getenv("CHE_TRACING_ENABLED"))
&& Boolean.valueOf(System.getenv("CHE_METRICS_ENABLED"))) {
install(new TracingMetricsModule());
}
install(new ExecutorWrapperModule());
}
private void configureSingleUserMode(Map<String, String> persistenceProperties) {

View File

@ -14,23 +14,12 @@ package org.eclipse.che.api.deploy.jsonrpc;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.name.Names;
import java.util.concurrent.ExecutorService;
import org.eclipse.che.api.core.jsonrpc.commons.RequestProcessorConfigurationProvider;
/** Configures JSON RPC WebSocket Endpoints. */
public class CheJsonRpcWebSocketConfigurationModule implements Module {
@Override
public void configure(Binder binder) {
binder
.bind(ExecutorService.class)
.annotatedWith(Names.named(CheMajorWebSocketEndpointConfiguration.EXECUTOR_NAME))
.toProvider(CheMajorWebSocketEndpointExecutorServiceProvider.class);
binder
.bind(ExecutorService.class)
.annotatedWith(Names.named(CheMinorWebSocketEndpointConfiguration.EXECUTOR_NAME))
.toProvider(CheMinorWebSocketEndpointExecutorServiceProvider.class);
Multibinder<RequestProcessorConfigurationProvider.Configuration> configurationMultibinder =
Multibinder.newSetBinder(binder, RequestProcessorConfigurationProvider.Configuration.class);

View File

@ -11,10 +11,17 @@
*/
package org.eclipse.che.api.deploy.jsonrpc;
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.che.api.core.jsonrpc.commons.RequestProcessorConfigurationProvider;
import org.eclipse.che.commons.lang.concurrent.LoggingUncaughtExceptionHandler;
import org.eclipse.che.commons.lang.execution.ExecutorServiceBuilder;
import org.eclipse.che.commons.observability.ExecutorServiceWrapper;
import org.slf4j.Logger;
/**
* {@link RequestProcessorConfigurationProvider.Configuration} implementation used to configure
@ -23,13 +30,44 @@ import org.eclipse.che.api.core.jsonrpc.commons.RequestProcessorConfigurationPro
public class CheMajorWebSocketEndpointConfiguration
implements RequestProcessorConfigurationProvider.Configuration {
private static final Logger LOG = getLogger(CheMajorWebSocketEndpointConfiguration.class);
private final ExecutorService executor;
public static final String EXECUTOR_NAME = "che.core.jsonrpc.major_executor";
public static final String JSON_RPC_MAJOR_CORE_POOL_SIZE_PARAMETER_NAME =
"che.core.jsonrpc.processor_core_pool_size";
public static final String JSON_RPC_MAJOR_MAX_POOL_SIZE_PARAMETER_NAME =
"che.core.jsonrpc.processor_max_pool_size";
public static final String JSON_RPC_MAJOR_QUEUE_CAPACITY_PARAMETER_NAME =
"che.core.jsonrpc.processor_queue_capacity";
@Inject
public CheMajorWebSocketEndpointConfiguration(@Named(EXECUTOR_NAME) ExecutorService executor) {
this.executor = executor;
public CheMajorWebSocketEndpointConfiguration(
@Named(JSON_RPC_MAJOR_CORE_POOL_SIZE_PARAMETER_NAME) int corePoolSize,
@Named(JSON_RPC_MAJOR_MAX_POOL_SIZE_PARAMETER_NAME) int maxPoolSize,
@Named(JSON_RPC_MAJOR_QUEUE_CAPACITY_PARAMETER_NAME) int queueCapacity,
ExecutorServiceWrapper wrapper) {
this.executor =
wrapper.wrap(
new ExecutorServiceBuilder()
.corePoolSize(corePoolSize)
.maxPoolSize(maxPoolSize)
.queueCapacity(queueCapacity)
.threadFactory(
new ThreadFactoryBuilder()
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.setNameFormat(CheMajorWebSocketEndpoint.ENDPOINT_ID + "-%d")
.setDaemon(true)
.build())
.rejectedExecutionHandler(
(r, executor) ->
LOG.error(
"Executor on major websocket endpoint rejected to handle the payload {}. Some important messages may be lost. Consider increasing `{}`. Now it's configured to {}",
r,
JSON_RPC_MAJOR_QUEUE_CAPACITY_PARAMETER_NAME,
queueCapacity))
.build(),
CheMajorWebSocketEndpoint.ENDPOINT_ID);
}
@Override

View File

@ -1,52 +0,0 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.api.deploy.jsonrpc;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.concurrent.ExecutorService;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.eclipse.che.commons.lang.execution.ExecutorServiceProvider;
import org.slf4j.Logger;
/** {@link ExecutorService} provider used in {@link CheMajorWebSocketEndpoint}. */
@Singleton
public class CheMajorWebSocketEndpointExecutorServiceProvider extends ExecutorServiceProvider {
private static final Logger LOG = getLogger(ExecutorServiceProvider.class);
public static final String JSON_RPC_MAJOR_CORE_POOL_SIZE_PARAMETER_NAME =
"che.core.jsonrpc.processor_core_pool_size";
public static final String JSON_RPC_MAJOR_MAX_POOL_SIZE_PARAMETER_NAME =
"che.core.jsonrpc.processor_max_pool_size";
public static final String JSON_RPC_MAJOR_QUEUE_CAPACITY_PARAMETER_NAME =
"che.core.jsonrpc.processor_queue_capacity";
@Inject
public CheMajorWebSocketEndpointExecutorServiceProvider(
@Named(JSON_RPC_MAJOR_CORE_POOL_SIZE_PARAMETER_NAME) int corePoolSize,
@Named(JSON_RPC_MAJOR_MAX_POOL_SIZE_PARAMETER_NAME) int maxPoolSize,
@Named(JSON_RPC_MAJOR_QUEUE_CAPACITY_PARAMETER_NAME) int queueCapacity) {
super(
corePoolSize,
maxPoolSize,
queueCapacity,
(r, executor) ->
LOG.error(
"Executor on major websocket endpoint rejected to handle the payload {}. Some important messages may be lost. Consider increasing `{}`. Now it's configured to {}",
r,
JSON_RPC_MAJOR_QUEUE_CAPACITY_PARAMETER_NAME,
queueCapacity));
}
}

View File

@ -11,10 +11,14 @@
*/
package org.eclipse.che.api.deploy.jsonrpc;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.che.api.core.jsonrpc.commons.RequestProcessorConfigurationProvider;
import org.eclipse.che.commons.lang.concurrent.LoggingUncaughtExceptionHandler;
import org.eclipse.che.commons.lang.execution.ExecutorServiceBuilder;
import org.eclipse.che.commons.observability.ExecutorServiceWrapper;
/**
* {@link RequestProcessorConfigurationProvider.Configuration} implementation used to configure
@ -25,11 +29,33 @@ public class CheMinorWebSocketEndpointConfiguration
private final ExecutorService executor;
public static final String EXECUTOR_NAME = "che.core.jsonrpc.minor_executor";
public static final String JSON_RPC_MINOR_CORE_POOL_SIZE_PARAMETER_NAME =
"che.core.jsonrpc.minor_processor_core_pool_size";
public static final String JSON_RPC_MINOR_MAX_POOL_SIZE_PARAMETER_NAME =
"che.core.jsonrpc.minor_processor_max_pool_size";
public static final String JSON_RPC_MINOR_QUEUE_CAPACITY_PARAMETER_NAME =
"che.core.jsonrpc.minor_processor_queue_capacity";
@Inject
public CheMinorWebSocketEndpointConfiguration(@Named(EXECUTOR_NAME) ExecutorService executor) {
this.executor = executor;
public CheMinorWebSocketEndpointConfiguration(
@Named(JSON_RPC_MINOR_CORE_POOL_SIZE_PARAMETER_NAME) int corePoolSize,
@Named(JSON_RPC_MINOR_MAX_POOL_SIZE_PARAMETER_NAME) int maxPoolSize,
@Named(JSON_RPC_MINOR_QUEUE_CAPACITY_PARAMETER_NAME) int queueCapacity,
ExecutorServiceWrapper wrapper) {
this.executor =
wrapper.wrap(
new ExecutorServiceBuilder()
.corePoolSize(corePoolSize)
.maxPoolSize(maxPoolSize)
.queueCapacity(queueCapacity)
.threadFactory(
new ThreadFactoryBuilder()
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.setNameFormat(CheMinorWebSocketEndpoint.ENDPOINT_ID + "-%d")
.setDaemon(true)
.build())
.build(),
CheMinorWebSocketEndpoint.ENDPOINT_ID);
}
@Override

View File

@ -1,39 +0,0 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.api.deploy.jsonrpc;
import java.util.concurrent.ExecutorService;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.eclipse.che.commons.lang.execution.ExecutorServiceProvider;
/** * {@link ExecutorService} provider used in {@link CheMinorWebSocketEndpoint}. */
@Singleton
public class CheMinorWebSocketEndpointExecutorServiceProvider extends ExecutorServiceProvider {
public static final String JSON_RPC_MINOR_CORE_POOL_SIZE_PARAMETER_NAME =
"che.core.jsonrpc.minor_processor_core_pool_size";
public static final String JSON_RPC_MINOR_MAX_POOL_SIZE_PARAMETER_NAME =
"che.core.jsonrpc.minor_processor_max_pool_size";
public static final String JSON_RPC_MINOR_QUEUE_CAPACITY_PARAMETER_NAME =
"che.core.jsonrpc.minor_processor_queue_capacity";
@Inject
public CheMinorWebSocketEndpointExecutorServiceProvider(
@Named(JSON_RPC_MINOR_CORE_POOL_SIZE_PARAMETER_NAME) int corePoolSize,
@Named(JSON_RPC_MINOR_MAX_POOL_SIZE_PARAMETER_NAME) int maxPoolSize,
@Named(JSON_RPC_MINOR_QUEUE_CAPACITY_PARAMETER_NAME) int queueCapacity) {
super(corePoolSize, maxPoolSize, queueCapacity);
}
}

View File

@ -522,7 +522,6 @@ che.core.jsonrpc.processor_max_pool_size=50
# Initial json processing pool. Minimum number of threads that used to process major JSON RPC messages.
che.core.jsonrpc.processor_core_pool_size=5
# Configuration of queue used to process Json RPC messages.
# org.eclipse.che.commons.lang.execution.ExecutorServiceProvider contains more information about this parameter
che.core.jsonrpc.processor_queue_capacity=100000
## Configuration of major "/websocket-minor" endpoint
@ -532,7 +531,6 @@ che.core.jsonrpc.minor_processor_max_pool_size=100
# Initial json processing pool. Minimum number of threads that used to process minor JSON RPC messages.
che.core.jsonrpc.minor_processor_core_pool_size=15
# Configuration of queue used to process Json RPC messages.
# org.eclipse.che.commons.lang.execution.ExecutorServiceProvider contains more information about this parameter
che.core.jsonrpc.minor_processor_queue_capacity=10000
## Port the the http server endpoint that would be exposed with Prometheus metrics

View File

@ -1,72 +0,0 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.micrometer.core.instrument.internal.TimedExecutorService;
import java.util.concurrent.ExecutorService;
import org.eclipse.che.api.deploy.jsonrpc.CheJsonRpcWebSocketConfigurationModule;
import org.eclipse.che.api.deploy.jsonrpc.CheMajorWebSocketEndpointConfiguration;
import org.eclipse.che.core.metrics.MetricsModule;
import org.testng.Assert;
import org.testng.annotations.Test;
public class MetricsOverrideBindingTest {
@Test
public void shouldInjectMettered() {
Injector injector =
Guice.createInjector(
new CheJsonRpcWebSocketConfigurationModule(),
new org.eclipse.che.api.deploy.MetricsOverrideBinding(),
new MetricsModule(),
new Module() {
@Override
public void configure(Binder binder) {
binder
.bindConstant()
.annotatedWith(Names.named("che.core.jsonrpc.processor_max_pool_size"))
.to(100);
binder
.bindConstant()
.annotatedWith(Names.named("che.core.jsonrpc.processor_core_pool_size"))
.to(4);
binder
.bindConstant()
.annotatedWith(Names.named("che.core.jsonrpc.minor_processor_max_pool_size"))
.to(100);
binder
.bindConstant()
.annotatedWith(Names.named("che.core.jsonrpc.minor_processor_core_pool_size"))
.to(3);
binder
.bindConstant()
.annotatedWith(Names.named("che.core.jsonrpc.processor_queue_capacity"))
.to(100);
binder
.bindConstant()
.annotatedWith(Names.named("che.core.jsonrpc.minor_processor_queue_capacity"))
.to(100);
binder.bindConstant().annotatedWith(Names.named("che.metrics.port")).to(100);
}
});
CheMajorWebSocketEndpointConfiguration configuration =
injector.getInstance(CheMajorWebSocketEndpointConfiguration.class);
ExecutorService exc = configuration.getExecutorService();
Assert.assertTrue(exc instanceof TimedExecutorService);
}
}

View File

@ -1,114 +0,0 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.core.metrics;
import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.MeterBinder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility class that is adding monitoring rejected messages ability to standard {@link
* io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics} metrics
*/
public class ExecutorServiceMetrics implements MeterBinder {
private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceMetrics.class);
private final Tags tags;
private final RejectedExecutionHandlerWrapper rejectedExecutionHandlerWrapper;
public ExecutorServiceMetrics(
RejectedExecutionHandlerWrapper rejectedExecutionHandlerWrapper,
String executorServiceName,
Iterable<Tag> tags) {
this.rejectedExecutionHandlerWrapper = rejectedExecutionHandlerWrapper;
this.tags = Tags.concat(tags, "name", executorServiceName);
}
public static ExecutorService monitor(
MeterRegistry registry, ExecutorService executor, String executorName, Iterable<Tag> tags) {
if (executor instanceof ThreadPoolExecutor) {
LOG.debug("Adding rejection monitoring for {} {}", executor, executorName);
monitorRejections(registry, (ThreadPoolExecutor) executor, executorName, tags);
}
return io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics.monitor(
registry, executor, executorName, tags);
}
public static ExecutorService monitorRejections(
MeterRegistry registry,
ThreadPoolExecutor executor,
String executorName,
Iterable<Tag> tags) {
RejectedExecutionHandlerWrapper rejectedExecutionHandler =
new RejectedExecutionHandlerWrapper(executor.getRejectedExecutionHandler());
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
new ExecutorServiceMetrics(rejectedExecutionHandler, executorName, tags).bindTo(registry);
return io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics.monitor(
registry, executor, executorName, tags);
}
@Override
public void bindTo(MeterRegistry registry) {
FunctionCounter.builder(
"executor.rejected",
rejectedExecutionHandlerWrapper,
RejectedExecutionHandlerWrapper::getCounter)
.tags(tags)
.description("The total number of tasks that have been rejected for execution")
.baseUnit("tasks")
.register(registry);
}
private static class RejectedExecutionHandlerWrapper
implements java.util.concurrent.RejectedExecutionHandler {
private final RejectedExecutionHandler handler;
private volatile long counter;
private RejectedExecutionHandlerWrapper(RejectedExecutionHandler handler) {
this.handler = handler;
}
@Override
public String toString() {
return "RejectedExecutionHandlerWrapper{"
+ "handler="
+ handler
+ ", counter="
+ counter
+ '}';
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
counter++;
if (handler != null) {
handler.rejectedExecution(r, executor);
}
}
public long getCounter() {
return this.counter;
}
}
}

View File

@ -16,6 +16,7 @@ import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
import io.github.mweirauch.micrometer.jvm.extras.ProcessMemoryMetrics;
import io.github.mweirauch.micrometer.jvm.extras.ProcessThreadMetrics;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
@ -38,6 +39,7 @@ public class MetricsModule extends AbstractModule {
bind(PrometheusMeterRegistry.class)
.toProvider(PrometheusMeterRegistryProvider.class)
.asEagerSingleton();
bind(MeterRegistry.class).to(PrometheusMeterRegistry.class);
Multibinder<MeterBinder> meterMultibinder =
Multibinder.newSetBinder(binder(), MeterBinder.class);

View File

@ -30,10 +30,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>

View File

@ -0,0 +1,131 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.lang.execution;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Helper class to build {@link ExecutorService} from parts like corePoolSize, maxPoolSize,
* threadFactory, etc.
*
* @author Sergii Kabashniuk
*/
public class ExecutorServiceBuilder {
private int corePoolSize;
private int maxPoolSize;
private boolean allowCoreThreadTimeOut;
private Duration keepAliveTime;
private BlockingQueue<Runnable> workQueue;
private ThreadFactory threadFactory;
private RejectedExecutionHandler handler;
public ExecutorServiceBuilder() {
this.corePoolSize = 0;
this.maxPoolSize = 1;
this.allowCoreThreadTimeOut = false;
this.keepAliveTime = Duration.ofSeconds(60);
this.workQueue = new LinkedBlockingQueue<>();
this.threadFactory = Executors.defaultThreadFactory();
this.handler = new ThreadPoolExecutor.AbortPolicy();
}
/**
* @param corePoolSize - configure corePoolSize the number of threads to keep in the pool, even if
* they are idle, unless {@code allowCoreThreadTimeOut} is set.
*/
public ExecutorServiceBuilder corePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
return this;
}
/** @param maxPoolSize - configure the maximum number of threads to allow in the pool. */
public ExecutorServiceBuilder maxPoolSize(int maxPoolSize) {
if (maxPoolSize < corePoolSize) {
throw new IllegalArgumentException("maxPoolSize has to be grater then corePoolSize");
}
this.maxPoolSize = maxPoolSize;
return this;
}
/** @param allowCoreThreadTimeOut - allow core threads to time out an terminate. */
public ExecutorServiceBuilder allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
return this;
}
/** @param time - configure keepAliveTime parameter of {@code ThreadPoolExecutor} */
public ExecutorServiceBuilder keepAliveTime(Duration time) {
this.keepAliveTime = time;
return this;
}
/**
* @param workQueue - configure the type of the task queue that would be used in {@code
* ThreadPoolExecutor}
*/
public ExecutorServiceBuilder workQueue(BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
return this;
}
/**
* @param queueCapacity - configure {@code LinkedBlockingQueue} with queueCapacity capacity to be
* used in {@code ThreadPoolExecutor}
*/
public ExecutorServiceBuilder queueCapacity(int queueCapacity) {
this.workQueue = new LinkedBlockingQueue<>(queueCapacity);
return this;
}
/**
* @param handler - configure instance of {@code RejectedExecutionHandler} to be used in {@code
* ThreadPoolExecutor}
*/
public ExecutorServiceBuilder rejectedExecutionHandler(RejectedExecutionHandler handler) {
this.handler = handler;
return this;
}
/**
* @param threadFactory - configure instance of {@code ThreadFactory} to be used in {@code
* ThreadPoolExecutor}
*/
public ExecutorServiceBuilder threadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}
public ExecutorService build() {
final ThreadPoolExecutor executor =
new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime.toMillis(),
TimeUnit.MILLISECONDS,
workQueue,
threadFactory,
handler);
executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
return executor;
}
}

View File

@ -1,97 +0,0 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.lang.execution;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Provider;
import javax.inject.Singleton;
import org.eclipse.che.commons.lang.concurrent.LoggingUncaughtExceptionHandler;
import org.slf4j.Logger;
/**
* Configurable {@link Provider} of {@link ExecutorService}.
*
* <p>Allow to configure corePoolSize, maximumPoolSize, queueCapacity. It uses two different
* implementation of Queue. if queueCapacity > 0 then this is capacity of {@link
* LinkedBlockingQueue} if <=0 then {@link SynchronousQueue} is used.
*
* <p>Implementation add {@link java.util.concurrent.RejectedExecutionHandler} that is printing
* rejected message to the LOG.error. This can happen in case if there is no available Thread in
* ThreadPool and there is no capacity in the queue.
*
* @author Sergii Kabashniuk
*/
@Singleton
public class ExecutorServiceProvider implements Provider<ExecutorService> {
private static final Logger LOG = getLogger(ExecutorServiceProvider.class);
private final ThreadPoolExecutor executor;
/**
* @param corePoolSize - corePoolSize of ThreadPoolExecutor
* @param maxPoolSize - maximumPoolSize of ThreadPoolExecutor
* @param queueCapacity - queue capacity. if > 0 then this is capacity of {@link
* LinkedBlockingQueue} if <=0 then {@link SynchronousQueue} are used.
* @param rejectedExecutionHandler - {@link RejectedExecutionHandler} of ThreadPoolExecutor
*/
public ExecutorServiceProvider(
int corePoolSize,
int maxPoolSize,
int queueCapacity,
RejectedExecutionHandler rejectedExecutionHandler) {
ThreadFactory factory =
new ThreadFactoryBuilder()
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.setNameFormat(this.getClass().getSimpleName() + "-%d")
.setDaemon(true)
.build();
executor =
new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L,
SECONDS,
queueCapacity > 0 ? new LinkedBlockingQueue<>(queueCapacity) : new SynchronousQueue<>(),
factory);
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
executor.prestartCoreThread();
}
/**
* @param corePoolSize - corePoolSize of ThreadPoolExecutor
* @param maxPoolSize - maximumPoolSize of ThreadPoolExecutor
* @param queueCapacity - queue capacity. if > 0 then this is capacity of {@link
* LinkedBlockingQueue} if <=0 then {@link SynchronousQueue} are used.
*/
public ExecutorServiceProvider(int corePoolSize, int maxPoolSize, int queueCapacity) {
this(
corePoolSize,
maxPoolSize,
queueCapacity,
(r, e) -> LOG.warn("Executor rejected to handle the payload {}", r));
}
@Override
public ExecutorService get() {
return executor;
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.lang.execution;
import static org.testng.Assert.*;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import org.eclipse.che.commons.lang.concurrent.LoggingUncaughtExceptionHandler;
import org.testng.annotations.Test;
public class ExecutorServiceBuilderTest {
ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.setNameFormat(ExecutorServiceBuilderTest.class + "-%d")
.setDaemon(true)
.build();
@Test
public void testBuild() {
ExecutorService executorService =
new ExecutorServiceBuilder()
.corePoolSize(6)
.maxPoolSize(12)
.queueCapacity(5000)
.threadFactory(threadFactory)
.build();
assertTrue(executorService instanceof ThreadPoolExecutor);
ThreadPoolExecutor threadPoolExecutorService = (ThreadPoolExecutor) executorService;
assertEquals(threadPoolExecutorService.getCorePoolSize(), 6);
assertEquals(threadPoolExecutorService.getMaximumPoolSize(), 12);
assertEquals(threadPoolExecutorService.getThreadFactory(), threadFactory);
}
@Test(
expectedExceptions = IllegalArgumentException.class,
expectedExceptionsMessageRegExp = "maxPoolSize has to be grater then corePoolSize")
public void testSetMaxLowerThenCore() {
new ExecutorServiceBuilder().corePoolSize(6).maxPoolSize(1).build();
}
}

View File

@ -1,86 +0,0 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.lang.execution;
import static org.testng.Assert.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.testng.Assert;
import org.testng.annotations.Test;
public class ExecutorServiceProviderTest {
@Test
public void shouldProvideExecutorService() {
// given
ExecutorServiceProvider executorServiceProvider = new ExecutorServiceProvider(10, 10, 10);
// when
ExecutorService executorService = executorServiceProvider.get();
// then
Assert.assertNotNull(executorService);
}
@Test
public void shouldProvideExecutorServiceWithSynchronousQueue() {
// given
ExecutorServiceProvider executorServiceProvider = new ExecutorServiceProvider(10, 10, 0);
// when
ExecutorService executorService = executorServiceProvider.get();
// then
assertTrue(executorService instanceof ThreadPoolExecutor);
assertTrue(((ThreadPoolExecutor) executorService).getQueue() instanceof SynchronousQueue);
}
@Test
public void shouldProvideExecutorServiceWithLinkedBlockingQueue() {
// given
ExecutorServiceProvider executorServiceProvider = new ExecutorServiceProvider(10, 10, 10);
// when
ExecutorService executorService = executorServiceProvider.get();
// then
assertTrue(executorService instanceof ThreadPoolExecutor);
assertTrue(((ThreadPoolExecutor) executorService).getQueue() instanceof LinkedBlockingQueue);
}
@Test
public void shouldProvideExecutorServiceWithCorePoolSize() {
// given
ExecutorServiceProvider executorServiceProvider =
new ExecutorServiceProvider(100500, 100501, 10);
// when
ExecutorService executorService = executorServiceProvider.get();
// then
assertTrue(executorService instanceof ThreadPoolExecutor);
assertEquals(((ThreadPoolExecutor) executorService).getCorePoolSize(), 100500);
}
@Test
public void shouldProvideExecutorServiceWithMaxPoolSize() {
// given
ExecutorServiceProvider executorServiceProvider = new ExecutorServiceProvider(10, 28, 10);
// when
ExecutorService executorService = executorServiceProvider.get();
// then
assertTrue(executorService instanceof ThreadPoolExecutor);
assertEquals(((ThreadPoolExecutor) executorService).getMaximumPoolSize(), 28);
}
}

View File

@ -59,6 +59,10 @@
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-lang</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-observability</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View File

@ -33,6 +33,7 @@ import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;
import org.apache.commons.io.FileUtils;
import org.eclipse.che.commons.lang.concurrent.LoggingUncaughtExceptionHandler;
import org.eclipse.che.commons.observability.ExecutorServiceWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -50,16 +51,18 @@ public class MailSender {
private final MailSessionProvider mailSessionProvider;
@Inject
public MailSender(MailSessionProvider mailSessionProvider) {
public MailSender(MailSessionProvider mailSessionProvider, ExecutorServiceWrapper wrapper) {
this.mailSessionProvider = mailSessionProvider;
this.executor =
newFixedThreadPool(
2 * Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder()
.setNameFormat("MailNotificationsPool-%d")
.setDaemon(false)
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.build());
wrapper.wrap(
newFixedThreadPool(
2 * Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder()
.setNameFormat("MailNotificationsPool-%d")
.setDaemon(false)
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.build()),
MailSender.class.getName());
}
public void sendAsync(EmailBean emailBean) {

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import org.eclipse.che.commons.observability.NoopExecutorServiceWrapper;
import org.testng.ITestContext;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@ -71,7 +72,9 @@ public class MailSenderTest {
" mail.smtp.auth",
"false");
mailSender = new MailSender(new MailSessionProvider(mailConfiguration));
mailSender =
new MailSender(
new MailSessionProvider(mailConfiguration), new NoopExecutorServiceWrapper());
}
@AfterMethod

View File

@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2012-2018 Red Hat, Inc.
This program and the accompanying materials are made
available under the terms of the Eclipse Public License 2.0
which is available at https://www.eclipse.org/legal/epl-2.0/
SPDX-License-Identifier: EPL-2.0
Contributors:
Red Hat, Inc. - initial API and implementation
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>che-core-commons-parent</artifactId>
<groupId>org.eclipse.che.core</groupId>
<version>7.4.0-SNAPSHOT</version>
</parent>
<artifactId>che-core-commons-observability</artifactId>
<name>Che Core :: Commons :: Tracing and Monitoring wrapper</name>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-concurrent</artifactId>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-lang</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-schedule</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,184 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package io.micrometer.core.instrument.internal;
import static java.util.stream.Collectors.toList;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.che.commons.schedule.executor.CronExecutorService;
import org.eclipse.che.commons.schedule.executor.CronExpression;
/**
* A {@link CronExecutorService} that is timed. It provides the same metrics as {@link
* io.micrometer.core.instrument.internal.TimedScheduledExecutorService} plus adds one extra counter
* {@code executor.scheduled.cron}. The counter represents the number of invocations of method
* {@link CronExecutorService#schedule(Runnable, CronExpression)}
*
* @author Sergii Kabashniuk
*/
public class TimedCronExecutorService implements CronExecutorService {
private final CronExecutorService delegate;
private final Counter scheduledCron;
private final MeterRegistry registry;
private final Timer executionTimer;
private final Timer idleTimer;
private final Counter scheduledOnce;
private final Counter scheduledRepetitively;
public TimedCronExecutorService(
MeterRegistry registry,
CronExecutorService delegate,
String executorServiceName,
Iterable<Tag> tags) {
this.registry = registry;
this.delegate = delegate;
this.executionTimer =
registry.timer("executor", Tags.concat(tags, "name", executorServiceName));
this.idleTimer =
registry.timer("executor.idle", Tags.concat(tags, "name", executorServiceName));
this.scheduledOnce =
registry.counter("executor.scheduled.once", Tags.concat(tags, "name", executorServiceName));
this.scheduledRepetitively =
registry.counter(
"executor.scheduled.repetitively", Tags.concat(tags, "name", executorServiceName));
this.scheduledCron =
registry.counter("executor.scheduled.cron", Tags.concat(tags, "name", executorServiceName));
}
@Override
public void shutdown() {
delegate.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}
@Override
public boolean isShutdown() {
return delegate.isShutdown();
}
@Override
public boolean isTerminated() {
return delegate.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(wrap(task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(wrap(task), result);
}
@Override
public Future<?> submit(Runnable task) {
return delegate.submit(wrap(task));
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return delegate.invokeAll(wrapAll(tasks));
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate.invokeAll(wrapAll(tasks), timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return delegate.invokeAny(wrapAll(tasks));
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(wrapAll(tasks), timeout, unit);
}
@Override
public void execute(Runnable command) {
delegate.execute(wrap(command));
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
scheduledOnce.increment();
return delegate.schedule(executionTimer.wrap(command), delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
scheduledOnce.increment();
return delegate.schedule(executionTimer.wrap(callable), delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
scheduledRepetitively.increment();
return delegate.scheduleAtFixedRate(executionTimer.wrap(command), initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
scheduledRepetitively.increment();
return delegate.scheduleWithFixedDelay(executionTimer.wrap(command), initialDelay, delay, unit);
}
private Runnable wrap(Runnable task) {
return new TimedRunnable(registry, executionTimer, idleTimer, task);
}
private <T> Callable<T> wrap(Callable<T> task) {
return new TimedCallable<>(registry, executionTimer, idleTimer, task);
}
private <T> Collection<? extends Callable<T>> wrapAll(Collection<? extends Callable<T>> tasks) {
return tasks.stream().map(this::wrap).collect(toList());
}
@Override
public Future<?> schedule(Runnable task, CronExpression expression) {
scheduledCron.increment();
return delegate.schedule(executionTimer.wrap(task), expression);
}
}

View File

@ -0,0 +1,54 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.BaseUnits;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Wrapper of {@link RejectedExecutionHandler} that reports to {@link MeterRegistry} number of
* rejections.
*/
public class CountedRejectedExecutionHandler implements RejectedExecutionHandler {
private final RejectedExecutionHandler delegate;
private final Counter counter;
public CountedRejectedExecutionHandler(
RejectedExecutionHandler delegate, MeterRegistry registry, String name, Iterable<Tag> tags) {
this.delegate = delegate;
this.counter =
Counter.builder("executor.rejected")
.tags(Tags.concat(tags, "name", name))
.description("The number of tasks that was not accepted for execution")
.baseUnit(BaseUnits.TASKS)
.register(registry);
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
counter.increment();
delegate.rejectedExecution(r, executor);
}
public static void monitorRejections(
MeterRegistry registry, ThreadPoolExecutor executor, String name, Iterable<Tag> tags) {
executor.setRejectedExecutionHandler(
new CountedRejectedExecutionHandler(
executor.getRejectedExecutionHandler(), registry, name, tags));
}
}

View File

@ -0,0 +1,89 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.BaseUnits;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import javax.validation.constraints.NotNull;
/** A {@link ThreadFactory} that monitors the number of threads created, running and terminated. */
public class CountedThreadFactory implements ThreadFactory {
private final ThreadFactory delegate;
private final Counter created;
private final AtomicInteger running = new AtomicInteger(0);
private final Counter terminated;
/**
* Wraps a {@link ThreadFactory} with an explicit name and records the number of created, running
* and terminated threads.
*
* @param delegate {@link ThreadFactory} to wrap.
* @param registry {@link MeterRegistry} that will contain the metrics.
* @param name name for this delegate.
* @param tags tags that can provide additional context.
*/
public CountedThreadFactory(
ThreadFactory delegate, MeterRegistry registry, String name, Iterable<Tag> tags) {
this.delegate = delegate;
this.created =
Counter.builder("thread.factory.created")
.tags(Tags.concat(tags, "name", name))
.description("The approximate number of threads that create with thread factory")
.baseUnit(BaseUnits.THREADS)
.register(registry);
this.terminated =
Counter.builder("thread.factory.terminated")
.tags(Tags.concat(tags, "name", name))
.description("The approximate number of threads that is finished execution")
.baseUnit(BaseUnits.THREADS)
.register(registry);
Gauge.builder("thread.factory.running", running, AtomicInteger::get)
.tags(Tags.concat(tags, "name", name))
.description(
"The approximate number of threads that are started executing, but not terminated")
.baseUnit(BaseUnits.THREADS)
.register(registry);
}
/** {@inheritDoc} */
@Override
public Thread newThread(@NotNull Runnable runnable) {
Thread thread =
delegate.newThread(
() -> {
running.incrementAndGet();
try {
runnable.run();
} finally {
running.decrementAndGet();
terminated.increment();
}
});
created.increment();
return thread;
}
public static void monitorThreads(
MeterRegistry registry, ThreadPoolExecutor executor, String name, Iterable<Tag> tags) {
executor.setThreadFactory(
new CountedThreadFactory(executor.getThreadFactory(), registry, name, tags));
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability;
import com.google.common.annotations.Beta;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.che.commons.schedule.executor.CronExecutorService;
/**
* Wrapper of different implemntations of {@link ExecutorService}. At this moment supported: {@link
* ExecutorService}, {@link ScheduledExecutorService} and {@link CronExecutorService}.
*
* <p>Depending on implementation and environment configuration wrapper can add to the original
* class different capabilities like monitoring or tracing.
*
* @author Sergii Kabashniuk
*/
@Beta
public interface ExecutorServiceWrapper {
/**
* Creates wrapper for the given executor.
*
* @param executor {@link ExecutorService} that has to be wrapped.
* @param name unique name that can identify concrete instance of executor.
* @param tags key/value pairs that gives some context about provided executor.
* @return wrapped instance of given executor.
*/
ExecutorService wrap(ExecutorService executor, String name, String... tags);
/**
* Creates wrapper for the given executor.
*
* @param executor {@link ScheduledExecutorService} that has to be wrapped.
* @param name unique name that can identify concrete instance of executor.
* @param tags key/value pairs that gives some context about provided executor.
* @return wrapped instance of given executor.
*/
ScheduledExecutorService wrap(ScheduledExecutorService executor, String name, String... tags);
/**
* Creates wrapper for the given executor.
*
* @param executor {@link CronExecutorService} that has to be wrapped.
* @param name unique name that can identify concrete instance of executor.
* @param tags key/value pairs that gives some context about provided executor.
* @return wrapped instance of given executor.
*/
CronExecutorService wrap(CronExecutorService executor, String name, String... tags);
}

View File

@ -0,0 +1,57 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.eclipse.che.commons.schedule.executor.CronExecutorService;
/**
* Implementation of {@code ExecutorServiceWrapper} that add all sort of monitoring and tracing
* capabilities. Monitoring allayed first, tracing second.
*/
@Singleton
public class MeteredAndTracedExecutorServiceWrapper implements ExecutorServiceWrapper {
private final MeteredExecutorServiceWrapper meteredExecutorServiceWrapper;
private final TracedExecutorServiceWrapper tracedExecutorServiceWrapper;
@Inject
public MeteredAndTracedExecutorServiceWrapper(
MeteredExecutorServiceWrapper meteredExecutorServiceWrapper,
TracedExecutorServiceWrapper tracedExecutorServiceWrapper) {
this.meteredExecutorServiceWrapper = meteredExecutorServiceWrapper;
this.tracedExecutorServiceWrapper = tracedExecutorServiceWrapper;
}
@Override
public ExecutorService wrap(ExecutorService executor, String name, String... tags) {
return tracedExecutorServiceWrapper.wrap(
meteredExecutorServiceWrapper.wrap(executor, name, tags), name, tags);
}
@Override
public ScheduledExecutorService wrap(
ScheduledExecutorService executor, String name, String... tags) {
return tracedExecutorServiceWrapper.wrap(
meteredExecutorServiceWrapper.wrap(executor, name, tags), name, tags);
}
@Override
public CronExecutorService wrap(CronExecutorService executor, String name, String... tags) {
return tracedExecutorServiceWrapper.wrap(
meteredExecutorServiceWrapper.wrap(executor, name, tags), name, tags);
}
}

View File

@ -0,0 +1,120 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import io.micrometer.core.instrument.internal.TimedCronExecutorService;
import io.micrometer.core.lang.Nullable;
import java.lang.reflect.Field;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.eclipse.che.commons.schedule.executor.CronExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of {@code ExecutorServiceWrapper} that add all sort of monitoring capabilities
* from {@code ExecutorServiceMetrics}.
*
* <p>Also in case if a provided executor is an instance of {@code ThreadPoolExecutor} it will add
* metrics provided by {@code CountedThreadFactory} and {@code CountedRejectedExecutionHandler}. In
* case if {@code ExecutorService} provided by {@code Executors} class are the instances of
* java.util.concurrent.Executors$DelegatedScheduledExecutorService or
* java.util.concurrent.Executors$FinalizableDelegatedExecutorService there would be an attempt to
* unwrap them to get underlying {@code ThreadPoolExecutor} to be able to provide {@code
* CountedThreadFactory} and {@code CountedRejectedExecutionHandler} statistics. Failed unwrapping
* attempt would be only logged, no exception would be raised and no additional statistic would be
* published.
*/
@Singleton
public class MeteredExecutorServiceWrapper implements ExecutorServiceWrapper {
private static final Logger LOG = LoggerFactory.getLogger(MeteredExecutorServiceWrapper.class);
private final MeterRegistry meterRegistry;
@Inject
public MeteredExecutorServiceWrapper(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Override
public ScheduledExecutorService wrap(
ScheduledExecutorService executor, String name, String... tags) {
monitorThreadPoolExecutor(executor, name, tags);
return ExecutorServiceMetrics.monitor(meterRegistry, executor, name, Tags.of(tags));
}
@Override
public ExecutorService wrap(ExecutorService executor, String name, String... tags) {
monitorThreadPoolExecutor(executor, name, tags);
return ExecutorServiceMetrics.monitor(meterRegistry, executor, name, Tags.of(tags));
}
@Override
public CronExecutorService wrap(CronExecutorService executor, String name, String... tags) {
monitorThreadPoolExecutor(executor, name, tags);
new io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics(
executor, name, Tags.of(tags))
.bindTo(meterRegistry);
return new TimedCronExecutorService(meterRegistry, executor, name, Tags.of(tags));
}
private void monitorThreadPoolExecutor(ExecutorService executor, String name, String... tags) {
String className = executor.getClass().getName();
ThreadPoolExecutor unwrappedThreadPoolExecutor = null;
if (executor instanceof ThreadPoolExecutor) {
unwrappedThreadPoolExecutor = (ThreadPoolExecutor) executor;
} else if (className.equals(
"java.util.concurrent.Executors$DelegatedScheduledExecutorService")) {
unwrappedThreadPoolExecutor = unwrapThreadPoolExecutor(executor, executor.getClass());
} else if (className.equals(
"java.util.concurrent.Executors$FinalizableDelegatedExecutorService")) {
unwrappedThreadPoolExecutor =
unwrapThreadPoolExecutor(executor, executor.getClass().getSuperclass());
}
if (unwrappedThreadPoolExecutor != null) {
CountedThreadFactory.monitorThreads(
meterRegistry, unwrappedThreadPoolExecutor, name, Tags.of(tags));
CountedRejectedExecutionHandler.monitorRejections(
meterRegistry, unwrappedThreadPoolExecutor, name, Tags.of(tags));
}
}
/**
* Every ScheduledThreadPoolExecutor created by {@link Executors} is wrapped. Also, {@link
* Executors#newSingleThreadExecutor()} wrap a regular {@link ThreadPoolExecutor}.
*/
@Nullable
private ThreadPoolExecutor unwrapThreadPoolExecutor(ExecutorService executor, Class<?> wrapper) {
try {
Field e = wrapper.getDeclaredField("e");
e.setAccessible(true);
return (ThreadPoolExecutor) e.get(executor);
} catch (NoSuchFieldException | IllegalAccessException e) {
LOG.error(
String.format(
"Unable to unwrap ThreadPoolExecutor from %s instance of %s."
+ " CountedThreadFactory and CountedThreadFactory statistic would be omitted",
executor, wrapper),
e);
}
return null;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Singleton;
import org.eclipse.che.commons.schedule.executor.CronExecutorService;
/**
* Implementation of {@link ExecutorServiceWrapper} that adds nothing. All wrap methods return the
* same instance as result.
*
* @author Sergii Kabashniuk
*/
@Singleton
public class NoopExecutorServiceWrapper implements ExecutorServiceWrapper {
@Override
public ExecutorService wrap(ExecutorService executor, String name, String... tags) {
return executor;
}
@Override
public ScheduledExecutorService wrap(
ScheduledExecutorService executor, String name, String... tags) {
return executor;
}
@Override
public CronExecutorService wrap(CronExecutorService executor, String name, String... tags) {
return executor;
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.eclipse.che.commons.lang.concurrent.LoggingUncaughtExceptionHandler;
import org.eclipse.che.commons.schedule.executor.CronThreadPoolExecutor;
import org.eclipse.che.commons.schedule.executor.ThreadPullLauncher;
/** Monitored and traced implementation of {@code ThreadPullLauncher}. */
@Singleton
public class ObservableThreadPullLauncher extends ThreadPullLauncher {
@Inject
public ObservableThreadPullLauncher(
ExecutorServiceWrapper wrapper, @Named("schedule.core_pool_size") Integer corePoolSize) {
super(
wrapper.wrap(
new CronThreadPoolExecutor(
corePoolSize,
new ThreadFactoryBuilder()
.setNameFormat("Annotated-scheduler-%d")
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.setDaemon(false)
.build()),
CronThreadPoolExecutor.class.getName()));
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability;
import io.opentracing.Tracer;
import io.opentracing.contrib.concurrent.TracedRunnable;
import io.opentracing.contrib.concurrent.TracedScheduledExecutorService;
import java.util.concurrent.Future;
import javax.inject.Inject;
import org.eclipse.che.commons.schedule.executor.CronExecutorService;
import org.eclipse.che.commons.schedule.executor.CronExpression;
/**
* Executor which propagates span from parent thread to submitted. Optionally it creates parent span
* if traceWithActiveSpanOnly = false.
*/
public class TracedCronExecutorService extends TracedScheduledExecutorService
implements CronExecutorService {
private final CronExecutorService delegate;
@Inject
public TracedCronExecutorService(CronExecutorService delegate, Tracer tracer) {
super(delegate, tracer);
this.delegate = delegate;
}
@Override
public Future<?> schedule(Runnable task, CronExpression expression) {
return delegate.schedule(
tracer.activeSpan() == null ? task : new TracedRunnable(task, tracer), expression);
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability;
import io.opentracing.Tracer;
import io.opentracing.contrib.concurrent.TracedExecutorService;
import io.opentracing.contrib.concurrent.TracedScheduledExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.eclipse.che.commons.schedule.executor.CronExecutorService;
/**
* Implementation of {@code ExecutorServiceWrapper} that add all sort of tracing capabilities with
* help of traced implementation.
*/
@Singleton
public class TracedExecutorServiceWrapper implements ExecutorServiceWrapper {
private final Tracer tracer;
@Inject
public TracedExecutorServiceWrapper(Tracer tracer) {
this.tracer = tracer;
}
@Override
public ExecutorService wrap(ExecutorService executor, String name, String... tags) {
return new TracedExecutorService(executor, tracer);
}
@Override
public ScheduledExecutorService wrap(
ScheduledExecutorService executor, String name, String... tags) {
return new TracedScheduledExecutorService(executor, tracer);
}
@Override
public CronExecutorService wrap(CronExecutorService executor, String name, String... tags) {
return new TracedCronExecutorService(executor, tracer);
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability.deploy;
import com.google.inject.AbstractModule;
import org.eclipse.che.commons.observability.*;
import org.eclipse.che.commons.schedule.executor.ThreadPullLauncher;
public class ExecutorWrapperModule extends AbstractModule {
@Override
protected void configure() {
if (Boolean.parseBoolean(System.getenv("CHE_METRICS_ENABLED"))) {
if (Boolean.parseBoolean(System.getenv("CHE_TRACING_ENABLED"))) {
bind(ExecutorServiceWrapper.class)
.to(MeteredAndTracedExecutorServiceWrapper.class)
.asEagerSingleton();
} else {
bind(ExecutorServiceWrapper.class)
.to(MeteredExecutorServiceWrapper.class)
.asEagerSingleton();
}
} else {
if (Boolean.parseBoolean(System.getenv("CHE_TRACING_ENABLED"))) {
bind(ExecutorServiceWrapper.class)
.to(TracedExecutorServiceWrapper.class)
.asEagerSingleton();
} else {
bind(ExecutorServiceWrapper.class).to(NoopExecutorServiceWrapper.class).asEagerSingleton();
}
}
bind(ThreadPullLauncher.class).to(ObservableThreadPullLauncher.class);
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package io.micrometer.core.instrument.internal;
import static org.testng.Assert.assertEquals;
import io.micrometer.core.instrument.MockClock;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.text.ParseException;
import java.util.concurrent.CountDownLatch;
import org.eclipse.che.commons.schedule.executor.CronExpression;
import org.eclipse.che.commons.schedule.executor.CronThreadPoolExecutor;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class TimedCronExecutorServiceTest {
private SimpleMeterRegistry registry;
private Iterable<Tag> userTags = Tags.of("userTagKey", "userTagValue");
@BeforeMethod
public void setup() {
registry = new SimpleMeterRegistry(SimpleConfig.DEFAULT, new MockClock());
}
@Test
public void executor() throws InterruptedException, ParseException {
// given
TimedCronExecutorService executorService =
new TimedCronExecutorService(
registry,
new CronThreadPoolExecutor(1),
TimedCronExecutorServiceTest.class.getName(),
userTags);
CountDownLatch lock = new CountDownLatch(1);
// when
executorService.schedule(
() -> {
lock.countDown();
},
// one time per second
new CronExpression(" * * * ? * * *"));
lock.await();
// then
assertEquals(
registry
.get("executor.scheduled.cron")
.tags(userTags)
.tag("name", TimedCronExecutorServiceTest.class.getName())
.counter()
.count(),
1.0);
assertEquals(
registry
.get("executor")
.tags(userTags)
.tag("name", TimedCronExecutorServiceTest.class.getName())
.timer()
.count(),
1L);
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability;
import static org.testng.Assert.assertEquals;
import io.micrometer.core.instrument.MockClock;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class CountedRejectedExecutionHandlerTest {
private SimpleMeterRegistry registry;
private Iterable<Tag> userTags = Tags.of("userTagKey", "userTagValue");
@BeforeMethod
public void setup() {
registry = new SimpleMeterRegistry(SimpleConfig.DEFAULT, new MockClock());
}
@Test
public void countRejections() {
// given
ThreadPoolExecutor executor =
new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
executor.setRejectedExecutionHandler((r, executor1) -> {});
CountedRejectedExecutionHandler.monitorRejections(
registry, executor, CountedRejectedExecutionHandler.class.getName(), userTags);
CountDownLatch runnableTaskComplete = new CountDownLatch(1);
Runnable stub =
() -> {
try {
runnableTaskComplete.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new IllegalStateException("runnable interrupted before completion");
}
};
executor.submit(stub);
// then
for (int i = 0; i < 14; i++) {
executor.submit(
() -> {
// do nothing. Task has to be rejected.
});
}
// when
assertEquals(
registry
.get("executor.rejected")
.tags(userTags)
.tag("name", CountedRejectedExecutionHandler.class.getName())
.counter()
.count(),
14.0);
// cleanup
runnableTaskComplete.countDown();
executor.shutdownNow();
}
}

View File

@ -0,0 +1,224 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import io.micrometer.core.instrument.MockClock;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class CountedThreadFactoryTest {
private SimpleMeterRegistry registry;
private Iterable<Tag> userTags = Tags.of("userTagKey", "userTagValue");
@BeforeMethod
public void setup() {
registry = new SimpleMeterRegistry(SimpleConfig.DEFAULT, new MockClock());
}
@Test
public void shouldCountCreatedThreads() {
// given
ThreadFactory factory =
new CountedThreadFactory(
Executors.defaultThreadFactory(),
registry,
CountedThreadFactoryTest.class.getName(),
userTags);
// when
factory.newThread(() -> {});
// then
assertEquals(
registry
.get("thread.factory.created")
.tags(userTags)
.tag("name", CountedThreadFactoryTest.class.getName())
.counter()
.count(),
1.0);
}
@Test
public void shouldCountRunningThreads() throws InterruptedException {
// given
ThreadFactory factory =
new CountedThreadFactory(
Executors.defaultThreadFactory(),
registry,
CountedThreadFactoryTest.class.getName(),
userTags);
CountDownLatch runnableTaskStart = new CountDownLatch(1);
CountDownLatch runnableTaskComplete = new CountDownLatch(1);
Runnable task =
() -> {
runnableTaskStart.countDown();
try {
Assert.assertTrue(runnableTaskComplete.await(10, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new IllegalStateException("runnable interrupted before completion");
}
};
// when
Thread thread = factory.newThread(task);
thread.start();
// then
runnableTaskStart.await();
assertEquals(
registry
.get("thread.factory.running")
.tags(userTags)
.tag("name", CountedThreadFactoryTest.class.getName())
.gauge()
.value(),
1.0);
runnableTaskComplete.countDown();
thread.join();
assertEquals(
registry
.get("thread.factory.running")
.tags(userTags)
.tag("name", CountedThreadFactoryTest.class.getName())
.gauge()
.value(),
0.0);
// put here to ensure that thread are not GCd
assertFalse(thread.isAlive());
// put here to ensure that factory are not GCd
assertNotNull(factory);
}
@Test
public void shouldCountRunningAndTerminatedThreadsInExecutorPool()
throws InterruptedException, TimeoutException, ExecutionException {
// given
JoinableThreadFactory factory =
new JoinableThreadFactory(
new CountedThreadFactory(
Executors.defaultThreadFactory(),
registry,
CountedThreadFactoryTest.class.getName(),
userTags));
ExecutorService executor = Executors.newCachedThreadPool(factory);
CountDownLatch runnableTaskStart = new CountDownLatch(10);
CountDownLatch runnableTaskComplete = new CountDownLatch(1);
Runnable task =
() -> {
runnableTaskStart.countDown();
try {
Assert.assertTrue(runnableTaskComplete.await(10, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new IllegalStateException("runnable interrupted before completion");
}
};
List<Future> futures = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
futures.add(executor.submit(task));
}
runnableTaskStart.await();
assertEquals(
registry
.get("thread.factory.running")
.tags(userTags)
.tag("name", CountedThreadFactoryTest.class.getName())
.gauge()
.value(),
10.0);
assertEquals(
registry
.get("thread.factory.terminated")
.tags(userTags)
.tag("name", CountedThreadFactoryTest.class.getName())
.counter()
.count(),
0.0);
runnableTaskComplete.countDown();
for (Future future : futures) {
future.get(1, TimeUnit.MINUTES);
}
executor.shutdownNow();
Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
factory.joinAll();
assertEquals(
registry
.get("thread.factory.running")
.tags(userTags)
.tag("name", CountedThreadFactoryTest.class.getName())
.gauge()
.value(),
0.0);
assertEquals(
registry
.get("thread.factory.terminated")
.tags(userTags)
.tag("name", CountedThreadFactoryTest.class.getName())
.counter()
.count(),
10.0);
// put here to ensure that factory are not GCd
assertNotNull(factory);
}
public static class JoinableThreadFactory implements ThreadFactory {
private final ThreadFactory delegate;
private final List<Thread> threads;
public JoinableThreadFactory(ThreadFactory delegate) {
this.delegate = delegate;
this.threads = new ArrayList<>();
}
@Override
public Thread newThread(Runnable r) {
Thread result = delegate.newThread(r);
threads.add(result);
return result;
}
public void joinAll() throws InterruptedException {
for (Thread thread : threads) {
if (thread.isAlive()) {
thread.join();
}
}
}
}
}

View File

@ -0,0 +1,459 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.MockClock;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.text.ParseException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.eclipse.che.commons.schedule.executor.CronExecutorService;
import org.eclipse.che.commons.schedule.executor.CronExpression;
import org.eclipse.che.commons.schedule.executor.CronThreadPoolExecutor;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class MeteredExecutorServiceWrapperTest {
private MeterRegistry registry;
private ExecutorServiceWrapper executorServiceWrapper;
private Iterable<Tag> userTags = Tags.of("userTagKey", "userTagValue");
private ExecutorService executor;
@BeforeMethod
public void setup() {
registry = new SimpleMeterRegistry(SimpleConfig.DEFAULT, new MockClock());
executorServiceWrapper = new MeteredExecutorServiceWrapper(registry);
}
@AfterMethod
public void cleanup() {
if (executor == null) return;
// Tell threads to finish off.
executor.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!executor.awaitTermination(60, TimeUnit.SECONDS))
System.out.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
@Test
public void shouldRecordExecutorServiceMetrics()
throws InterruptedException, TimeoutException, ExecutionException {
// given
executor =
executorServiceWrapper.wrap(
Executors.newSingleThreadExecutor(),
MeteredExecutorServiceWrapperTest.class.getName(),
"userTagKey",
"userTagValue");
CountDownLatch runnableTaskStart = new CountDownLatch(1);
// when
Future<?> future =
executor.submit(
() -> {
runnableTaskStart.countDown();
});
// then
runnableTaskStart.await(10, TimeUnit.SECONDS);
future.get(1, TimeUnit.MINUTES);
assertEquals(
registry
.get("thread.factory.terminated")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.counter()
.count(),
0.0);
assertEquals(
registry
.get("thread.factory.running")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.gauge()
.value(),
1.0);
assertEquals(
registry
.get("thread.factory.created")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.counter()
.count(),
1.0);
assertEquals(
registry
.get("executor.rejected")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.counter()
.count(),
0.0);
assertEquals(
registry
.get("executor.pool.size")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.gauge()
.value(),
1.0);
assertEquals(
registry
.get("executor.completed")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.functionCounter()
.count(),
1.0);
assertEquals(
registry
.get("executor.queued")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.gauge()
.value(),
0.0);
assertEquals(
registry
.get("executor.idle")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.timer()
.count(),
1);
assertEquals(
registry
.get("executor.queue.remaining")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.gauge()
.value(),
new Double(Integer.MAX_VALUE));
assertEquals(
registry
.get("executor")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.timer()
.count(),
1);
assertEquals(
registry
.get("executor.active")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.gauge()
.value(),
0.0);
}
@Test
public void shouldRecordScheduledExecutorServiceMetrics() throws InterruptedException {
// given
executor =
executorServiceWrapper.wrap(
Executors.newSingleThreadScheduledExecutor(),
MeteredExecutorServiceWrapperTest.class.getName(),
"userTagKey",
"userTagValue");
CountDownLatch runnableTaskStart = new CountDownLatch(1);
// when
((ScheduledExecutorService) executor)
.scheduleAtFixedRate(
() -> {
runnableTaskStart.countDown();
},
0,
100,
TimeUnit.SECONDS);
// then
runnableTaskStart.await(10, TimeUnit.SECONDS);
assertEquals(
registry
.get("thread.factory.terminated")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.counter()
.count(),
0.0);
assertEquals(
registry
.get("thread.factory.running")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.gauge()
.value(),
1.0);
assertEquals(
registry
.get("thread.factory.created")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.counter()
.count(),
1.0);
assertEquals(
registry
.get("executor.rejected")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.counter()
.count(),
0.0);
assertEquals(
registry
.get("executor.pool.size")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.gauge()
.value(),
1.0);
assertWithRetry(
() ->
registry
.get("executor.completed")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.functionCounter()
.count(),
1.0,
10,
50);
assertEquals(
registry
.get("executor.queued")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.gauge()
.value(),
1.0);
assertEquals(
registry
.get("executor.idle")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.timer()
.count(),
0);
assertEquals(
registry
.get("executor.queue.remaining")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.gauge()
.value(),
new Double(Integer.MAX_VALUE));
assertEquals(
registry
.get("executor")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.timer()
.count(),
1);
assertEquals(
registry
.get("executor.active")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.gauge()
.value(),
0.0);
assertEquals(
registry
.get("executor.scheduled.once")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.counter()
.count(),
0.0);
assertEquals(
registry
.get("executor.scheduled.repetitively")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.counter()
.count(),
1.0);
}
@Test
public void shouldRecordCronExecutorServiceMetrics() throws InterruptedException, ParseException {
// given
CronExecutorService executor =
executorServiceWrapper.wrap(
new CronThreadPoolExecutor(1),
MeteredExecutorServiceWrapperTest.class.getName(),
"userTagKey",
"userTagValue");
CountDownLatch runnableTaskStart = new CountDownLatch(1);
// when
executor.schedule(
() -> {
runnableTaskStart.countDown();
},
new CronExpression(" * * * ? * * *"));
// then
runnableTaskStart.await(10, TimeUnit.SECONDS);
assertEquals(
registry
.get("thread.factory.terminated")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.counter()
.count(),
0.0);
assertEquals(
registry
.get("thread.factory.running")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.gauge()
.value(),
2.0);
assertEquals(
registry
.get("thread.factory.created")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.counter()
.count(),
2.0);
assertEquals(
registry
.get("executor.rejected")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.counter()
.count(),
0.0);
assertEquals(
registry
.get("executor.pool.size")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.gauge()
.value(),
2.0);
assertEquals(
registry
.get("executor.completed")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.functionCounter()
.count(),
1.0);
assertWithRetry(
() ->
registry
.get("executor.queued")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.gauge()
.value(),
1.0,
10,
50);
assertEquals(
registry
.get("executor.idle")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.timer()
.count(),
0);
assertTrue(
registry
.get("executor.queue.remaining")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.gauge()
.value()
> 0.0);
assertEquals(
registry
.get("executor")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.timer()
.count(),
1);
assertEquals(
registry
.get("executor.active")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.gauge()
.value(),
1.0);
assertEquals(
registry
.get("executor.scheduled.once")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.counter()
.count(),
0.0);
assertEquals(
registry
.get("executor.scheduled.repetitively")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.counter()
.count(),
0.0);
assertEquals(
registry
.get("executor.scheduled.cron")
.tag("name", MeteredExecutorServiceWrapperTest.class.getName())
.tags(userTags)
.counter()
.count(),
1.0);
}
public <V> void assertWithRetry(Supplier<V> predicate, V expected, int times, int pause_millis)
throws InterruptedException {
for (int i = 0; i <= times; i++) {
V actual = predicate.get();
if (expected.equals(actual)) {
return;
} else if (i + 1 <= times) {
Thread.sleep(pause_millis);
}
}
Assert.fail("Not able to get expected value " + expected + " with " + times + " retries");
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright (c) 2012-2018 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Red Hat, Inc. - initial API and implementation
*/
package org.eclipse.che.commons.observability;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.che.commons.schedule.executor.CronExecutorService;
import org.eclipse.che.commons.schedule.executor.CronThreadPoolExecutor;
import org.testng.Assert;
import org.testng.annotations.Test;
public class NoopExecutorServiceWrapperTest {
NoopExecutorServiceWrapper noopExecutorServiceWrapper = new NoopExecutorServiceWrapper();
@Test
public void testWrapExecutorService() {
// given
ExecutorService executorService = Executors.newSingleThreadExecutor();
// when
ExecutorService result =
noopExecutorServiceWrapper.wrap(
executorService, NoopExecutorServiceWrapper.class.getName(), "key", "value");
// then
Assert.assertSame(result, executorService);
}
@Test
public void testWrapScheduledExecutorService() {
// given
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
// when
ScheduledExecutorService result =
noopExecutorServiceWrapper.wrap(
executorService, NoopExecutorServiceWrapper.class.getName(), "key", "value");
// then
Assert.assertSame(result, executorService);
}
@Test
public void testWrapCronExecutorService() {
// given
CronExecutorService executorService = new CronThreadPoolExecutor(1);
// when
CronExecutorService result =
noopExecutorServiceWrapper.wrap(
executorService, NoopExecutorServiceWrapper.class.getName(), "key", "value");
// then
Assert.assertSame(result, executorService);
}
}

View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2012-2018 Red Hat, Inc.
This program and the accompanying materials are made
available under the terms of the Eclipse Public License 2.0
which is available at https://www.eclipse.org/legal/epl-2.0/
SPDX-License-Identifier: EPL-2.0
Contributors:
Red Hat, Inc. - initial API and implementation
-->
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%-41(%date[%.15thread]) %-45([%-5level] [%.30logger{30} %L]) - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
</configuration>

View File

@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
@Singleton
public class ThreadPullLauncher implements Launcher {
private static final Logger LOG = LoggerFactory.getLogger(CronThreadPoolExecutor.class);
private final CronThreadPoolExecutor service;
private final CronExecutorService service;
/**
* @param corePoolSize the number of threads to keep in the pool, even if they are idle, unless
@ -41,14 +41,18 @@ public class ThreadPullLauncher implements Launcher {
*/
@Inject
public ThreadPullLauncher(@Named("schedule.core_pool_size") Integer corePoolSize) {
this.service =
this(
new CronThreadPoolExecutor(
corePoolSize,
new ThreadFactoryBuilder()
.setNameFormat("Annotated-scheduler-%d")
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.setDaemon(false)
.build());
.build()));
}
protected ThreadPullLauncher(CronExecutorService service) {
this.service = service;
}
@PreDestroy

View File

@ -36,5 +36,6 @@
<module>che-core-commons-j2ee</module>
<module>che-core-commons-mail</module>
<module>che-core-commons-tracing</module>
<module>che-core-commons-observability</module>
</modules>
</project>

File diff suppressed because it is too large Load Diff

View File

@ -150,6 +150,10 @@
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-lang</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-observability</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-schedule</artifactId>

View File

@ -36,6 +36,7 @@ import javax.inject.Singleton;
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.commons.lang.concurrent.LoggingUncaughtExceptionHandler;
import org.eclipse.che.commons.lang.concurrent.ThreadLocalPropagateContext;
import org.eclipse.che.commons.observability.ExecutorServiceWrapper;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesDeployments;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesNamespaceFactory;
import org.eclipse.che.workspace.infrastructure.kubernetes.provision.SecurityContextProvisioner;
@ -85,19 +86,22 @@ public class PVCSubPathHelper {
@Named("che.infra.kubernetes.pvc.jobs.memorylimit") String jobMemoryLimit,
@Named("che.infra.kubernetes.pvc.jobs.image") String jobImage,
KubernetesNamespaceFactory factory,
SecurityContextProvisioner securityContextProvisioner) {
SecurityContextProvisioner securityContextProvisioner,
ExecutorServiceWrapper executorServiceWrapper) {
this.jobMemoryLimit = jobMemoryLimit;
this.jobImage = jobImage;
this.factory = factory;
this.securityContextProvisioner = securityContextProvisioner;
this.executor =
Executors.newFixedThreadPool(
COUNT_THREADS,
new ThreadFactoryBuilder()
.setNameFormat("PVCSubPathHelper-ThreadPool-%d")
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.setDaemon(false)
.build());
executorServiceWrapper.wrap(
Executors.newFixedThreadPool(
COUNT_THREADS,
new ThreadFactoryBuilder()
.setNameFormat("PVCSubPathHelper-ThreadPool-%d")
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.setDaemon(false)
.build()),
PVCSubPathHelper.class.getName());
}
/**

View File

@ -17,6 +17,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.eclipse.che.commons.observability.ExecutorServiceWrapper;
/**
* Provides single {@link ExecutorService} instance with daemon threads for Kubernetes/Openshfit
@ -30,13 +31,15 @@ public class KubernetesSharedPool {
private final ExecutorService executor;
@Inject
public KubernetesSharedPool() {
public KubernetesSharedPool(ExecutorServiceWrapper executorServiceWrapper) {
final ThreadFactory factory =
new ThreadFactoryBuilder()
.setNameFormat("KubernetesMachineSharedPool-%d")
.setDaemon(true)
.build();
this.executor = Executors.newCachedThreadPool(factory);
this.executor =
executorServiceWrapper.wrap(
Executors.newCachedThreadPool(factory), KubernetesSharedPool.class.getName());
}
public ExecutorService getExecutor() {

View File

@ -105,6 +105,7 @@ import org.eclipse.che.api.workspace.server.spi.environment.InternalMachineConfi
import org.eclipse.che.api.workspace.server.spi.provision.InternalEnvironmentProvisioner;
import org.eclipse.che.api.workspace.shared.dto.event.MachineStatusEvent;
import org.eclipse.che.api.workspace.shared.dto.event.RuntimeLogEvent;
import org.eclipse.che.commons.observability.NoopExecutorServiceWrapper;
import org.eclipse.che.workspace.infrastructure.kubernetes.cache.KubernetesMachineCache;
import org.eclipse.che.workspace.infrastructure.kubernetes.cache.KubernetesRuntimeStateCache;
import org.eclipse.che.workspace.infrastructure.kubernetes.environment.KubernetesEnvironment;
@ -250,7 +251,7 @@ public class KubernetesInternalRuntimeTest {
probesScheduler,
workspaceProbesFactory,
eventPublisher,
new KubernetesSharedPool(),
new KubernetesSharedPool(new NoopExecutorServiceWrapper()),
runtimeStatesCache,
machinesCache,
startSynchronizerFactory,

View File

@ -37,6 +37,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import org.eclipse.che.api.workspace.server.spi.InfrastructureException;
import org.eclipse.che.commons.observability.NoopExecutorServiceWrapper;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesDeployments;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesNamespace;
import org.eclipse.che.workspace.infrastructure.kubernetes.namespace.KubernetesNamespaceFactory;
@ -79,7 +80,11 @@ public class PVCSubPathHelperTest {
public void setup() throws Exception {
pvcSubPathHelper =
new PVCSubPathHelper(
jobMemoryLimit, jobImage, k8sNamespaceFactory, securityContextProvisioner);
jobMemoryLimit,
jobImage,
k8sNamespaceFactory,
securityContextProvisioner,
new NoopExecutorServiceWrapper());
lenient().when(k8sNamespaceFactory.create(anyString())).thenReturn(k8sNamespace);
lenient().when(k8sNamespace.deployments()).thenReturn(osDeployments);
lenient().when(pod.getStatus()).thenReturn(podStatus);

View File

@ -118,6 +118,11 @@
<artifactId>che-core-commons-lang</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-observability</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-test</artifactId>

View File

@ -81,6 +81,8 @@ import org.eclipse.che.api.workspace.server.spi.WorkspaceDao;
import org.eclipse.che.api.workspace.server.spi.environment.InternalEnvironmentFactory;
import org.eclipse.che.api.workspace.server.wsplugins.ChePluginsApplier;
import org.eclipse.che.commons.env.EnvironmentContext;
import org.eclipse.che.commons.observability.ExecutorServiceWrapper;
import org.eclipse.che.commons.observability.NoopExecutorServiceWrapper;
import org.eclipse.che.commons.subject.SubjectImpl;
import org.eclipse.che.commons.test.db.H2DBTestServer;
import org.eclipse.che.commons.test.db.H2JpaCleaner;
@ -218,6 +220,7 @@ public class JpaEntitiesCascadeRemovalTest {
install(new MultiuserWorkspaceJpaModule());
install(new MachineAuthModule());
install(new DevfileModule());
bind(ExecutorServiceWrapper.class).to(NoopExecutorServiceWrapper.class);
bind(FreeResourcesLimitDao.class).to(JpaFreeResourcesLimitDao.class);
bind(RemoveFreeResourcesLimitSubscriber.class).asEagerSingleton();
@ -236,7 +239,9 @@ public class JpaEntitiesCascadeRemovalTest {
.annotatedWith(Names.named("che.workspace.auto_restore"))
.toInstance(false);
bind(WorkspaceSharedPool.class)
.toInstance(new WorkspaceSharedPool("cached", null, null, null));
.toInstance(
new WorkspaceSharedPool(
"cached", null, null, new NoopExecutorServiceWrapper()));
bind(String[].class)
.annotatedWith(Names.named("che.auth.reserved_user_names"))

View File

@ -397,6 +397,11 @@
<artifactId>che-core-commons-mail</artifactId>
<version>${che.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-observability</artifactId>
<version>${che.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-schedule</artifactId>

View File

@ -58,14 +58,6 @@
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-assistedinject</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-concurrent</artifactId>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
@ -126,6 +118,10 @@
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-lang</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-observability</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-schedule</artifactId>

View File

@ -14,8 +14,6 @@ package org.eclipse.che.api.workspace.server;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import io.opentracing.Tracer;
import io.opentracing.contrib.concurrent.TracedExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@ -28,6 +26,7 @@ import javax.inject.Singleton;
import org.eclipse.che.commons.annotation.Nullable;
import org.eclipse.che.commons.lang.concurrent.LoggingUncaughtExceptionHandler;
import org.eclipse.che.commons.lang.concurrent.ThreadLocalPropagateContext;
import org.eclipse.che.commons.observability.ExecutorServiceWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -46,7 +45,7 @@ public class WorkspaceSharedPool {
@Named("che.workspace.pool.type") String poolType,
@Named("che.workspace.pool.exact_size") @Nullable String exactSizeProp,
@Named("che.workspace.pool.cores_multiplier") @Nullable String coresMultiplierProp,
Tracer tracer) {
ExecutorServiceWrapper wrapper) {
ThreadFactory factory =
new ThreadFactoryBuilder()
@ -56,7 +55,9 @@ public class WorkspaceSharedPool {
.build();
switch (poolType.toLowerCase()) {
case "cached":
executor = new TracedExecutorService(Executors.newCachedThreadPool(factory), tracer);
executor =
wrapper.wrap(
Executors.newCachedThreadPool(factory), WorkspaceSharedPool.class.getName());
break;
case "fixed":
Integer exactSize = exactSizeProp == null ? null : Ints.tryParse(exactSizeProp);
@ -71,7 +72,9 @@ public class WorkspaceSharedPool {
size *= coresMultiplier;
}
}
executor = new TracedExecutorService(Executors.newFixedThreadPool(size, factory), tracer);
executor =
wrapper.wrap(
Executors.newFixedThreadPool(size, factory), WorkspaceSharedPool.class.getName());
break;
default:
throw new IllegalArgumentException(

View File

@ -17,11 +17,7 @@ import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -30,6 +26,7 @@ import javax.inject.Named;
import javax.inject.Singleton;
import org.eclipse.che.api.core.model.workspace.WorkspaceStatus;
import org.eclipse.che.api.workspace.server.hc.probe.ProbeResult.ProbeStatus;
import org.eclipse.che.commons.observability.ExecutorServiceWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,7 +40,7 @@ import org.slf4j.LoggerFactory;
public class ProbeScheduler {
private static final Logger LOG = LoggerFactory.getLogger(ProbeScheduler.class);
private final ScheduledThreadPoolExecutor probesExecutor;
private final ScheduledExecutorService probesExecutor;
/**
* Use single thread for a scheduling of tasks interruption by timeout. Single thread can be used
* since it is supposed that interruption is a very quick call. Separate thread is needed to
@ -55,11 +52,18 @@ public class ProbeScheduler {
private final Map<String, List<ScheduledFuture>> probesFutures;
@Inject
public ProbeScheduler(@Named("che.workspace.probe_pool_size") int probeSchedulerPoolSize) {
public ProbeScheduler(
@Named("che.workspace.probe_pool_size") int probeSchedulerPoolSize,
ExecutorServiceWrapper executorServiceWrapper) {
probesExecutor =
new ScheduledThreadPoolExecutor(
probeSchedulerPoolSize,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerProbes-%s").build());
executorServiceWrapper.wrap(
new ScheduledThreadPoolExecutor(
probeSchedulerPoolSize,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("ServerProbes-%s")
.build()),
ProbeScheduler.class.getName());
timeouts = new Timer("ServerProbesTimeouts", true);
probesFutures = new ConcurrentHashMap<>();
}

View File

@ -112,6 +112,11 @@
<artifactId>che-core-commons-lang</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-observability</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.che.core</groupId>
<artifactId>che-core-commons-test</artifactId>

View File

@ -97,6 +97,8 @@ import org.eclipse.che.api.workspace.server.model.impl.devfile.ProjectImpl;
import org.eclipse.che.api.workspace.server.model.impl.devfile.SourceImpl;
import org.eclipse.che.api.workspace.server.spi.RuntimeInfrastructure;
import org.eclipse.che.api.workspace.server.spi.WorkspaceDao;
import org.eclipse.che.commons.observability.ExecutorServiceWrapper;
import org.eclipse.che.commons.observability.NoopExecutorServiceWrapper;
import org.eclipse.che.commons.test.db.H2DBTestServer;
import org.eclipse.che.commons.test.db.PersistTestModuleBuilder;
import org.eclipse.che.core.db.DBInitializer;
@ -241,6 +243,7 @@ public class CascadeRemovalTest {
install(new WorkspaceJpaModule());
install(new WorkspaceActivityModule());
install(new JpaKubernetesRuntimeCacheModule());
bind(ExecutorServiceWrapper.class).to(NoopExecutorServiceWrapper.class);
bind(WorkspaceManager.class);
RuntimeInfrastructure infra = mock(RuntimeInfrastructure.class);
@ -264,7 +267,9 @@ public class CascadeRemovalTest {
bind(WorkspaceRuntimes.class).toInstance(wR);
bind(AccountManager.class);
bind(WorkspaceSharedPool.class)
.toInstance(new WorkspaceSharedPool("cached", null, null, null));
.toInstance(
new WorkspaceSharedPool(
"cached", null, null, new NoopExecutorServiceWrapper()));
MapBinder.newMapBinder(binder(), String.class, ComponentIntegrityValidator.class)
.addBinding("kubernetes")