From 1073fcae443e417b082ac4a745454bdef1da1f7d Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Thu, 31 Mar 2022 21:10:09 +0800 Subject: [PATCH] [Bug-9295][Master] fix repeated submit task (#9304) Co-authored-by: caishunfeng <534328519@qq.com> --- .../server/master/consumer/TaskPriorityQueueConsumer.java | 6 ++++-- .../master/processor/queue/TaskExecuteThreadPool.java | 2 +- .../server/master/runner/WorkflowExecuteThreadPool.java | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 06bbc303e..f69268500 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -186,8 +186,10 @@ public class TaskPriorityQueueConsumer extends Thread { if (result) { addDispatchEvent(context, executionContext); } - } catch (RuntimeException | ExecuteException e) { - logger.error("dispatch error: {}", e.getMessage(), e); + } catch (RuntimeException e) { + logger.error("dispatch error: ", e); + } catch (ExecuteException e) { + logger.error("dispatch error: {}", e.getMessage()); } return result; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java index ccdab1b97..7bea22f08 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java @@ -110,8 +110,8 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { return; } ListenableFuture future = this.submitListenable(() -> { - taskExecuteThread.run(); multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread); + taskExecuteThread.run(); }); future.addCallback(new ListenableFutureCallback() { @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index 8edad9803..0b2066324 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -109,8 +109,8 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { } int processInstanceId = workflowExecuteThread.getProcessInstance().getId(); ListenableFuture future = this.submitListenable(() -> { - workflowExecuteThread.handleEvents(); multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); + workflowExecuteThread.handleEvents(); }); future.addCallback(new ListenableFutureCallback() { @Override