diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelFactory.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelFactory.java index ba71d790a..41395fc01 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelFactory.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelFactory.java @@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.spi.alert.ShowType; import org.apache.dolphinscheduler.spi.params.InputParam; import org.apache.dolphinscheduler.spi.params.PasswordParam; import org.apache.dolphinscheduler.spi.params.RadioParam; -import org.apache.dolphinscheduler.spi.params.base.DataType; import org.apache.dolphinscheduler.spi.params.base.ParamsOptions; import org.apache.dolphinscheduler.spi.params.base.PluginParams; import org.apache.dolphinscheduler.spi.params.base.Validate; @@ -65,10 +64,9 @@ public class EmailAlertChannelFactory implements AlertChannelFactory { .build(); InputParam mailSmtpPort = InputParam.newBuilder(MailParamsConstants.NAME_MAIL_SMTP_PORT, MailParamsConstants.MAIL_SMTP_PORT) - .setValue(25) + .setValue("25") .addValidate(Validate.newBuilder() .setRequired(true) - .setType(DataType.NUMBER.getDataType()) .build()) .build(); diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/test/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelTest.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/test/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelTest.java index fc28df272..2ddd42ebd 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/test/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelTest.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/test/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelTest.java @@ -91,9 +91,8 @@ public class EmailAlertChannelTest { InputParam mailSmtpPort = InputParam.newBuilder("serverPort", "smtp.port") .addValidate(Validate.newBuilder() .setRequired(true) - .setType(DataType.NUMBER.getDataType()) .build()) - .setValue(25) + .setValue("25") .build(); InputParam mailSender = InputParam.newBuilder("sender", "sender") @@ -102,10 +101,10 @@ public class EmailAlertChannelTest { .build(); RadioParam enableSmtpAuth = RadioParam.newBuilder("enableSmtpAuth", "smtp.auth") - .addParamsOptions(new ParamsOptions("YES", true, false)) - .addParamsOptions(new ParamsOptions("NO", false, false)) + .addParamsOptions(new ParamsOptions("YES", "true", false)) + .addParamsOptions(new ParamsOptions("NO", "false", false)) .addValidate(Validate.newBuilder().setRequired(true).build()) - .setValue(false) + .setValue("false") .build(); InputParam mailUser = InputParam.newBuilder("user", "user") @@ -119,17 +118,17 @@ public class EmailAlertChannelTest { .build(); RadioParam enableTls = RadioParam.newBuilder("starttlsEnable", "starttls.enable") - .addParamsOptions(new ParamsOptions("YES", true, false)) - .addParamsOptions(new ParamsOptions("NO", false, false)) + .addParamsOptions(new ParamsOptions("YES", "true", false)) + .addParamsOptions(new ParamsOptions("NO", "false", false)) .addValidate(Validate.newBuilder().setRequired(true).build()) - .setValue(true) + .setValue("true") .build(); RadioParam enableSsl = RadioParam.newBuilder("sslEnable", "smtp.ssl.enable") - .addParamsOptions(new ParamsOptions("YES", true, false)) - .addParamsOptions(new ParamsOptions("NO", false, false)) + .addParamsOptions(new ParamsOptions("YES", "true", false)) + .addParamsOptions(new ParamsOptions("NO", "false", false)) .addValidate(Validate.newBuilder().setRequired(true).build()) - .setValue(true) + .setValue("true") .build(); InputParam sslTrust = InputParam.newBuilder("smtpSslTrust", "smtp.ssl.trust") diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index e6c779238..18882a2fb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -25,12 +25,14 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; -import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.server.worker.WorkerServer; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; + +import javax.annotation.PostConstruct; + import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,8 +42,6 @@ import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.FilterType; -import javax.annotation.PostConstruct; - @@ -73,12 +73,6 @@ public class MasterServer { */ private NettyRemotingServer nettyRemotingServer; - /** - * master registry - */ - @Autowired - private MasterRegistry masterRegistry; - /** * zk master client */ @@ -117,9 +111,6 @@ public class MasterServer { this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); this.nettyRemotingServer.start(); - // register - this.masterRegistry.registry(); - // self tolerant this.zkMasterClient.start(); @@ -178,7 +169,6 @@ public class MasterServer { // this.masterSchedulerService.close(); this.nettyRemotingServer.close(); - this.masterRegistry.unRegistry(); this.zkMasterClient.close(); //close quartz try{ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index d7de840df..69058a49b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -62,6 +62,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -124,12 +125,11 @@ public class TaskPriorityQueueConsumer extends Thread { int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber(); failedDispatchTasks.clear(); for (int i = 0; i < fetchTaskNum; i++) { - if (taskPriorityQueue.size() <= 0) { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); + TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS); + if (Objects.isNull(taskPriority)) { continue; } - // if not task , blocking here - TaskPriority taskPriority = taskPriorityQueue.take(); + boolean dispatchResult = dispatch(taskPriority); if (!dispatchResult) { failedDispatchTasks.add(taskPriority); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 308808084..aafd7a152 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -97,7 +97,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private void setTaskCache(TaskExecutionContext taskExecutionContext) { TaskExecutionContext preTaskCache = new TaskExecutionContext(); preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + taskExecutionContextCacheManager.cacheTaskExecutionContext(preTaskCache); } public TaskExecuteProcessor(AlertClientService alertClientService) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 037bde6c7..73f2e700f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -84,6 +84,8 @@ public abstract class AbstractCommandExecutor { * log list */ protected final List logBuffer; + + protected boolean logOutputIsScuccess = false; /** * SHELL result string @@ -348,34 +350,46 @@ public abstract class AbstractCommandExecutor { */ private void parseProcessOutput(Process process) { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId()); - ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); - parseProcessOutputExecutorService.submit(new Runnable() { - @Override - public void run() { - BufferedReader inReader = null; - - try { - inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); - String line; - - long lastFlushTime = System.currentTimeMillis(); - - while ((line = inReader.readLine()) != null) { - if (line.startsWith("${setValue(")) { - varPool.append(line.substring("${setValue(".length(), line.length() - 2)); - varPool.append("$VarPool$"); - } else { - logBuffer.add(line); - taskResultString = line; - lastFlushTime = flush(lastFlushTime); - } + ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService"); + getOutputLogService.submit(() -> { + BufferedReader inReader = null; + try { + inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); + String line; + logBuffer.add("welcome to use bigdata scheduling system..."); + while ((line = inReader.readLine()) != null) { + if (line.startsWith("${setValue(")) { + varPool.append(line.substring("${setValue(".length(), line.length() - 2)); + varPool.append("$VarPool$"); + } else { + logBuffer.add(line); + taskResultString = line; } - } catch (Exception e) { - logger.error(e.getMessage(), e); - } finally { - clear(); - close(inReader); } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + logOutputIsScuccess = true; + close(inReader); + } + }); + getOutputLogService.shutdown(); + + ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); + parseProcessOutputExecutorService.submit(() -> { + try { + long lastFlushTime = System.currentTimeMillis(); + while (logBuffer.size() > 0 || !logOutputIsScuccess) { + if (logBuffer.size() > 0) { + lastFlushTime = flush(lastFlushTime); + } else { + Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL); + } + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + clear(); } }); parseProcessOutputExecutorService.shutdown(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 37484dafa..1f0926ba0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.zk.AbstractZKClient; @@ -66,6 +67,12 @@ public class ZKMasterClient extends AbstractZKClient { @Autowired private ProcessService processService; + /** + * master registry + */ + @Autowired + private MasterRegistry masterRegistry; + public void start() { InterProcessMutex mutex = null; @@ -75,6 +82,9 @@ public class ZKMasterClient extends AbstractZKClient { mutex = new InterProcessMutex(getZkClient(), znodeLock); mutex.acquire(); + // Master registry + masterRegistry.registry(); + // init system znode this.initSystemZNode(); @@ -98,6 +108,7 @@ public class ZKMasterClient extends AbstractZKClient { @Override public void close() { super.close(); + masterRegistry.unRegistry(); } /** diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java index ed3479ee0..ac91e1d9b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java @@ -16,39 +16,61 @@ */ package org.apache.dolphinscheduler.server.worker.shell; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Method; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.AbstractCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + +import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.dolphinscheduler.common.utils.*; +import org.springframework.context.ApplicationContext; import java.util.Date; +import java.util.List; /** - * python shell command executor test + * python shell command executor test */ -@Ignore +@RunWith(PowerMockRunner.class) +@PrepareForTest(OSUtils.class) +@PowerMockIgnore({"javax.management.*"}) public class ShellCommandExecutorTest { private static final Logger logger = LoggerFactory.getLogger(ShellCommandExecutorTest.class); private ProcessService processService = null; + private ApplicationContext applicationContext; @Before - public void before(){ - processService = SpringApplicationContext.getBean(ProcessService.class); + public void before() { + applicationContext = PowerMockito.mock(ApplicationContext.class); + processService = PowerMockito.mock(ProcessService.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + PowerMockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); } + @Ignore @Test public void test() throws Exception { @@ -63,19 +85,18 @@ public class ShellCommandExecutorTest { taskProps.setTaskInstanceId(7657); - TaskInstance taskInstance = processService.findTaskInstanceById(7657); String taskJson = taskInstance.getTaskJson(); - TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); - taskProps.setTaskParams(taskNode.getParams()); +// TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class); +// taskProps.setTaskParams(taskNode.getParams()); // custom logger - Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - taskInstance.getProcessDefinitionId(), - taskInstance.getProcessInstanceId(), - taskInstance.getId())); +// Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, +// taskInstance.getProcessDefinitionId(), +// taskInstance.getProcessInstanceId(), +// taskInstance.getId())); // AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); @@ -91,14 +112,141 @@ public class ShellCommandExecutorTest { task.handle(); ExecutionStatus status = ExecutionStatus.SUCCESS; - if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ + if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS) { status = ExecutionStatus.SUCCESS; - }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ + } else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL) { status = ExecutionStatus.KILL; - }else { + } else { status = ExecutionStatus.FAILURE; } logger.info(status.toString()); } -} \ No newline at end of file + + @Test + public void testParseProcessOutput() { + Class shellCommandExecutorClass = AbstractCommandExecutor.class; + try { + + Method method = shellCommandExecutorClass.getDeclaredMethod("parseProcessOutput", Process.class); + method.setAccessible(true); + Object[] arg1s = {new Process() { + @Override + public OutputStream getOutputStream() { + return new OutputStream() { + @Override + public void write(int b) throws IOException { + logger.info("unit test"); + } + }; + } + + @Override + public InputStream getInputStream() { + return new InputStream() { + @Override + public int read() throws IOException { + return 0; + } + }; + } + + @Override + public InputStream getErrorStream() { + return null; + } + + @Override + public int waitFor() throws InterruptedException { + return 0; + } + + @Override + public int exitValue() { + return 0; + } + + @Override + public void destroy() { + logger.info("unit test"); + } + } }; + method.invoke(new AbstractCommandExecutor(null, new TaskExecutionContext(), logger) { + @Override + protected String buildCommandFilePath() { + return null; + } + + @Override + protected String commandInterpreter() { + return null; + } + + @Override + protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException { + logger.info("unit test"); + } + }, arg1s); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testFindAppId() { + Class shellCommandExecutorClass = AbstractCommandExecutor.class; + try { + + Method method = shellCommandExecutorClass.getDeclaredMethod("findAppId", new Class[]{String.class}); + method.setAccessible(true); + Object[] arg1s = {"11111"}; + String result = (String) method.invoke(new AbstractCommandExecutor(null, null, null) { + @Override + protected String buildCommandFilePath() { + return null; + } + + @Override + protected String commandInterpreter() { + return null; + } + + @Override + protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException { + logger.info("unit test"); + } + }, arg1s); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + @Test + public void testConvertFile2List() { + Class shellCommandExecutorClass = AbstractCommandExecutor.class; + try { + Method method = shellCommandExecutorClass.getDeclaredMethod("convertFile2List", String.class); + method.setAccessible(true); + Object[] arg1s = {"/opt/1.txt"}; + List result = (List) method.invoke(new AbstractCommandExecutor(null, null, null) { + @Override + protected String buildCommandFilePath() { + return null; + } + + @Override + protected String commandInterpreter() { + return null; + } + + @Override + protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException { + logger.info("unit test"); + } + }, arg1s); + Assert.assertTrue(true); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java index 8c734af2c..160e92f61 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java @@ -27,11 +27,11 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.ArrayList; import java.sql.DriverManager; import java.util.Date; import java.util.HashMap; import java.util.Map; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -103,6 +103,8 @@ public class ShellTaskTest { public void testComplementData() throws Exception { shellTask = new ShellTask(taskExecutionContext, logger); shellTask.init(); + shellCommandExecutor.isSuccessOfYarnState(new ArrayList<>()); + shellCommandExecutor.isSuccessOfYarnState(null); PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult); shellTask.handle(); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 5ce42242e..aa278a624 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException import java.util.Comparator; import java.util.Iterator; import java.util.PriorityQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; /** * Task instances priority queue implementation @@ -39,6 +41,11 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); + /** + * Lock used for all public operations + */ + private final ReentrantLock lock = new ReentrantLock(true); + /** * put task instance to priority queue * @@ -61,6 +68,23 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue + * WARN: Please use PriorityBlockingQueue if you want to use poll(timeout, unit) + * because this method of override interface used without considering accuracy of timeout + * + * @param timeout + * @param unit + * @return + * @throws TaskPriorityQueueException + * @throws InterruptedException + */ + @Override + public TaskInstance poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException { + throw new TaskPriorityQueueException("This operation is not currently supported and suggest to use PriorityBlockingQueue if you want!"); + } + /** * peek taskInfo * diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java index 14c6b382d..1325afe9b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; +import java.util.concurrent.TimeUnit; + /** * task priority queue * @param @@ -41,6 +43,17 @@ public interface TaskPriorityQueue { */ T take() throws TaskPriorityQueueException, InterruptedException; + + /** + * poll taskInfo with timeout + * @param timeout + * @param unit + * @return + * @throws TaskPriorityQueueException + * @throws InterruptedException + */ + T poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException; + /** * size * diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java index 694d4c476..8775a272e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; import org.springframework.stereotype.Service; @@ -61,6 +62,20 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { return queue.take(); } + /** + * poll taskInfo with timeout + * + * @param timeout + * @param unit + * @return + * @throws TaskPriorityQueueException + * @throws InterruptedException + */ + @Override + public TaskPriority poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException { + return queue.poll(timeout,unit); + } + /** * queue size * diff --git a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java similarity index 60% rename from dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java index cf39d57b8..550c4bea3 100644 --- a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java @@ -15,48 +15,78 @@ * limitations under the License. */ -package queue; +package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; +import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; + +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; -/** - * Task instances priority queue implementation - * All the task instances are in the same process instance. - */ public class PeerTaskInstancePriorityQueueTest { @Test - public void testPut() throws Exception { + public void put() throws TaskPriorityQueueException { PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH); TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM); queue.put(taskInstanceHigPriority); queue.put(taskInstanceMediumPriority); - Assert.assertEquals(2,queue.size()); + Assert.assertEquals(2, queue.size()); } @Test - public void testPeek() throws Exception { - PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); - int peekBeforeLength = queue.size(); - queue.peek(); - Assert.assertEquals(peekBeforeLength,queue.size()); - - } - - @Test - public void testTake() throws Exception { + public void take() throws Exception { PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); int peekBeforeLength = queue.size(); queue.take(); Assert.assertTrue(queue.size() < peekBeforeLength); } + @Test + public void poll() throws Exception { + PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); + try { + queue.poll(1000, TimeUnit.MILLISECONDS); + } catch (TaskPriorityQueueException e) { + e.printStackTrace(); + } + } + + @Test + public void peek() throws Exception { + PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); + int peekBeforeLength = queue.size(); + queue.peek(); + Assert.assertEquals(peekBeforeLength, queue.size()); + } + + @Test + public void size() throws Exception { + Assert.assertEquals(2, getPeerTaskInstancePriorityQueue().size()); + } + + @Test + public void contains() throws Exception { + PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM); + queue.put(taskInstanceMediumPriority); + Assert.assertTrue(queue.contains(taskInstanceMediumPriority)); + } + + @Test + public void remove() throws Exception { + PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM); + queue.put(taskInstanceMediumPriority); + int peekBeforeLength = queue.size(); + queue.remove(taskInstanceMediumPriority); + Assert.assertNotEquals(peekBeforeLength, queue.size()); + } + /** * get queue * @@ -66,7 +96,7 @@ public class PeerTaskInstancePriorityQueueTest { private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception { PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH); - TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM); queue.put(taskInstanceHigPriority); queue.put(taskInstanceMediumPriority); return queue; @@ -75,8 +105,8 @@ public class PeerTaskInstancePriorityQueueTest { /** * create task instance * - * @param name name - * @param priority priority + * @param name name + * @param priority priority * @return */ private TaskInstance createTaskInstance(String name, Priority priority) { @@ -85,5 +115,4 @@ public class PeerTaskInstancePriorityQueueTest { taskInstance.setTaskInstancePriority(priority); return taskInstance; } - } \ No newline at end of file diff --git a/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java similarity index 52% rename from dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java index 151177016..d90011b84 100644 --- a/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java @@ -15,18 +15,19 @@ * limitations under the License. */ -package queue; +package org.apache.dolphinscheduler.service.queue; -import org.apache.dolphinscheduler.service.queue.TaskPriority; +import org.apache.dolphinscheduler.common.enums.Priority; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; -public class TaskPriorityTest { +public class TaskPriorityQueueImplTest { @Test public void testSort() { @@ -36,8 +37,8 @@ public class TaskPriorityTest { List taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys ); priorityOne = new TaskPriority(0, 1, 0, 0, "default"); @@ -46,8 +47,8 @@ public class TaskPriorityTest { taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys ); priorityOne = new TaskPriority(0, 0, 1, 0, "default"); @@ -56,8 +57,8 @@ public class TaskPriorityTest { taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys ); priorityOne = new TaskPriority(0, 0, 0, 1, "default"); @@ -66,8 +67,8 @@ public class TaskPriorityTest { taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys ); priorityOne = new TaskPriority(0, 0, 0, 0, "default_1"); @@ -76,8 +77,66 @@ public class TaskPriorityTest { taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys ); } -} + + @Test + public void put() throws Exception { + TaskPriorityQueue queue = getPriorityQueue(); + Assert.assertEquals(2, queue.size()); + } + + @Test + public void take() throws Exception { + TaskPriorityQueue queue = getPriorityQueue(); + int peekBeforeLength = queue.size(); + queue.take(); + Assert.assertTrue(queue.size() < peekBeforeLength); + } + + @Test + public void poll() throws Exception { + TaskPriorityQueue queue = getPriorityQueue(); + int peekBeforeLength = queue.size(); + queue.poll(1000, TimeUnit.MILLISECONDS); + queue.poll(1000, TimeUnit.MILLISECONDS); + Assert.assertTrue(queue.size() == 0); + System.out.println(System.currentTimeMillis()); + queue.poll(1000, TimeUnit.MILLISECONDS); + System.out.println(System.currentTimeMillis()); + } + + @Test + public void size() throws Exception { + Assert.assertTrue(getPriorityQueue().size() == 2); + } + + /** + * get queue + * + * @return queue + * @throws Exception + */ + private TaskPriorityQueue getPriorityQueue() throws Exception { + TaskPriorityQueue queue = new TaskPriorityQueueImpl(); + TaskPriority taskInstanceHigPriority = createTaskPriority(Priority.HIGH.getCode(), 1); + TaskPriority taskInstanceMediumPriority = createTaskPriority(Priority.MEDIUM.getCode(), 2); + queue.put(taskInstanceHigPriority); + queue.put(taskInstanceMediumPriority); + return queue; + } + + /** + * create task priority + * + * @param priority + * @param processInstanceId + * @return + */ + private TaskPriority createTaskPriority(Integer priority, Integer processInstanceId) { + TaskPriority priorityOne = new TaskPriority(priority, processInstanceId, 0, 0, "default"); + return priorityOne; + } +} \ No newline at end of file diff --git a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java deleted file mode 100644 index 2c13afa22..000000000 --- a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package queue; - -import org.apache.dolphinscheduler.service.queue.TaskPriority; -import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; -import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TaskUpdateQueueTest { - - /** - * test put - */ - @Test - public void testQueue() throws Exception{ - - /** - * 1_1_2_1_default - * 1_1_2_2_default - * 1_1_0_3_default - * 1_1_0_4_default - */ - TaskPriority taskInfo1 = new TaskPriority(1, 1, 2, 1, "default"); - TaskPriority taskInfo2 = new TaskPriority(1, 1, 2, 2, "default"); - TaskPriority taskInfo3 = new TaskPriority(1, 1, 0, 3, "default"); - TaskPriority taskInfo4 = new TaskPriority(1, 1, 0, 4, "default"); - - TaskPriorityQueue queue = new TaskPriorityQueueImpl(); - queue.put(taskInfo1); - queue.put(taskInfo2); - queue.put(taskInfo3); - queue.put(taskInfo4); - - assertEquals(taskInfo3, queue.take()); - assertEquals(taskInfo4, queue.take()); - assertEquals(taskInfo1, queue.take()); - assertEquals(taskInfo2, queue.take()); - } -} diff --git a/dolphinscheduler-ui.zip b/dolphinscheduler-ui.zip deleted file mode 100644 index 1f11f8f04..000000000 Binary files a/dolphinscheduler-ui.zip and /dev/null differ diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue index d6aeaf9cd..2b478be06 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue @@ -110,21 +110,6 @@ -
-
- {{$t('Startup parameter')}} -
-
-
- - -
-
-
+
+
+ {{$t('Startup parameter')}} +
+
+
+ + +
+
+
{{$t('Cancel')}} {{spinnerLoading ? 'Loading...' : $t('Start')}}