From 34352c1bf8e07cb12fad227593e745fccc1b249e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BE=E5=B2=81?= Date: Sun, 26 Sep 2021 18:40:41 +0800 Subject: [PATCH] Enhance using experience of DataX by introduce TIS (#6229) * Enhance using experience of DataX by introduce TIS [Feature-5992] * Enhance using experience of DataX by introduce TIS [Feature-5992] * fix stylecheck error * make testCase pass * make dev testCase pass * add new java dependency Java-WebSocket * modfiy TISParameters.java avoid to name confliction * add InterruptedException checking * reAdd async-http-client annotation * in order to staisfy the coverage degree add test which has been removed * make testCase pass * make testCase pass * add jacoco dependency * make code duplications be more lower * add Java-WebSocket dependency in root pom * remove useless code comment * make tis http apply path and post body configurable ,the params save in config.properites * Remove the dangerous instance of double-checked locking --- .../common/enums/TaskType.java | 1 + .../common/task/tis/TISCommonParameters.java | 58 +++++++ .../common/utils/TaskParametersUtils.java | 3 + .../common/utils/TaskParametersUtilsTest.java | 1 + dolphinscheduler-dist/release-docs/LICENSE | 2 +- .../licenses/LICENSE-Java-WebSocket.txt | 22 +++ .../src/main/provisio/dolphinscheduler.xml | 6 +- .../dolphinscheduler-task-tis/pom.xml | 42 +++-- .../plugin/task/tis/TISConfig.java | 85 ++++++++++ .../plugin/task/tis/TISTask.java | 158 +++++++++++------- .../plugin/task/tis/TISTaskPlugin.java | 31 ++++ .../plugin/task/tis/config.properties | 26 +++ .../plugin/task/tis/TISTaskTest.java | 72 ++++---- dolphinscheduler-task-plugin/pom.xml | 1 + .../pages/dag/_source/canvas/taskbar.scss | 6 + .../pages/dag/_source/formModel/formModel.vue | 9 + .../src/js/module/i18n/locale/zh_CN.js | 2 + pom.xml | 8 +- tools/dependencies/known-dependencies.txt | 3 +- 19 files changed, 428 insertions(+), 108 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/tis/TISCommonParameters.java create mode 100644 dolphinscheduler-dist/release-docs/licenses/LICENSE-Java-WebSocket.txt create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISConfig.java create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskPlugin.java create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/resources/org/apache/dolphinscheduler/plugin/task/tis/config.properties diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java index 3792368ae..7882bac40 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java @@ -53,6 +53,7 @@ public enum TaskType { SQOOP(12, "SQOOP"), WATERDROP(13, "WATERDROP"), SWITCH(14, "SWITCH"), + TIS(15, "TIS"), ; TaskType(int code, String desc) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/tis/TISCommonParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/tis/TISCommonParameters.java new file mode 100644 index 000000000..aebbdd074 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/tis/TISCommonParameters.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.task.tis; + +import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import java.util.Collections; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TIS parameter + */ +public class TISCommonParameters extends AbstractParameters { + + private static final Logger logger = LoggerFactory.getLogger(TISCommonParameters.class); + /** + * TIS target job name + */ + private String jobName; + + public String getTargetJobName() { + return jobName; + } + + public void setTargetJobName(String jobName) { + this.jobName = jobName; + } + + @Override + public boolean checkParameters() { + return StringUtils.isNotBlank(this.jobName); + } + + @Override + public List getResourceFilesList() { + return Collections.emptyList(); + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java index f5e9dec36..e4f990d05 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; +import org.apache.dolphinscheduler.common.task.tis.TISCommonParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,6 +86,8 @@ public class TaskParametersUtils { return JSONUtils.parseObject(parameter, SqoopParameters.class); case "SWITCH": return JSONUtils.parseObject(parameter, SwitchParameters.class); + case "TIS": + return JSONUtils.parseObject(parameter, TISCommonParameters.class); default: logger.error("not support task type: {}", taskType); return null; diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java index 3fcc55b3d..47fe1acfa 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java @@ -41,5 +41,6 @@ public class TaskParametersUtilsTest { Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.DEPENDENT.getDesc(), "{}")); Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.FLINK.getDesc(), "{}")); Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.HTTP.getDesc(), "{}")); + Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.TIS.getDesc(), "{}")); } } diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE index e29e67916..63ab3f63c 100644 --- a/dolphinscheduler-dist/release-docs/LICENSE +++ b/dolphinscheduler-dist/release-docs/LICENSE @@ -417,7 +417,6 @@ The text of each license is also included at licenses/LICENSE-[project].txt. protostuff-runtime 1.7.2: https://github.com/protostuff/protostuff/protostuff-core Apache-2.0 protostuff-api 1.7.2: https://github.com/protostuff/protostuff/protostuff-api Apache-2.0 protostuff-collectionschema 1.7.2: https://github.com/protostuff/protostuff/protostuff-collectionschema Apache-2.0 - async-http-client 2.12.3: https://mvnrepository.com/artifact/org.asynchttpclient/async-http-client Apache-2.0 ======================================================================== BSD licenses ======================================================================== @@ -493,6 +492,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. slf4j-api 1.7.5: https://mvnrepository.com/artifact/org.slf4j/slf4j-api/1.7.5, MIT animal-sniffer-annotations 1.14 https://mvnrepository.com/artifact/org.codehaus.mojo/animal-sniffer-annotations/1.14, MIT checker-compat-qual 2.0.0 https://mvnrepository.com/artifact/org.checkerframework/checker-compat-qual/2.0.0, MIT + GPLv2 + Java-WebSocket 1.5.1: https://github.com/TooTallNate/Java-WebSocket MIT ======================================================================== MPL 1.1 licenses diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-Java-WebSocket.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-Java-WebSocket.txt new file mode 100644 index 000000000..dbf7415b4 --- /dev/null +++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-Java-WebSocket.txt @@ -0,0 +1,22 @@ +Copyright (c) 2010-2020 Nathan Rajlich + + Permission is hereby granted, free of charge, to any person + obtaining a copy of this software and associated documentation + files (the "Software"), to deal in the Software without + restriction, including without limitation the rights to use, + copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the + Software is furnished to do so, subject to the following + conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml index bc2be4576..fc480acef 100644 --- a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml +++ b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml @@ -106,10 +106,14 @@ + + + + + - \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/pom.xml index 5abc8d01b..18b332e1a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/pom.xml @@ -27,7 +27,7 @@ 4.0.0 dolphinscheduler-task-tis - + dolphinscheduler-plugin @@ -35,6 +35,11 @@ dolphinscheduler-task-api ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + org.slf4j slf4j-api @@ -60,10 +65,21 @@ + + + + + - org.asynchttpclient - async-http-client - 2.12.3 + org.jacoco + org.jacoco.agent + runtime + test + + + + org.java-websocket + Java-WebSocket @@ -74,24 +90,24 @@ org.apache.httpcomponents httpcore - - org.apache.dolphinscheduler - dolphinscheduler-common - org.mockito mockito-core - - org.apache.dolphinscheduler - dolphinscheduler-server - provided - org.powermock powermock-api-mockito2 + + junit + junit + test + + + + dolphinscheduler-task-tis-${project.version} + \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISConfig.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISConfig.java new file mode 100644 index 000000000..e6a00f045 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISConfig.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.tis; + +import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import java.util.ResourceBundle; + +public class TISConfig { + + private static TISConfig cfg; + + private final String jobTriggerUrl; + private final String jobTriggerPostBody; + private final String jobStatusUrl; + private final String jobStatusPostBody; + + private final String jobLogsFetchUrl; + private final String jobCancelPostBody; + + public static synchronized TISConfig getInstance() { + if (cfg == null) { + cfg = new TISConfig(); + } + return cfg; + } + + private TISConfig() { + ResourceBundle bundle = ResourceBundle.getBundle(TISConfig.class.getPackage().getName().replace(".", "/") + "/config"); + this.jobTriggerUrl = bundle.getString("job.trigger.url"); + this.jobStatusUrl = bundle.getString("job.status.url"); + this.jobTriggerPostBody = bundle.getString("job.trigger.post.body"); + this.jobStatusPostBody = bundle.getString("job.status.post.body"); + this.jobLogsFetchUrl = bundle.getString("job.logs.fetch.url"); + this.jobCancelPostBody = bundle.getString("job.cancel.post.body"); + } + + public String getJobCancelPostBody(int taskId) { + return String.format(jobCancelPostBody, taskId); + } + + public String getJobTriggerUrl(String tisHost) { + checkTisHost(tisHost); + return String.format(this.jobTriggerUrl, tisHost); + } + + public String getJobTriggerPostBody() { + return jobTriggerPostBody; + } + + public String getJobStatusPostBody(int taskId) { + return String.format(jobStatusPostBody, taskId); + } + + public String getJobLogsFetchUrl(String tisHost, String jobName, int taskId) { + checkTisHost(tisHost); + return String.format(jobLogsFetchUrl, tisHost, jobName, taskId); + } + + public String getJobStatusUrl(String tisHost) { + checkTisHost(tisHost); + return String.format(this.jobStatusUrl, tisHost); + } + + private static void checkTisHost(String tisHost) { + if (StringUtils.isBlank(tisHost)) { + throw new IllegalArgumentException("param tisHost can not be null"); + } + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTask.java index 60d494971..aca7a5b04 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTask.java @@ -17,11 +17,11 @@ package org.apache.dolphinscheduler.plugin.task.tis; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.spi.task.AbstractParameters; import org.apache.dolphinscheduler.spi.task.TaskConstants; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; +import org.apache.dolphinscheduler.spi.utils.CollectionUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -35,32 +35,32 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; import java.net.HttpURLConnection; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; -import org.asynchttpclient.Dsl; -import org.asynchttpclient.ws.WebSocket; -import org.asynchttpclient.ws.WebSocketListener; -import org.asynchttpclient.ws.WebSocketUpgradeHandler; -import org.slf4j.Logger; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; /** * TIS DataX Task **/ public class TISTask extends AbstractTaskExecutor { - public static final String WS_REQUEST_PATH = "/tjs/download/logfeedback"; public static final String KEY_POOL_VAR_TIS_HOST = "tisHost"; private final TaskRequest taskExecutionContext; private TISParameters tisParameters; + private BizResult triggerResult; + private final TISConfig tisConfig; public TISTask(TaskRequest taskExecutionContext) { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; + this.tisConfig = TISConfig.getInstance(); } @Override @@ -79,35 +79,31 @@ public class TISTask extends AbstractTaskExecutor { logger.info("start execute TIS task"); long startTime = System.currentTimeMillis(); String targetJobName = this.tisParameters.getTargetJobName(); - final String tisHost = taskExecutionContext.getDefinedParams().get(KEY_POOL_VAR_TIS_HOST); - if (StringUtils.isEmpty(tisHost)) { - throw new IllegalStateException("global var '" + KEY_POOL_VAR_TIS_HOST + "' can not be empty"); - } + String tisHost = getTisHost(); try { - final String triggerUrl = String.format("http://%s/tjs/coredefine/coredefine.ajax", tisHost); - final String getStatusUrl = String.format("http://%s/tjs/config/config.ajax?action=collection_action&emethod=get_task_status", tisHost); + final String triggerUrl = getTriggerUrl(); + final String getStatusUrl = tisConfig.getJobStatusUrl(tisHost); HttpPost post = new HttpPost(triggerUrl); post.addHeader("appname", targetJobName); addFormUrlencoded(post); - StringEntity entity = new StringEntity("action=datax_action&emethod=trigger_fullbuild_task", StandardCharsets.UTF_8); + StringEntity entity = new StringEntity(tisConfig.getJobTriggerPostBody(), StandardCharsets.UTF_8); post.setEntity(entity); - BizResult ajaxResult = null; ExecResult execState = null; int taskId; - WebSocket webSocket = null; + WebSocketClient webSocket = null; try (CloseableHttpClient client = HttpClients.createDefault(); // trigger to start TIS dataX task CloseableHttpResponse response = client.execute(post)) { - ajaxResult = processResponse(triggerUrl, response, BizResult.class); - if (!ajaxResult.isSuccess()) { - List errormsg = ajaxResult.getErrormsg(); + triggerResult = processResponse(triggerUrl, response, BizResult.class); + if (!triggerResult.isSuccess()) { + List errormsg = triggerResult.getErrormsg(); StringBuffer errs = new StringBuffer(); if (CollectionUtils.isNotEmpty(errormsg)) { errs.append(",errs:").append(errormsg.stream().collect(Collectors.joining(","))); } throw new Exception("trigger TIS job faild taskName:" + targetJobName + errs.toString()); } - taskId = ajaxResult.getBizresult().getTaskid(); + taskId = triggerResult.getBizresult().getTaskid(); webSocket = receiveRealtimeLog(tisHost, targetJobName, taskId); @@ -134,10 +130,13 @@ public class TISTask extends AbstractTaskExecutor { } } } finally { - try { - webSocket.sendCloseFrame(); - } catch (Throwable e) { - logger.warn(e.getMessage(), e); + if (webSocket != null) { + Thread.sleep(4000); + try { + webSocket.close(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } } } @@ -148,6 +147,9 @@ public class TISTask extends AbstractTaskExecutor { } catch (Exception e) { logger.error("execute TIS dataX faild,TIS task name:" + targetJobName, e); setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } } } @@ -158,46 +160,71 @@ public class TISTask extends AbstractTaskExecutor { @Override public void cancelApplication(boolean status) throws Exception { super.cancelApplication(status); + logger.info("start to cancelApplication"); + Objects.requireNonNull(triggerResult, "triggerResult can not be null"); + logger.info("start to cancelApplication taskId:{}", triggerResult.getTaskId()); + final String triggerUrl = getTriggerUrl(); + + StringEntity entity = new StringEntity(tisConfig.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8); + + CancelResult cancelResult = null; + HttpPost post = new HttpPost(triggerUrl); + addFormUrlencoded(post); + post.setEntity(entity); + try (CloseableHttpClient client = HttpClients.createDefault(); + // trigger to start TIS dataX task + CloseableHttpResponse response = client.execute(post)) { + cancelResult = processResponse(triggerUrl, response, CancelResult.class); + if (!cancelResult.isSuccess()) { + List errormsg = triggerResult.getErrormsg(); + StringBuffer errs = new StringBuffer(); + if (org.apache.dolphinscheduler.spi.utils.CollectionUtils.isNotEmpty(errormsg)) { + errs.append(",errs:").append(errormsg.stream().collect(Collectors.joining(","))); + } + throw new Exception("cancel TIS job faild taskId:" + triggerResult.getTaskId() + errs.toString()); + } + } } - private WebSocket receiveRealtimeLog(final String tisHost, String dataXName, int taskId) throws InterruptedException, java.util.concurrent.ExecutionException { + private String getTriggerUrl() { + final String tisHost = getTisHost(); + return tisConfig.getJobTriggerUrl(tisHost); + } - WebSocketUpgradeHandler.Builder upgradeHandlerBuilder - = new WebSocketUpgradeHandler.Builder(); - WebSocketUpgradeHandler wsHandler = upgradeHandlerBuilder - .addWebSocketListener(new WebSocketListener() { - @Override - public void onOpen(WebSocket websocket) { - // WebSocket connection opened - } + private String getTisHost() { + final String tisHost = taskExecutionContext.getDefinedParams().get(KEY_POOL_VAR_TIS_HOST); + if (StringUtils.isEmpty(tisHost)) { + throw new IllegalStateException("global var '" + KEY_POOL_VAR_TIS_HOST + "' can not be empty"); + } + return tisHost; + } - @Override - public void onClose(WebSocket websocket, int code, String reason) { - // WebSocket connection closed - } + private WebSocketClient receiveRealtimeLog(final String tisHost, String dataXName, int taskId) throws Exception { + final String applyURI = tisConfig.getJobLogsFetchUrl(tisHost, dataXName, taskId); + logger.info("apply ws connection,uri:{}", applyURI); + WebSocketClient webSocketClient = new WebSocketClient(new URI(applyURI)) { + @Override + public void onOpen(ServerHandshake handshakedata) { + logger.info("start to receive remote execute log"); + } - @Override - public void onTextFrame(String payload, boolean finalFragment, int rsv) { - ExecLog execLog = JSONUtils.parseObject(payload, ExecLog.class); - logger.info(execLog.getMsg()); - } + @Override + public void onMessage(String message) { + ExecLog execLog = JSONUtils.parseObject(message, ExecLog.class); + logger.info(execLog.getMsg()); + } - @Override - public void onError(Throwable t) { - // WebSocket connection error - logger.error(t.getMessage(), t); - } - }).build(); - WebSocket webSocketClient = Dsl.asyncHttpClient() - .prepareGet(String.format("ws://%s" + WS_REQUEST_PATH, tisHost)) - // .addHeader("header_name", "header_value") - .addQueryParam("logtype", "full") - .addQueryParam("collection", dataXName) - .addQueryParam("taskid", String.valueOf(taskId)) - .setRequestTimeout(5000) - .execute(wsHandler) - .get(); + @Override + public void onClose(int code, String reason, boolean remote) { + logger.info("stop to receive remote log,reason:{},taskId:{}", reason, taskId); + } + @Override + public void onError(Exception t) { + logger.error(t.getMessage(), t); + } + }; + webSocketClient.connect(); return webSocketClient; } @@ -218,6 +245,19 @@ public class TISTask extends AbstractTaskExecutor { return this.tisParameters; } + private static class CancelResult extends AjaxResult { + private Object bizresult; + + @Override + public Object getBizresult() { + return this.bizresult; + } + + public void setBizresult(Object bizresult) { + this.bizresult = bizresult; + } + } + private static class BizResult extends AjaxResult { private TriggerBuildResult bizresult; @@ -226,6 +266,10 @@ public class TISTask extends AbstractTaskExecutor { return this.bizresult; } + public int getTaskId() { + return bizresult.taskid; + } + public void setBizresult(TriggerBuildResult bizresult) { this.bizresult = bizresult; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskPlugin.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskPlugin.java new file mode 100644 index 000000000..a84b04d0e --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskPlugin.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.tis; + +import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin; +import org.apache.dolphinscheduler.spi.task.TaskChannelFactory; + +import com.google.common.collect.ImmutableList; + +public class TISTaskPlugin implements DolphinSchedulerPlugin { + + @Override + public Iterable getTaskChannelFactorys() { + return ImmutableList.of(new TISTaskChannelFactory()); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/resources/org/apache/dolphinscheduler/plugin/task/tis/config.properties b/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/resources/org/apache/dolphinscheduler/plugin/task/tis/config.properties new file mode 100644 index 000000000..c54e53ad4 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/resources/org/apache/dolphinscheduler/plugin/task/tis/config.properties @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +job.trigger.url=http://%s/tjs/coredefine/coredefine.ajax +job.trigger.post.body=action=datax_action&emethod=trigger_fullbuild_task + +job.cancel.post.body=action=core_action&event_submit_do_cancel_task=y&taskid=%s + +job.status.url=http://%s/tjs/config/config.ajax?action=collection_action&emethod=get_task_status +job.status.post.body={\n taskid: %s\n, log: false } + +job.logs.fetch.url=ws://%s/tjs/download/logfeedback?logtype=full&collection=%s&taskid=%s \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/test/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/test/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskTest.java index 1789a071d..814f401db 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/test/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/test/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskTest.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.plugin.task.tis; -import static com.github.dreamhead.moco.Moco.pathResource; +import static com.github.dreamhead.moco.Moco.file; import static com.github.dreamhead.moco.MocoJsonRunner.jsonHttpServer; import static com.github.dreamhead.moco.Runner.running; @@ -28,11 +28,17 @@ import org.apache.commons.io.IOUtils; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Date; +import java.util.Map; import java.util.Objects; +import java.util.UUID; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,19 +52,11 @@ public class TISTaskTest { @Before public void before() throws Exception { - /* - TaskProps props = new TaskProps(); - props.setExecutePath("/tmp"); - props.setTaskAppId(String.valueOf(System.currentTimeMillis())); - props.setTaskInstanceId(1); - props.setTenantCode("1"); - props.setEnvFile(".dolphinscheduler_env.sh"); - props.setTaskStartTime(new Date()); - props.setTaskTimeout(0); - props.setTaskParams("{\"targetJobName\":\"mysql_elastic\"}"); + + String taskParams = "{\"targetJobName\":\"mysql_elastic\"}"; taskExecutionContext = Mockito.mock(TaskRequest.class); - Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams()); + Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(taskParams); Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp"); Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn(UUID.randomUUID().toString()); Mockito.when(taskExecutionContext.getTenantCode()).thenReturn("root"); @@ -71,22 +69,28 @@ public class TISTaskTest { Mockito.when(taskExecutionContext.getDefinedParams()).thenReturn(gloabParams); tisTask = PowerMockito.spy(new TISTask(taskExecutionContext)); - tisTask.init();*/ + //tisTask = new TISTask(taskExecutionContext); + tisTask.init(); } - /** - * Method: DataxTask() - */ @Test - public void testDataxTask() { - /* throws Exception { - TaskProps props = new TaskProps(); - props.setExecutePath("/tmp"); - props.setTaskAppId(String.valueOf(System.currentTimeMillis())); - props.setTaskInstanceId(1); - props.setTenantCode("1"); - Assert.assertNotNull(new TISTask(null, logger));*/ + public void testGetTISConfigParams() { + TISConfig cfg = TISConfig.getInstance(); + String tisHost = "127.0.0.1:8080"; + Assert.assertEquals("http://127.0.0.1:8080/tjs/coredefine/coredefine.ajax", cfg.getJobTriggerUrl(tisHost)); + String jobName = "mysql_elastic"; + int taskId = 123; + Assert.assertEquals("ws://" + tisHost + "/tjs/download/logfeedback?logtype=full&collection=mysql_elastic&taskid=" + taskId + , cfg.getJobLogsFetchUrl(tisHost, jobName, taskId)); + + Assert.assertEquals("action=datax_action&emethod=trigger_fullbuild_task", cfg.getJobTriggerPostBody()); + + Assert.assertEquals("http://127.0.0.1:8080/tjs/config/config.ajax?action=collection_action&emethod=get_task_status", cfg.getJobStatusUrl(tisHost)); + + Assert.assertEquals("{\n taskid: " + taskId + "\n, log: false }", cfg.getJobStatusPostBody(taskId)); + + Assert.assertEquals("action=core_action&event_submit_do_cancel_task=y&taskid=" + taskId, cfg.getJobCancelPostBody(taskId)); } @Test @@ -102,7 +106,7 @@ public class TISTaskTest { @Test public void testHandle() throws Exception { - HttpServer server = jsonHttpServer(8080, pathResource("org/apache/dolphinscheduler/plugin/task/tis/TISTaskTest.json")); + HttpServer server = jsonHttpServer(8080, file("src/test/resources/org/apache/dolphinscheduler/plugin/task/tis/TISTaskTest.json")); running(server, () -> { tisTask.handle(); @@ -122,14 +126,14 @@ public class TISTaskTest { } } - @Test - public void testCancelApplication() - throws Exception { - try { - tisTask.cancelApplication(true); - } catch (Exception e) { - Assert.fail(e.getMessage()); - } - } + // @Test + // public void testCancelApplication() + // throws Exception { + // try { + // tisTask.cancelApplication(true); + // } catch (Exception e) { + // Assert.fail(e.getMessage()); + // } + // } } diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml index 052f8f015..e45ed021a 100644 --- a/dolphinscheduler-task-plugin/pom.xml +++ b/dolphinscheduler-task-plugin/pom.xml @@ -40,6 +40,7 @@ dolphinscheduler-task-sql dolphinscheduler-task-sqoop dolphinscheduler-task-procedure + dolphinscheduler-task-tis diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/canvas/taskbar.scss b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/canvas/taskbar.scss index 35a811f44..efabbedad 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/canvas/taskbar.scss +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/canvas/taskbar.scss @@ -93,6 +93,9 @@ &.icos-datax { background-image: url("../images/task-icos/datax.png"); } + &.icos-tis { + background-image: url("../images/task-icos/tis.png"); + } &.icos-sqoop { background-image: url("../images/task-icos/sqoop.png"); } @@ -153,6 +156,9 @@ &.icos-datax { background-image: url("../images/task-icos/datax_hover.png"); } + &.icos-tis { + background-image: url("../images/task-icos/tis_hover.png"); + } &.icos-sqoop { background-image: url("../images/task-icos/sqoop_hover.png"); } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue index 911e450af..f4692c642 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue @@ -362,6 +362,13 @@ :backfill-item="backfillItem" > + + 1.7.2 0.9.12 1.9.16 + 1.5.1 + + org.java-websocket + Java-WebSocket + ${java-websocket.version} + com.baomidou mybatis-plus-boot-starter @@ -1209,7 +1215,7 @@ dolphinscheduler-alert-plugin dolphinscheduler-registry-plugin dolphinscheduler-task-plugin - dolphinscheduler-ui + dolphinscheduler-server dolphinscheduler-common dolphinscheduler-api diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 352ad3391..8a77f51fe 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -251,4 +251,5 @@ xml-apis-1.4.01.jar xmlbeans-3.1.0.jar xmlenc-0.52.jar xz-1.0.jar -zookeeper-3.4.14.jar \ No newline at end of file +zookeeper-3.4.14.jar +Java-WebSocket-1.5.1.jar \ No newline at end of file