More information for ServerSideRequestProcessor.RejectedExecutionHandler (#11681)
More information for ServerSideRequestProcessor.RejectedExecutionHandler (#11681)6.19.x
parent
56973d2169
commit
be8a867c4e
|
|
@ -70,7 +70,7 @@ public class JsonRpcMessageReceiver implements WebSocketMessageReceiver {
|
|||
List<String> messages = jsonRpcUnmarshaller.unmarshalArray(message);
|
||||
for (String innerMessage : messages) {
|
||||
if (jsonRpcQualifier.isJsonRpcRequest(innerMessage)) {
|
||||
requestProcessor.process(() -> processRequest(endpointId, innerMessage));
|
||||
requestProcessor.process(new ProcessRequestTask(endpointId, innerMessage));
|
||||
} else if (jsonRpcQualifier.isJsonRpcResponse(innerMessage)) {
|
||||
processResponse(endpointId, innerMessage);
|
||||
} else {
|
||||
|
|
@ -91,18 +91,35 @@ public class JsonRpcMessageReceiver implements WebSocketMessageReceiver {
|
|||
responseDispatcher.dispatch(endpointId, response);
|
||||
}
|
||||
|
||||
private void processRequest(String endpointId, String innerMessage) {
|
||||
JsonRpcRequest request = null;
|
||||
try {
|
||||
request = jsonRpcUnmarshaller.unmarshalRequest(innerMessage);
|
||||
requestDispatcher.dispatch(endpointId, request);
|
||||
} catch (JsonRpcException e) {
|
||||
if (request == null || request.getId() == null) {
|
||||
errorTransmitter.transmit(endpointId, e);
|
||||
} else {
|
||||
errorTransmitter.transmit(
|
||||
endpointId, new JsonRpcException(e.getCode(), e.getMessage(), request.getId()));
|
||||
private class ProcessRequestTask implements Runnable {
|
||||
|
||||
private final String endpointId;
|
||||
private final String innerMessage;
|
||||
|
||||
public ProcessRequestTask(String endpointId, String innerMessage) {
|
||||
this.endpointId = endpointId;
|
||||
this.innerMessage = innerMessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
JsonRpcRequest request = null;
|
||||
try {
|
||||
request = jsonRpcUnmarshaller.unmarshalRequest(innerMessage);
|
||||
requestDispatcher.dispatch(endpointId, request);
|
||||
} catch (JsonRpcException e) {
|
||||
if (request == null || request.getId() == null) {
|
||||
errorTransmitter.transmit(endpointId, e);
|
||||
} else {
|
||||
errorTransmitter.transmit(
|
||||
endpointId, new JsonRpcException(e.getCode(), e.getMessage(), request.getId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "JsonRPC request `" + innerMessage + "` for " + endpointId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ public class ServerSideRequestProcessor implements RequestProcessor {
|
|||
public ServerSideRequestProcessor(
|
||||
@Named("che.core.jsonrpc.processor_max_pool_size") int maxPoolSize) {
|
||||
this.maxPoolSize = maxPoolSize;
|
||||
LOG.info(" che.core.jsonrpc.processor_max_pool_siz {} ", maxPoolSize);
|
||||
LOG.debug("che.core.jsonrpc.processor_max_pool_size {} ", maxPoolSize);
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
|
|
@ -59,7 +59,7 @@ public class ServerSideRequestProcessor implements RequestProcessor {
|
|||
0, maxPoolSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), factory);
|
||||
((ThreadPoolExecutor) executorService)
|
||||
.setRejectedExecutionHandler(
|
||||
(r, executor) -> LOG.warn("Message {} rejected for execution in {}", r, executor));
|
||||
(r, executor) -> LOG.warn("Message {} rejected for execution", r));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
|
|
|||
Loading…
Reference in New Issue