diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java index 7bea22f08..75c027251 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java @@ -109,10 +109,8 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) { return; } - ListenableFuture future = this.submitListenable(() -> { - multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread); - taskExecuteThread.run(); - }); + multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread); + ListenableFuture future = this.submitListenable(taskExecuteThread::run); future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index 0b2066324..cd337b4c9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -107,11 +107,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) { return; } + multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); int processInstanceId = workflowExecuteThread.getProcessInstance().getId(); - ListenableFuture future = this.submitListenable(() -> { - multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); - workflowExecuteThread.handleEvents(); - }); + ListenableFuture future = this.submitListenable(workflowExecuteThread::handleEvents); future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) {