diff --git a/.asf.yaml b/.asf.yaml index b6ed2e7ce..2eca3ad1b 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -16,10 +16,7 @@ # github: - description: | - Apache DolphinScheduler is a distributed and extensible workflow scheduler platform with powerful DAG - visual interfaces, dedicated to solving complex job dependencies in the data pipeline and providing - various types of jobs available `out of the box`. + description: Apache DolphinScheduler is a distributed and extensible workflow scheduler platform with powerful DAG visual interfaces, dedicated to solving complex job dependencies in the data pipeline and providing various types of jobs available out of box. homepage: https://dolphinscheduler.apache.org/ labels: - airflow diff --git a/docker/build/conf/dolphinscheduler/supervisor/supervisor.ini b/docker/build/conf/dolphinscheduler/supervisor/supervisor.ini index c8c4e126c..19166f48d 100644 --- a/docker/build/conf/dolphinscheduler/supervisor/supervisor.ini +++ b/docker/build/conf/dolphinscheduler/supervisor/supervisor.ini @@ -90,3 +90,18 @@ killasgroup=true redirect_stderr=true stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 + +[program:standalone] +command=%(ENV_DOLPHINSCHEDULER_BIN)s/dolphinscheduler-daemon.sh start standalone-server +directory=%(ENV_DOLPHINSCHEDULER_HOME)s +priority=999 +autostart=%(ENV_STANDALONE_START_ENABLED)s +autorestart=true +startsecs=5 +stopwaitsecs=3 +exitcodes=0 +stopasgroup=true +killasgroup=true +redirect_stderr=true +stdout_logfile=/dev/fd/1 +stdout_logfile_maxbytes=0 diff --git a/docker/build/startup.sh b/docker/build/startup.sh index ae1ed3677..7f3b7d0d2 100755 --- a/docker/build/startup.sh +++ b/docker/build/startup.sh @@ -24,6 +24,7 @@ export WORKER_START_ENABLED=false export API_START_ENABLED=false export ALERT_START_ENABLED=false export LOGGER_START_ENABLED=false +export STANDALONE_START_ENABLED=false # wait database waitDatabase() { @@ -67,12 +68,13 @@ waitZK() { printUsage() { echo -e "Dolphin Scheduler is a distributed and easy-to-expand visual DAG workflow scheduling system," echo -e "dedicated to solving the complex dependencies in data processing, making the scheduling system out of the box for data processing.\n" - echo -e "Usage: [ all | master-server | worker-server | api-server | alert-server ]\n" - printf "%-13s: %s\n" "all" "Run master-server, worker-server, api-server and alert-server" - printf "%-13s: %s\n" "master-server" "MasterServer is mainly responsible for DAG task split, task submission monitoring." - printf "%-13s: %s\n" "worker-server" "WorkerServer is mainly responsible for task execution and providing log services." - printf "%-13s: %s\n" "api-server" "ApiServer is mainly responsible for processing requests and providing the front-end UI layer." - printf "%-13s: %s\n" "alert-server" "AlertServer mainly include Alarms." + echo -e "Usage: [ all | master-server | worker-server | api-server | alert-server | standalone-server ]\n" + printf "%-13s: %s\n" "all" "Run master-server, worker-server, api-server and alert-server" + printf "%-13s: %s\n" "master-server" "MasterServer is mainly responsible for DAG task split, task submission monitoring." + printf "%-13s: %s\n" "worker-server" "WorkerServer is mainly responsible for task execution and providing log services." + printf "%-13s: %s\n" "api-server" "ApiServer is mainly responsible for processing requests and providing the front-end UI layer." + printf "%-13s: %s\n" "alert-server" "AlertServer mainly include Alarms." + printf "%-13s: %s\n" "standalone-server" "Standalone server that uses embedded zookeeper and database, only for testing and demostration." } # init config file @@ -110,6 +112,9 @@ case "$1" in waitDatabase export ALERT_START_ENABLED=true ;; + (standalone-server) + export STANDALONE_START_ENABLED=true + ;; (help) printUsage exit 1 diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/MailSender.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/MailSender.java index 7afdf862c..33701de7b 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/MailSender.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/MailSender.java @@ -80,7 +80,7 @@ public class MailSender { private String sslTrust; private String showType; private AlertTemplate alertTemplate; - private String mustNotNull = "must not be null"; + private String mustNotNull = " must not be null"; public MailSender(Map config) { diff --git a/dolphinscheduler-alert/pom.xml b/dolphinscheduler-alert/pom.xml index cf586c38d..e695af523 100644 --- a/dolphinscheduler-alert/pom.xml +++ b/dolphinscheduler-alert/pom.xml @@ -72,8 +72,13 @@ com.google.guava guava + + + jsr305 + com.google.code.findbugs + + - ch.qos.logback logback-classic diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java index b76cdb710..b0a8c0348 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java @@ -24,8 +24,6 @@ import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; import org.apache.dolphinscheduler.alert.processor.AlertRequestProcessor; import org.apache.dolphinscheduler.alert.runner.AlertSender; import org.apache.dolphinscheduler.alert.utils.Constants; -import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader; -import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.dao.AlertDao; @@ -35,6 +33,8 @@ import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; +import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader; +import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig; import org.apache.dolphinscheduler.spi.utils.StringUtils; import java.util.List; @@ -44,45 +44,29 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableList; -/** - * alert of start - */ public class AlertServer { private static final Logger logger = LoggerFactory.getLogger(AlertServer.class); - /** - * Plugin Dao - */ - private PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class); + private final PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class); - /** - * Alert Dao - */ - private AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class); - - private AlertSender alertSender; + private final AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class); private AlertPluginManager alertPluginManager; - private DolphinPluginManagerConfig alertPluginManagerConfig; - public static final String ALERT_PLUGIN_BINDING = "alert.plugin.binding"; public static final String ALERT_PLUGIN_DIR = "alert.plugin.dir"; public static final String MAVEN_LOCAL_REPOSITORY = "maven.local.repository"; - /** - * netty server - */ private NettyRemotingServer server; private static class AlertServerHolder { private static final AlertServer INSTANCE = new AlertServer(); } - public static final AlertServer getInstance() { + public static AlertServer getInstance() { return AlertServerHolder.INSTANCE; } @@ -98,8 +82,7 @@ public class AlertServer { } private void initPlugin() { - alertPluginManager = new AlertPluginManager(); - alertPluginManagerConfig = new DolphinPluginManagerConfig(); + DolphinPluginManagerConfig alertPluginManagerConfig = new DolphinPluginManagerConfig(); alertPluginManagerConfig.setPlugins(PropertyUtils.getString(ALERT_PLUGIN_BINDING)); if (StringUtils.isNotBlank(PropertyUtils.getString(ALERT_PLUGIN_DIR))) { alertPluginManagerConfig.setInstalledPluginsDir(PropertyUtils.getString(ALERT_PLUGIN_DIR, Constants.ALERT_PLUGIN_PATH).trim()); @@ -109,6 +92,7 @@ public class AlertServer { alertPluginManagerConfig.setMavenLocalRepository(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY).trim()); } + alertPluginManager = new AlertPluginManager(); DolphinPluginLoader alertPluginLoader = new DolphinPluginLoader(alertPluginManagerConfig, ImmutableList.of(alertPluginManager)); try { alertPluginLoader.loadPlugins(); @@ -117,9 +101,6 @@ public class AlertServer { } } - /** - * init netty remoting server - */ private void initRemoteServer() { NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(ALERT_RPC_PORT); @@ -128,30 +109,10 @@ public class AlertServer { this.server.start(); } - /** - * Cyclic alert info sending alert - */ private void runSender() { - while (Stopper.isRunning()) { - try { - Thread.sleep(Constants.ALERT_SCAN_INTERVAL); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - Thread.currentThread().interrupt(); - } - if (alertPluginManager == null || alertPluginManager.getAlertChannelMap().size() == 0) { - logger.warn("No Alert Plugin . Cannot send alert info. "); - } else { - List alerts = alertDao.listWaitExecutionAlert(); - alertSender = new AlertSender(alerts, alertDao, alertPluginManager); - alertSender.run(); - } - } + new Thread(new Sender()).start(); } - /** - * start - */ public void start() { PropertyUtils.loadPropertyFile(ALERT_PROPERTIES_PATH); checkTable(); @@ -161,23 +122,35 @@ public class AlertServer { runSender(); } - /** - * stop - */ public void stop() { this.server.close(); logger.info("alert server shut down"); } + final class Sender implements Runnable { + @Override + public void run() { + while (Stopper.isRunning()) { + try { + Thread.sleep(Constants.ALERT_SCAN_INTERVAL); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + Thread.currentThread().interrupt(); + } + if (alertPluginManager == null || alertPluginManager.getAlertChannelMap().size() == 0) { + logger.warn("No Alert Plugin . Cannot send alert info. "); + } else { + List alerts = alertDao.listWaitExecutionAlert(); + new AlertSender(alerts, alertDao, alertPluginManager).run(); + } + } + } + } + public static void main(String[] args) { AlertServer alertServer = AlertServer.getInstance(); alertServer.start(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - alertServer.stop(); - } - }); + Runtime.getRuntime().addShutdownHook(new Thread(alertServer::stop)); } } diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java index 4fbe2bd91..02f4b0ff8 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java @@ -56,7 +56,7 @@ public class AlertPluginManager extends AbstractDolphinPluginManager { */ private final Map pluginDefineMap = new HashMap<>(); - private PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class); + private final PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class); private void addAlertChannelFactory(AlertChannelFactory alertChannelFactory) { requireNonNull(alertChannelFactory, "alertChannelFactory is null"); diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java index ec716d987..e576d0049 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java @@ -33,14 +33,11 @@ import org.slf4j.LoggerFactory; import io.netty.channel.Channel; -/** - * alert request processor - */ public class AlertRequestProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(AlertRequestProcessor.class); - private AlertDao alertDao; - private AlertPluginManager alertPluginManager; + private final AlertDao alertDao; + private final AlertPluginManager alertPluginManager; public AlertRequestProcessor(AlertDao alertDao, AlertPluginManager alertPluginManager) { this.alertDao = alertDao; diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java index 114d01a84..d7bcc2c95 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java @@ -38,16 +38,13 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * alert sender - */ public class AlertSender { private static final Logger logger = LoggerFactory.getLogger(AlertSender.class); private List alertList; private AlertDao alertDao; - private AlertPluginManager alertPluginManager; + private final AlertPluginManager alertPluginManager; public AlertSender(AlertPluginManager alertPluginManager) { this.alertPluginManager = alertPluginManager; diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/FuncUtilsTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/FuncUtilsTest.java deleted file mode 100644 index 818fac98b..000000000 --- a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/FuncUtilsTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.alert.utils; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.util.Arrays; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FuncUtilsTest { - - private static final Logger logger = LoggerFactory.getLogger(FuncUtilsTest.class); - - /** - * Test mkString - */ - @Test - public void testMKString() { - - //Define users list - Iterable users = Arrays.asList("user1", "user2", "user3"); - //Define split - String split = "|"; - - //Invoke mkString with correctParams - String result = FuncUtils.mkString(users, split); - logger.info(result); - - //Expected result string - assertEquals("user1|user2|user3", result); - - //Null list expected return null - result = FuncUtils.mkString(null, split); - assertNull(result); - - //Null split expected return null - result = FuncUtils.mkString(users, null); - assertNull(result); - - } -} \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/EnvironmentController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/EnvironmentController.java new file mode 100644 index 000000000..79bebb745 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/EnvironmentController.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.controller; + +import static org.apache.dolphinscheduler.api.enums.Status.CREATE_ENVIRONMENT_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.DELETE_ENVIRONMENT_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_ENVIRONMENT_BY_CODE_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_ENVIRONMENT_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_ENVIRONMENT_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.VERIFY_ENVIRONMENT_ERROR; + +import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; +import org.apache.dolphinscheduler.api.exceptions.ApiException; +import org.apache.dolphinscheduler.api.service.EnvironmentService; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.dao.entity.User; + +import java.util.Map; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestAttribute; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import springfox.documentation.annotations.ApiIgnore; + +/** + * environment controller + */ +@Api(tags = "ENVIRONMENT_TAG") +@RestController +@RequestMapping("environment") +public class EnvironmentController extends BaseController { + + @Autowired + private EnvironmentService environmentService; + + /** + * create environment + * + * @param loginUser login user + * @param name environment name + * @param config config + * @param description description + * @return returns an error if it exists + */ + @ApiOperation(value = "createEnvironment", notes = "CREATE_ENVIRONMENT_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "name", value = "ENVIRONMENT_NAME", required = true, dataType = "String"), + @ApiImplicitParam(name = "config", value = "CONFIG", required = true, dataType = "String"), + @ApiImplicitParam(name = "description", value = "ENVIRONMENT_DESC", dataType = "String"), + @ApiImplicitParam(name = "workerGroups", value = "WORKER_GROUP_LIST", dataType = "String") + }) + @PostMapping(value = "/create") + @ResponseStatus(HttpStatus.CREATED) + @ApiException(CREATE_ENVIRONMENT_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result createProject(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("name") String name, + @RequestParam("config") String config, + @RequestParam(value = "description", required = false) String description, + @RequestParam(value = "workerGroups", required = false) String workerGroups) { + + Map result = environmentService.createEnvironment(loginUser, name, config, description, workerGroups); + return returnDataList(result); + } + + /** + * update environment + * + * @param loginUser login user + * @param code environment code + * @param name environment name + * @param config environment config + * @param description description + * @return update result code + */ + @ApiOperation(value = "updateEnvironment", notes = "UPDATE_ENVIRONMENT_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "code", value = "ENVIRONMENT_CODE", required = true, dataType = "Long", example = "100"), + @ApiImplicitParam(name = "name", value = "ENVIRONMENT_NAME", required = true, dataType = "String"), + @ApiImplicitParam(name = "config", value = "ENVIRONMENT_CONFIG", required = true, dataType = "String"), + @ApiImplicitParam(name = "description", value = "ENVIRONMENT_DESC", dataType = "String"), + @ApiImplicitParam(name = "workerGroups", value = "WORKER_GROUP_LIST", dataType = "String") + }) + @PostMapping(value = "/update") + @ResponseStatus(HttpStatus.OK) + @ApiException(UPDATE_ENVIRONMENT_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result updateEnvironment(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("code") Long code, + @RequestParam("name") String name, + @RequestParam("config") String config, + @RequestParam(value = "description", required = false) String description, + @RequestParam(value = "workerGroups", required = false) String workerGroups) { + Map result = environmentService.updateEnvironmentByCode(loginUser, code, name, config, description, workerGroups); + return returnDataList(result); + } + + /** + * query environment details by code + * + * @param environmentCode environment code + * @return environment detail information + */ + @ApiOperation(value = "queryEnvironmentByCode", notes = "QUERY_ENVIRONMENT_BY_CODE_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", required = true, dataType = "Long", example = "100") + }) + @GetMapping(value = "/query-by-code") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_ENVIRONMENT_BY_CODE_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result queryEnvironmentByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("environmentCode") Long environmentCode) { + + Map result = environmentService.queryEnvironmentByCode(environmentCode); + return returnDataList(result); + } + + /** + * query environment list paging + * + * @param searchVal search value + * @param pageSize page size + * @param pageNo page number + * @return environment list which the login user have permission to see + */ + @ApiOperation(value = "queryEnvironmentListPaging", notes = "QUERY_ENVIRONMENT_LIST_PAGING_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String"), + @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20"), + @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1") + }) + @GetMapping(value = "/list-paging") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_ENVIRONMENT_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result queryEnvironmentListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "searchVal", required = false) String searchVal, + @RequestParam("pageSize") Integer pageSize, + @RequestParam("pageNo") Integer pageNo + ) { + + Result result = checkPageParams(pageNo, pageSize); + if (!result.checkResult()) { + return result; + } + searchVal = ParameterUtils.handleEscapes(searchVal); + result = environmentService.queryEnvironmentListPaging(pageNo, pageSize, searchVal); + return result; + } + + /** + * delete environment by code + * + * @param loginUser login user + * @param environmentCode environment code + * @return delete result code + */ + @ApiOperation(value = "deleteEnvironmentByCode", notes = "DELETE_ENVIRONMENT_BY_CODE_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", required = true, dataType = "Long", example = "100") + }) + @PostMapping(value = "/delete") + @ResponseStatus(HttpStatus.OK) + @ApiException(DELETE_ENVIRONMENT_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result deleteEnvironment(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("environmentCode") Long environmentCode + ) { + + Map result = environmentService.deleteEnvironmentByCode(loginUser, environmentCode); + return returnDataList(result); + } + + /** + * query all environment list + * + * @param loginUser login user + * @return all environment list + */ + @ApiOperation(value = "queryAllEnvironmentList", notes = "QUERY_ALL_ENVIRONMENT_LIST_NOTES") + @GetMapping(value = "/query-environment-list") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_ENVIRONMENT_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result queryAllEnvironmentList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser) { + Map result = environmentService.queryAllEnvironmentList(); + return returnDataList(result); + } + + /** + * verify environment and environment name + * + * @param loginUser login user + * @param environmentName environment name + * @return true if the environment name not exists, otherwise return false + */ + @ApiOperation(value = "verifyEnvironment", notes = "VERIFY_ENVIRONMENT_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "environmentName", value = "ENVIRONMENT_NAME", required = true, dataType = "String") + }) + @PostMapping(value = "/verify-environment") + @ResponseStatus(HttpStatus.OK) + @ApiException(VERIFY_ENVIRONMENT_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result verifyEnvironment(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "environmentName") String environmentName + ) { + Map result = environmentService.verifyEnvironment(environmentName); + return returnDataList(result); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 87a70428d..e6159369e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -99,8 +99,9 @@ public class ExecutorController extends BaseController { @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"), @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"), @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"), + @ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "default"), @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"), - @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8"), + @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8") }) @PostMapping(value = "start-process-instance") @ResponseStatus(HttpStatus.OK) @@ -119,6 +120,7 @@ public class ExecutorController extends BaseController { @RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, + @RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode, @RequestParam(value = "timeout", required = false) Integer timeout, @RequestParam(value = "startParams", required = false) String startParams, @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber @@ -133,7 +135,7 @@ public class ExecutorController extends BaseController { } Map result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy, startNodeList, taskDependType, warningType, - warningGroupId, runMode, processInstancePriority, workerGroup, timeout, startParamMap, expectedParallelismNumber); + warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber); return returnDataList(result); } @@ -149,8 +151,8 @@ public class ExecutorController extends BaseController { */ @ApiOperation(value = "execute", notes = "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType") + @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType") }) @PostMapping(value = "/execute") @ResponseStatus(HttpStatus.OK) @@ -174,7 +176,7 @@ public class ExecutorController extends BaseController { */ @ApiOperation(value = "startCheckProcessDefinition", notes = "START_CHECK_PROCESS_DEFINITION_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100") + @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100") }) @PostMapping(value = "/start-check") @ResponseStatus(HttpStatus.OK) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java index 051889477..7ced43d3e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java @@ -74,7 +74,6 @@ public class SchedulerController extends BaseController { @Autowired private SchedulerService schedulerService; - /** * create schedule * @@ -91,15 +90,16 @@ public class SchedulerController extends BaseController { */ @ApiOperation(value = "createSchedule", notes = "CREATE_SCHEDULE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "schedule", value = "SCHEDULE", required = true, dataType = "String", - example = "{'startTime':'2019-06-10 00:00:00','endTime':'2019-06-13 00:00:00','timezoneId':'America/Phoenix','crontab':'0 0 3/6 * * ? *'}"), - @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", type = "WarningType"), - @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", dataType = "Int", example = "100"), - @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", type = "FailureStrategy"), - @ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int", example = "100"), - @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String"), - @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", type = "Priority"), + @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "schedule", value = "SCHEDULE", required = true, dataType = "String", + example = "{'startTime':'2019-06-10 00:00:00','endTime':'2019-06-13 00:00:00','timezoneId':'America/Phoenix','crontab':'0 0 3/6 * * ? *'}"), + @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", type = "WarningType"), + @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", dataType = "Int", example = "100"), + @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", type = "FailureStrategy"), + @ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int", example = "100"), + @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String"), + @ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long"), + @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", type = "Priority"), }) @PostMapping("/create") @ResponseStatus(HttpStatus.CREATED) @@ -113,9 +113,10 @@ public class SchedulerController extends BaseController { @RequestParam(value = "warningGroupId", required = false, defaultValue = DEFAULT_NOTIFY_GROUP_ID) int warningGroupId, @RequestParam(value = "failureStrategy", required = false, defaultValue = DEFAULT_FAILURE_POLICY) FailureStrategy failureStrategy, @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, + @RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode, @RequestParam(value = "processInstancePriority", required = false, defaultValue = DEFAULT_PROCESS_INSTANCE_PRIORITY) Priority processInstancePriority) { Map result = schedulerService.insertSchedule(loginUser, projectName, processDefinitionId, schedule, - warningType, warningGroupId, failureStrategy, processInstancePriority, workerGroup); + warningType, warningGroupId, failureStrategy, processInstancePriority, workerGroup, environmentCode); return returnDataList(result); } @@ -136,16 +137,17 @@ public class SchedulerController extends BaseController { */ @ApiOperation(value = "updateSchedule", notes = "UPDATE_SCHEDULE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "id", value = "SCHEDULE_ID", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "schedule", value = "SCHEDULE", required = true, dataType = "String", - example = "{'startTime':'2019-06-10 00:00:00','endTime':'2019-06-13 00:00:00'," - + "'crontab':'0 0 3/6 * * ? *'}"), - @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", type = "WarningType"), - @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", dataType = "Int", example = "100"), - @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", type = "FailureStrategy"), - @ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int", example = "100"), - @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String"), - @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", type = "Priority") + @ApiImplicitParam(name = "id", value = "SCHEDULE_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "schedule", value = "SCHEDULE", required = true, dataType = "String", + example = "{'startTime':'2019-06-10 00:00:00','endTime':'2019-06-13 00:00:00'," + + "'crontab':'0 0 3/6 * * ? *'}"), + @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", type = "WarningType"), + @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", dataType = "Int", example = "100"), + @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", type = "FailureStrategy"), + @ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int", example = "100"), + @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String"), + @ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long"), + @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", type = "Priority") }) @PostMapping("/update") @ApiException(UPDATE_SCHEDULE_ERROR) @@ -158,10 +160,11 @@ public class SchedulerController extends BaseController { @RequestParam(value = "warningGroupId", required = false) int warningGroupId, @RequestParam(value = "failureStrategy", required = false, defaultValue = "END") FailureStrategy failureStrategy, @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, + @RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) { Map result = schedulerService.updateSchedule(loginUser, projectName, id, schedule, - warningType, warningGroupId, failureStrategy, null, processInstancePriority, workerGroup); + warningType, warningGroupId, failureStrategy, null, processInstancePriority, workerGroup, environmentCode); return returnDataList(result); } @@ -175,7 +178,7 @@ public class SchedulerController extends BaseController { */ @ApiOperation(value = "online", notes = "ONLINE_SCHEDULE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "id", value = "SCHEDULE_ID", required = true, dataType = "Int", example = "100") + @ApiImplicitParam(name = "id", value = "SCHEDULE_ID", required = true, dataType = "Int", example = "100") }) @PostMapping("/online") @ApiException(PUBLISH_SCHEDULE_ONLINE_ERROR) @@ -197,7 +200,7 @@ public class SchedulerController extends BaseController { */ @ApiOperation(value = "offline", notes = "OFFLINE_SCHEDULE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "id", value = "SCHEDULE_ID", required = true, dataType = "Int", example = "100") + @ApiImplicitParam(name = "id", value = "SCHEDULE_ID", required = true, dataType = "Int", example = "100") }) @PostMapping("/offline") @ApiException(OFFLINE_SCHEDULE_ERROR) @@ -223,10 +226,10 @@ public class SchedulerController extends BaseController { */ @ApiOperation(value = "queryScheduleListPaging", notes = "QUERY_SCHEDULE_LIST_PAGING_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", type = "String"), - @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "100") + @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", type = "String"), + @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "100") }) @GetMapping("/list-paging") @@ -257,8 +260,8 @@ public class SchedulerController extends BaseController { */ @ApiOperation(value = "deleteScheduleById", notes = "OFFLINE_SCHEDULE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "scheduleId", value = "SCHEDULE_ID", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "projectName", value = "PROJECT_NAME", required = true, dataType = "String"), + @ApiImplicitParam(name = "scheduleId", value = "SCHEDULE_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "projectName", value = "PROJECT_NAME", required = true, dataType = "String"), }) @GetMapping(value = "/delete") @ResponseStatus(HttpStatus.OK) @@ -299,9 +302,9 @@ public class SchedulerController extends BaseController { */ @ApiOperation(value = "previewSchedule", notes = "PREVIEW_SCHEDULE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "schedule", value = "SCHEDULE", required = true, dataType = "String", - example = "{'startTime':'2019-06-10 00:00:00'," - + "'endTime':'2019-06-13 00:00:00','crontab':'0 0 3/6 * * ? *'}"), + @ApiImplicitParam(name = "schedule", value = "SCHEDULE", required = true, dataType = "String", + example = "{'startTime':'2019-06-10 00:00:00'," + + "'endTime':'2019-06-13 00:00:00','crontab':'0 0 3/6 * * ? *'}"), }) @PostMapping("/preview") @ResponseStatus(HttpStatus.CREATED) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/EnvironmentDto.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/EnvironmentDto.java new file mode 100644 index 000000000..a89d34fe4 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/EnvironmentDto.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.dto; + +import java.util.Date; +import java.util.List; + +/** + * EnvironmentDto + */ +public class EnvironmentDto { + + private int id; + + /** + * environment code + */ + private Long code; + + /** + * environment name + */ + private String name; + + /** + * config content + */ + private String config; + + private String description; + + private List workerGroups; + + /** + * operator user id + */ + private Integer operator; + + private Date createTime; + + private Date updateTime; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Long getCode() { + return this.code; + } + + public void setCode(Long code) { + this.code = code; + } + + public String getConfig() { + return this.config; + } + + public void setConfig(String config) { + this.config = config; + } + + public String getDescription() { + return this.description; + } + + public void setDescription(String description) { + this.description = description; + } + + public Integer getOperator() { + return this.operator; + } + + public void setOperator(Integer operator) { + this.operator = operator; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } + + public List getWorkerGroups() { + return workerGroups; + } + + public void setWorkerGroups(List workerGroups) { + this.workerGroups = workerGroups; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 4c7d25efc..04446b00a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -310,7 +310,22 @@ public enum Status { LIST_PAGING_ALERT_PLUGIN_INSTANCE_ERROR(110011, "query plugin instance page error", "分页查询告警实例失败"), DELETE_ALERT_PLUGIN_INSTANCE_ERROR_HAS_ALERT_GROUP_ASSOCIATED(110012, "failed to delete the alert instance, there is an alarm group associated with this alert instance", "删除告警实例失败,存在与此告警实例关联的警报组"), - PROCESS_DEFINITION_VERSION_IS_USED(110013,"this process definition version is used","此工作流定义版本被使用"); + PROCESS_DEFINITION_VERSION_IS_USED(110013,"this process definition version is used","此工作流定义版本被使用"), + + CREATE_ENVIRONMENT_ERROR(120001, "create environment error", "创建环境失败"), + ENVIRONMENT_NAME_EXISTS(120002,"this enviroment name [{0}] already exists","环境名称[{0}]已经存在"), + ENVIRONMENT_NAME_IS_NULL(120003,"this enviroment name shouldn't be empty.","环境名称不能为空"), + ENVIRONMENT_CONFIG_IS_NULL(120004,"this enviroment config shouldn't be empty.","环境配置信息不能为空"), + UPDATE_ENVIRONMENT_ERROR(120005, "update environment [{0}] info error", "更新环境[{0}]信息失败"), + DELETE_ENVIRONMENT_ERROR(120006, "delete environment error", "删除环境信息失败"), + DELETE_ENVIRONMENT_RELATED_TASK_EXISTS(120007, "this environment has been used in tasks,so you can't delete it.", "该环境已经被任务使用,所以不能删除该环境信息"), + QUERY_ENVIRONMENT_BY_NAME_ERROR(1200008, "not found environment [{0}] ", "查询环境名称[{0}]信息不存在"), + QUERY_ENVIRONMENT_BY_CODE_ERROR(1200009, "not found environment [{0}] ", "查询环境编码[{0}]不存在"), + QUERY_ENVIRONMENT_ERROR(1200010, "login user query environment error", "分页查询环境列表错误"), + VERIFY_ENVIRONMENT_ERROR(1200011, "verify environment error", "验证环境信息错误"), + ENVIRONMENT_WORKER_GROUPS_IS_INVALID(1200012, "environment worker groups is invalid format", "环境关联的工作组参数解析错误"), + UPDATE_ENVIRONMENT_WORKER_GROUP_RELATION_ERROR(1200013,"You can't modify the worker group, because the worker group [{0}] and this environment [{1}] already be used in the task [{2}]", + "您不能修改工作组选项,因为该工作组 [{0}] 和 该环境 [{1}] 已经被用在任务 [{2}] 中"); private final int code; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/EnvironmentService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/EnvironmentService.java new file mode 100644 index 000000000..5702980bf --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/EnvironmentService.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service; + +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.dao.entity.User; + +import java.util.Map; + +/** + * environment service + */ +public interface EnvironmentService { + + /** + * create environment + * + * @param loginUser login user + * @param name environment name + * @param config environment config + * @param desc environment desc + * @param workerGroups worker groups + */ + Map createEnvironment(User loginUser, String name, String config, String desc, String workerGroups); + + /** + * query environment + * + * @param name environment name + */ + Map queryEnvironmentByName(String name); + + /** + * query environment + * + * @param code environment code + */ + Map queryEnvironmentByCode(Long code); + + + /** + * delete environment + * + * @param loginUser login user + * @param code environment code + */ + Map deleteEnvironmentByCode(User loginUser, Long code); + + /** + * update environment + * + * @param loginUser login user + * @param code environment code + * @param name environment name + * @param config environment config + * @param desc environment desc + * @param workerGroups worker groups + */ + Map updateEnvironmentByCode(User loginUser, Long code, String name, String config, String desc, String workerGroups); + + /** + * query environment paging + * + * @param pageNo page number + * @param searchVal search value + * @param pageSize page size + * @return environment list page + */ + Result queryEnvironmentListPaging(Integer pageNo, Integer pageSize, String searchVal); + + /** + * query all environment + * + * @return all environment list + */ + Map queryAllEnvironmentList(); + + /** + * verify environment name + * + * @param environmentName environment name + * @return true if the environment name not exists, otherwise return false + */ + Map verifyEnvironment(String environmentName); + +} + diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/EnvironmentWorkerGroupRelationService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/EnvironmentWorkerGroupRelationService.java new file mode 100644 index 000000000..9db770158 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/EnvironmentWorkerGroupRelationService.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service; + +import java.util.Map; + +/** + * environment worker group relation service + */ +public interface EnvironmentWorkerGroupRelationService { + + /** + * query environment worker group relation + * + * @param environmentCode environment code + */ + Map queryEnvironmentWorkerGroupRelation(Long environmentCode); + + /** + * query all environment worker group relation + * + * @return all relation list + */ + Map queryAllEnvironmentWorkerGroupRelationList(); +} + diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 910f2235a..323fa7a23 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -49,6 +49,7 @@ public interface ExecutorService { * @param warningGroupId notify group id * @param processInstancePriority process instance priority * @param workerGroup worker group name + * @param environmentCode environment code * @param runMode run mode * @param timeout timeout * @param startParams the global param values which pass to new process instance @@ -60,7 +61,7 @@ public interface ExecutorService { FailureStrategy failureStrategy, String startNodeList, TaskDependType taskDependType, WarningType warningType, int warningGroupId, RunMode runMode, - Priority processInstancePriority, String workerGroup, Integer timeout, + Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout, Map startParams, Integer expectedParallelismNumber); /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java index af9714167..d8902b156 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java @@ -43,6 +43,7 @@ public interface SchedulerService { * @param failureStrategy failure strategy * @param processInstancePriority process instance priority * @param workerGroup worker group + * @param environmentCode environment code * @return create result code */ Map insertSchedule(User loginUser, String projectName, @@ -52,7 +53,8 @@ public interface SchedulerService { int warningGroupId, FailureStrategy failureStrategy, Priority processInstancePriority, - String workerGroup); + String workerGroup, + Long environmentCode); /** * updateProcessInstance schedule @@ -65,6 +67,7 @@ public interface SchedulerService { * @param warningGroupId warning group id * @param failureStrategy failure strategy * @param workerGroup worker group + * @param environmentCode environment code * @param processInstancePriority process instance priority * @param scheduleStatus schedule status * @return update result code @@ -78,7 +81,8 @@ public interface SchedulerService { FailureStrategy failureStrategy, ReleaseState scheduleStatus, Priority processInstancePriority, - String workerGroup); + String workerGroup, + Long environmentCode); /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java new file mode 100644 index 000000000..f0310a1bf --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import org.apache.dolphinscheduler.api.dto.EnvironmentDto; +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.EnvironmentService; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils; +import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.Environment; +import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper; +import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; + +import org.apache.commons.collections4.SetUtils; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.fasterxml.jackson.core.type.TypeReference; + +/** + * task definition service impl + */ +@Service +public class EnvironmentServiceImpl extends BaseServiceImpl implements EnvironmentService { + + private static final Logger logger = LoggerFactory.getLogger(EnvironmentServiceImpl.class); + + @Autowired + private EnvironmentMapper environmentMapper; + + @Autowired + private EnvironmentWorkerGroupRelationMapper relationMapper; + + @Autowired + private TaskDefinitionMapper taskDefinitionMapper; + + /** + * create environment + * + * @param loginUser login user + * @param name environment name + * @param config environment config + * @param desc environment desc + * @param workerGroups worker groups + */ + @Transactional(rollbackFor = RuntimeException.class) + @Override + public Map createEnvironment(User loginUser, String name, String config, String desc, String workerGroups) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + + Map checkResult = checkParams(name,config,workerGroups); + if (checkResult.get(Constants.STATUS) != Status.SUCCESS) { + return checkResult; + } + + Environment environment = environmentMapper.queryByEnvironmentName(name); + if (environment != null) { + putMsg(result, Status.ENVIRONMENT_NAME_EXISTS, name); + return result; + } + + Environment env = new Environment(); + env.setName(name); + env.setConfig(config); + env.setDescription(desc); + env.setOperator(loginUser.getId()); + env.setCreateTime(new Date()); + env.setUpdateTime(new Date()); + long code = 0L; + try { + code = SnowFlakeUtils.getInstance().nextId(); + env.setCode(code); + } catch (SnowFlakeException e) { + logger.error("Environment code get error, ", e); + } + if (code == 0L) { + putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating environment code"); + return result; + } + + if (environmentMapper.insert(env) > 0) { + if (StringUtils.isNotEmpty(workerGroups)) { + List workerGroupList = JSONUtils.parseObject(workerGroups, new TypeReference>(){}); + if (CollectionUtils.isNotEmpty(workerGroupList)) { + workerGroupList.stream().forEach(workerGroup -> { + if (StringUtils.isNotEmpty(workerGroup)) { + EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation(); + relation.setEnvironmentCode(env.getCode()); + relation.setWorkerGroup(workerGroup); + relation.setOperator(loginUser.getId()); + relation.setCreateTime(new Date()); + relation.setUpdateTime(new Date()); + relationMapper.insert(relation); + } + }); + } + } + result.put(Constants.DATA_LIST, env.getCode()); + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.CREATE_ENVIRONMENT_ERROR); + } + return result; + } + + /** + * query environment paging + * + * @param pageNo page number + * @param searchVal search value + * @param pageSize page size + * @return environment list page + */ + @Override + public Result queryEnvironmentListPaging(Integer pageNo, Integer pageSize, String searchVal) { + Result result = new Result(); + + Page page = new Page<>(pageNo, pageSize); + + IPage environmentIPage = environmentMapper.queryEnvironmentListPaging(page, searchVal); + + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + pageInfo.setTotal((int) environmentIPage.getTotal()); + + if (CollectionUtils.isNotEmpty(environmentIPage.getRecords())) { + Map> relationMap = relationMapper.selectList(null).stream() + .collect(Collectors.groupingBy(EnvironmentWorkerGroupRelation::getEnvironmentCode,Collectors.mapping(EnvironmentWorkerGroupRelation::getWorkerGroup,Collectors.toList()))); + + List dtoList = environmentIPage.getRecords().stream().map(environment -> { + EnvironmentDto dto = new EnvironmentDto(); + BeanUtils.copyProperties(environment,dto); + List workerGroups = relationMap.getOrDefault(environment.getCode(),new ArrayList()); + dto.setWorkerGroups(workerGroups); + return dto; + }).collect(Collectors.toList()); + + pageInfo.setTotalList(dtoList); + } else { + pageInfo.setTotalList(new ArrayList<>()); + } + + result.setData(pageInfo); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * query all environment + * + * @return all environment list + */ + @Override + public Map queryAllEnvironmentList() { + Map result = new HashMap<>(); + List environmentList = environmentMapper.queryAllEnvironmentList(); + + if (CollectionUtils.isNotEmpty(environmentList)) { + Map> relationMap = relationMapper.selectList(null).stream() + .collect(Collectors.groupingBy(EnvironmentWorkerGroupRelation::getEnvironmentCode,Collectors.mapping(EnvironmentWorkerGroupRelation::getWorkerGroup,Collectors.toList()))); + + List dtoList = environmentList.stream().map(environment -> { + EnvironmentDto dto = new EnvironmentDto(); + BeanUtils.copyProperties(environment,dto); + List workerGroups = relationMap.getOrDefault(environment.getCode(),new ArrayList()); + dto.setWorkerGroups(workerGroups); + return dto; + }).collect(Collectors.toList()); + result.put(Constants.DATA_LIST,dtoList); + } else { + result.put(Constants.DATA_LIST, new ArrayList<>()); + } + + putMsg(result,Status.SUCCESS); + return result; + } + + /** + * query environment + * + * @param code environment code + */ + @Override + public Map queryEnvironmentByCode(Long code) { + Map result = new HashMap<>(); + + Environment env = environmentMapper.queryByEnvironmentCode(code); + + if (env == null) { + putMsg(result, Status.QUERY_ENVIRONMENT_BY_CODE_ERROR, code); + } else { + List workerGroups = relationMapper.queryByEnvironmentCode(env.getCode()).stream() + .map(item -> item.getWorkerGroup()) + .collect(Collectors.toList()); + + EnvironmentDto dto = new EnvironmentDto(); + BeanUtils.copyProperties(env,dto); + dto.setWorkerGroups(workerGroups); + result.put(Constants.DATA_LIST, dto); + putMsg(result, Status.SUCCESS); + } + return result; + } + + /** + * query environment + * + * @param name environment name + */ + @Override + public Map queryEnvironmentByName(String name) { + Map result = new HashMap<>(); + + Environment env = environmentMapper.queryByEnvironmentName(name); + if (env == null) { + putMsg(result, Status.QUERY_ENVIRONMENT_BY_NAME_ERROR, name); + } else { + List workerGroups = relationMapper.queryByEnvironmentCode(env.getCode()).stream() + .map(item -> item.getWorkerGroup()) + .collect(Collectors.toList()); + + EnvironmentDto dto = new EnvironmentDto(); + BeanUtils.copyProperties(env,dto); + dto.setWorkerGroups(workerGroups); + result.put(Constants.DATA_LIST, dto); + putMsg(result, Status.SUCCESS); + } + return result; + } + + /** + * delete environment + * + * @param loginUser login user + * @param code environment code + */ + @Transactional(rollbackFor = RuntimeException.class) + @Override + public Map deleteEnvironmentByCode(User loginUser, Long code) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + + Integer relatedTaskNumber = taskDefinitionMapper + .selectCount(new QueryWrapper().lambda().eq(TaskDefinition::getEnvironmentCode,code)); + + if (relatedTaskNumber > 0) { + putMsg(result, Status.DELETE_ENVIRONMENT_RELATED_TASK_EXISTS); + return result; + } + + int delete = environmentMapper.deleteByCode(code); + if (delete > 0) { + relationMapper.delete(new QueryWrapper() + .lambda() + .eq(EnvironmentWorkerGroupRelation::getEnvironmentCode,code)); + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.DELETE_ENVIRONMENT_ERROR); + } + return result; + } + + /** + * update environment + * + * @param loginUser login user + * @param code environment code + * @param name environment name + * @param config environment config + * @param desc environment desc + * @param workerGroups worker groups + */ + @Transactional(rollbackFor = RuntimeException.class) + @Override + public Map updateEnvironmentByCode(User loginUser, Long code, String name, String config, String desc, String workerGroups) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + + Map checkResult = checkParams(name,config,workerGroups); + if (checkResult.get(Constants.STATUS) != Status.SUCCESS) { + return checkResult; + } + + Environment environment = environmentMapper.queryByEnvironmentName(name); + if (environment != null && !environment.getCode().equals(code)) { + putMsg(result, Status.ENVIRONMENT_NAME_EXISTS, name); + return result; + } + + Set workerGroupSet; + if (StringUtils.isNotEmpty(workerGroups)) { + workerGroupSet = JSONUtils.parseObject(workerGroups, new TypeReference>() {}); + } else { + workerGroupSet = new TreeSet<>(); + } + + Set existWorkerGroupSet = relationMapper + .queryByEnvironmentCode(code) + .stream() + .map(item -> item.getWorkerGroup()) + .collect(Collectors.toSet()); + + Set deleteWorkerGroupSet = SetUtils.difference(existWorkerGroupSet,workerGroupSet).toSet(); + Set addWorkerGroupSet = SetUtils.difference(workerGroupSet,existWorkerGroupSet).toSet(); + + // verify whether the relation of this environment and worker groups can be adjusted + checkResult = checkUsedEnvironmentWorkerGroupRelation(deleteWorkerGroupSet, name, code); + if (checkResult.get(Constants.STATUS) != Status.SUCCESS) { + return checkResult; + } + + Environment env = new Environment(); + env.setCode(code); + env.setName(name); + env.setConfig(config); + env.setDescription(desc); + env.setOperator(loginUser.getId()); + env.setUpdateTime(new Date()); + + int update = environmentMapper.update(env, new UpdateWrapper().lambda().eq(Environment::getCode,code)); + if (update > 0) { + deleteWorkerGroupSet.stream().forEach(key -> { + if (StringUtils.isNotEmpty(key)) { + relationMapper.delete(new QueryWrapper() + .lambda() + .eq(EnvironmentWorkerGroupRelation::getEnvironmentCode,code)); + } + }); + addWorkerGroupSet.stream().forEach(key -> { + if (StringUtils.isNotEmpty(key)) { + EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation(); + relation.setEnvironmentCode(code); + relation.setWorkerGroup(key); + relation.setUpdateTime(new Date()); + relation.setCreateTime(new Date()); + relation.setOperator(loginUser.getId()); + relationMapper.insert(relation); + } + }); + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.UPDATE_ENVIRONMENT_ERROR, name); + } + return result; + } + + + + /** + * verify environment name + * + * @param environmentName environment name + * @return true if the environment name not exists, otherwise return false + */ + @Override + public Map verifyEnvironment(String environmentName) { + Map result = new HashMap<>(); + + if (StringUtils.isEmpty(environmentName)) { + putMsg(result, Status.ENVIRONMENT_NAME_IS_NULL); + return result; + } + + Environment environment = environmentMapper.queryByEnvironmentName(environmentName); + if (environment != null) { + putMsg(result, Status.ENVIRONMENT_NAME_EXISTS, environmentName); + return result; + } + + result.put(Constants.STATUS, Status.SUCCESS); + return result; + } + + private Map checkUsedEnvironmentWorkerGroupRelation(Set deleteKeySet,String environmentName, Long environmentCode) { + Map result = new HashMap<>(); + for (String workerGroup : deleteKeySet) { + TaskDefinition taskDefinition = taskDefinitionMapper + .selectOne(new QueryWrapper().lambda() + .eq(TaskDefinition::getEnvironmentCode,environmentCode) + .eq(TaskDefinition::getWorkerGroup,workerGroup)); + + if (Objects.nonNull(taskDefinition)) { + putMsg(result, Status.UPDATE_ENVIRONMENT_WORKER_GROUP_RELATION_ERROR,workerGroup,environmentName,taskDefinition.getName()); + return result; + } + } + result.put(Constants.STATUS, Status.SUCCESS); + return result; + } + + public Map checkParams(String name, String config, String workerGroups) { + Map result = new HashMap<>(); + if (StringUtils.isEmpty(name)) { + putMsg(result, Status.ENVIRONMENT_NAME_IS_NULL); + return result; + } + if (StringUtils.isEmpty(config)) { + putMsg(result, Status.ENVIRONMENT_CONFIG_IS_NULL); + return result; + } + if (StringUtils.isNotEmpty(workerGroups)) { + List workerGroupList = JSONUtils.parseObject(workerGroups, new TypeReference>(){}); + if (Objects.isNull(workerGroupList)) { + putMsg(result, Status.ENVIRONMENT_WORKER_GROUPS_IS_INVALID); + return result; + } + } + result.put(Constants.STATUS, Status.SUCCESS); + return result; + } + +} + diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentWorkerGroupRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentWorkerGroupRelationServiceImpl.java new file mode 100644 index 000000000..7fa7104eb --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentWorkerGroupRelationServiceImpl.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.EnvironmentWorkerGroupRelationService; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation; +import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * task definition service impl + */ +@Service +public class EnvironmentWorkerGroupRelationServiceImpl extends BaseServiceImpl implements + EnvironmentWorkerGroupRelationService { + + private static final Logger logger = LoggerFactory.getLogger(EnvironmentWorkerGroupRelationServiceImpl.class); + + @Autowired + private EnvironmentWorkerGroupRelationMapper environmentWorkerGroupRelationMapper; + + /** + * query environment worker group relation + * + * @param environmentCode environment code + */ + @Override + public Map queryEnvironmentWorkerGroupRelation(Long environmentCode) { + Map result = new HashMap<>(); + List relations = environmentWorkerGroupRelationMapper.queryByEnvironmentCode(environmentCode); + result.put(Constants.DATA_LIST, relations); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * query all environment worker group relation + * + * @return all relation list + */ + @Override + public Map queryAllEnvironmentWorkerGroupRelationList() { + Map result = new HashMap<>(); + + List relations = environmentWorkerGroupRelationMapper.selectList(null); + + result.put(Constants.DATA_LIST,relations); + putMsg(result,Status.SUCCESS); + return result; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index a87e7aed6..e15fb69c9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -53,6 +53,8 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; +import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; @@ -98,6 +100,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ @Autowired private ProcessService processService; + @Autowired + StateEventCallbackService stateEventCallbackService; + /** * execute process instance * @@ -113,6 +118,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param warningGroupId notify group id * @param processInstancePriority process instance priority * @param workerGroup worker group name + * @param environmentCode environment code * @param runMode run mode * @param timeout timeout * @param startParams the global param values which pass to new process instance @@ -125,7 +131,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ FailureStrategy failureStrategy, String startNodeList, TaskDependType taskDependType, WarningType warningType, int warningGroupId, RunMode runMode, - Priority processInstancePriority, String workerGroup, Integer timeout, + Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout, Map startParams, Integer expectedParallelismNumber) { Map result = new HashMap<>(); // timeout is invalid @@ -163,7 +169,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ */ int create = this.createCommand(commandType, processDefinitionId, taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), - warningGroupId, runMode, processInstancePriority, workerGroup, startParams, expectedParallelismNumber); + warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, expectedParallelismNumber); if (create > 0) { processDefinition.setWarningGroupId(warningGroupId); @@ -383,6 +389,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ // determine whether the process is normal if (update > 0) { + String host = processInstance.getHost(); + String address = host.split(":")[0]; + int port = Integer.parseInt(host.split(":")[1]); + StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( + processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0 + ); + stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR); @@ -483,13 +496,14 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param runMode runMode * @param processInstancePriority processInstancePriority * @param workerGroup workerGroup + * @param environmentCode environmentCode * @return command id */ private int createCommand(CommandType commandType, int processDefineId, TaskDependType nodeDep, FailureStrategy failureStrategy, String startNodeList, String schedule, WarningType warningType, int executorId, int warningGroupId, - RunMode runMode, Priority processInstancePriority, String workerGroup, + RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, Map startParams, Integer expectedParallelismNumber) { /** @@ -525,6 +539,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setWarningGroupId(warningGroupId); command.setProcessInstancePriority(processInstancePriority); command.setWorkerGroup(workerGroup); + command.setEnvironmentCode(environmentCode); Date start = null; Date end = null; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 142a611af..400ef88b5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -592,7 +592,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } - processService.removeTaskLogFile(processInstanceId); + try { + processService.removeTaskLogFile(processInstanceId); + } catch (Exception e) { + logger.error("remove task log failed", e); + } + // delete database cascade int delete = processService.deleteWorkProcessInstanceById(processInstanceId); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java index 13175d48a..ca433cd96 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java @@ -106,6 +106,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe * @param failureStrategy failure strategy * @param processInstancePriority process instance priority * @param workerGroup worker group + * @param environmentCode environment code * @return create result code */ @Override @@ -117,7 +118,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe int warningGroupId, FailureStrategy failureStrategy, Priority processInstancePriority, - String workerGroup) { + String workerGroup, + Long environmentCode) { Map result = new HashMap<>(); @@ -169,6 +171,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe scheduleObj.setReleaseState(ReleaseState.OFFLINE); scheduleObj.setProcessInstancePriority(processInstancePriority); scheduleObj.setWorkerGroup(workerGroup); + scheduleObj.setEnvironmentCode(environmentCode); scheduleMapper.insert(scheduleObj); /** @@ -196,6 +199,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe * @param warningGroupId warning group id * @param failureStrategy failure strategy * @param workerGroup worker group + * @param environmentCode environment code * @param processInstancePriority process instance priority * @param scheduleStatus schedule status * @return update result code @@ -211,7 +215,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe FailureStrategy failureStrategy, ReleaseState scheduleStatus, Priority processInstancePriority, - String workerGroup) { + String workerGroup, + Long environmentCode) { Map result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); @@ -277,6 +282,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe schedule.setReleaseState(scheduleStatus); } schedule.setWorkerGroup(workerGroup); + schedule.setEnvironmentCode(environmentCode); schedule.setUpdateTime(now); schedule.setProcessInstancePriority(processInstancePriority); scheduleMapper.updateById(schedule); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java index 1b223c440..ad2f574cb 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.dolphinscheduler.api.utils; import org.apache.dolphinscheduler.api.enums.Status; @@ -31,6 +30,7 @@ import java.util.HashMap; import java.util.Map; import java.util.regex.Pattern; + /** * check utils */ @@ -53,8 +53,7 @@ public class CheckUtils { /** * check email * - * @param email - * email + * @param email email * @return true if email regex valid, otherwise return false */ public static boolean checkEmail(String email) { @@ -68,8 +67,7 @@ public class CheckUtils { /** * check project description * - * @param desc - * desc + * @param desc desc * @return true if description regex valid, otherwise return false */ public static Map checkDesc(String desc) { @@ -77,7 +75,7 @@ public class CheckUtils { if (StringUtils.isNotEmpty(desc) && desc.length() > 200) { result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR); result.put(Constants.MSG, - MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), "desc length")); + MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), "desc length")); } else { result.put(Constants.STATUS, Status.SUCCESS); } @@ -87,8 +85,7 @@ public class CheckUtils { /** * check extra info * - * @param otherParams - * other parames + * @param otherParams other parames * @return true if other parameters are valid, otherwise return false */ public static boolean checkOtherParams(String otherParams) { @@ -98,8 +95,7 @@ public class CheckUtils { /** * check password * - * @param password - * password + * @param password password * @return true if password regex valid, otherwise return false */ public static boolean checkPassword(String password) { @@ -109,8 +105,7 @@ public class CheckUtils { /** * check phone phone can be empty. * - * @param phone - * phone + * @param phone phone * @return true if phone regex valid, otherwise return false */ public static boolean checkPhone(String phone) { @@ -120,8 +115,7 @@ public class CheckUtils { /** * check task node parameter * - * @param taskNode - * TaskNode + * @param taskNode TaskNode * @return true if task node parameters are valid, otherwise return false */ public static boolean checkTaskNodeParameters(TaskNode taskNode) { @@ -132,6 +126,8 @@ public class CheckUtils { } if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType)) { abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getDependence()); + } else if (TaskType.SWITCH.getDesc().equalsIgnoreCase(taskType)) { + abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getSwitchResult()); } else { abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getParams()); } @@ -146,25 +142,21 @@ public class CheckUtils { /** * check params * - * @param userName - * user name - * @param password - * password - * @param email - * email - * @param phone - * phone + * @param userName user name + * @param password password + * @param email email + * @param phone phone * @return true if user parameters are valid, other return false */ public static boolean checkUserParams(String userName, String password, String email, String phone) { return CheckUtils.checkUserName(userName) && CheckUtils.checkEmail(email) && CheckUtils.checkPassword(password) - && CheckUtils.checkPhone(phone); + && CheckUtils.checkPhone(phone); } /** * regex check * - * @param str input string + * @param str input string * @param pattern regex pattern * @return true if regex pattern is right, otherwise return false */ diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/EnvironmentControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/EnvironmentControllerTest.java new file mode 100644 index 000000000..7ba51ae78 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/EnvironmentControllerTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.controller; + +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.Preconditions; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import com.fasterxml.jackson.core.type.TypeReference; + +/** + * environment controller test + */ +public class EnvironmentControllerTest extends AbstractControllerTest { + + private static Logger logger = LoggerFactory.getLogger(EnvironmentControllerTest.class); + + private String environmentCode; + + public static final String environmentName = "Env1"; + + public static final String config = "this is config content"; + + public static final String desc = "this is environment description"; + + @Before + public void before() throws Exception { + testCreateEnvironment(); + } + + @After + public void after() throws Exception { + testDeleteEnvironment(); + } + + public void testCreateEnvironment() throws Exception { + + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("name",environmentName); + paramsMap.add("config",config); + paramsMap.add("description",desc); + + MvcResult mvcResult = mockMvc.perform(post("/environment/create") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isCreated()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), new TypeReference>() {}); + logger.info(result.toString()); + Assert.assertTrue(result != null && result.isSuccess()); + Assert.assertNotNull(result.getData()); + logger.info("create environment return result:{}", mvcResult.getResponse().getContentAsString()); + + environmentCode = (String)result.getData(); + } + + @Test + public void testUpdateEnvironment() throws Exception { + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("code", environmentCode); + paramsMap.add("name","environment_test_update"); + paramsMap.add("config","this is config content"); + paramsMap.add("desc","the test environment update"); + + MvcResult mvcResult = mockMvc.perform(post("/environment/update") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + logger.info(result.toString()); + Assert.assertTrue(result != null && result.isSuccess()); + logger.info("update environment return result:{}", mvcResult.getResponse().getContentAsString()); + + } + + @Test + public void testQueryEnvironmentByCode() throws Exception { + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("environmentCode", environmentCode); + + MvcResult mvcResult = mockMvc.perform(get("/environment/query-by-code") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + logger.info(result.toString()); + Assert.assertTrue(result != null && result.isSuccess()); + logger.info(mvcResult.getResponse().getContentAsString()); + logger.info("query environment by id :{}, return result:{}", environmentCode, mvcResult.getResponse().getContentAsString()); + + } + + @Test + public void testQueryEnvironmentListPaging() throws Exception { + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("searchVal","test"); + paramsMap.add("pageSize","2"); + paramsMap.add("pageNo","2"); + + MvcResult mvcResult = mockMvc.perform(get("/environment/list-paging") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + logger.info(result.toString()); + Assert.assertTrue(result != null && result.isSuccess()); + logger.info("query list-paging environment return result:{}", mvcResult.getResponse().getContentAsString()); + } + + @Test + public void testQueryAllEnvironmentList() throws Exception { + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + + MvcResult mvcResult = mockMvc.perform(get("/environment/query-environment-list") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + logger.info(result.toString()); + Assert.assertTrue(result != null && result.isSuccess()); + logger.info("query all environment return result:{}", mvcResult.getResponse().getContentAsString()); + + } + + @Test + public void testVerifyEnvironment() throws Exception { + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("environmentName",environmentName); + + MvcResult mvcResult = mockMvc.perform(post("/environment/verify-environment") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + logger.info(result.toString()); + Assert.assertTrue(result.isStatus(Status.ENVIRONMENT_NAME_EXISTS)); + logger.info("verify environment return result:{}", mvcResult.getResponse().getContentAsString()); + + } + + private void testDeleteEnvironment() throws Exception { + Preconditions.checkNotNull(environmentCode); + + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("environmentCode", environmentCode); + + MvcResult mvcResult = mockMvc.perform(post("/environment/delete") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + logger.info(result.toString()); + Assert.assertTrue(result != null && result.isSuccess()); + logger.info("delete environment return result:{}", mvcResult.getResponse().getContentAsString()); + } +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/EnvironmentServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/EnvironmentServiceTest.java new file mode 100644 index 000000000..b9b95ecae --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/EnvironmentServiceTest.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.impl.EnvironmentServiceImpl; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.dao.entity.Environment; +import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper; +import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.assertj.core.util.Lists; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.baomidou.mybatisplus.core.conditions.Wrapper; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +/** + * environment service test + */ +@RunWith(MockitoJUnitRunner.class) +public class EnvironmentServiceTest { + + public static final Logger logger = LoggerFactory.getLogger(EnvironmentServiceTest.class); + + @InjectMocks + private EnvironmentServiceImpl environmentService; + + @Mock + private EnvironmentMapper environmentMapper; + + @Mock + private EnvironmentWorkerGroupRelationMapper relationMapper; + + @Mock + private TaskDefinitionMapper taskDefinitionMapper; + + public static final String testUserName = "environmentServerTest"; + + public static final String environmentName = "Env1"; + + public static final String workerGroups = "[\"default\"]"; + + @Before + public void setUp(){ + } + + @After + public void after(){ + } + + @Test + public void testCreateEnvironment() { + User loginUser = getGeneralUser(); + Map result = environmentService.createEnvironment(loginUser,environmentName,getConfig(),getDesc(),workerGroups); + logger.info(result.toString()); + Assert.assertEquals(Status.USER_NO_OPERATION_PERM, result.get(Constants.STATUS)); + + loginUser = getAdminUser(); + result = environmentService.createEnvironment(loginUser,environmentName,"",getDesc(),workerGroups); + logger.info(result.toString()); + Assert.assertEquals(Status.ENVIRONMENT_CONFIG_IS_NULL, result.get(Constants.STATUS)); + + result = environmentService.createEnvironment(loginUser,"",getConfig(),getDesc(),workerGroups); + logger.info(result.toString()); + Assert.assertEquals(Status.ENVIRONMENT_NAME_IS_NULL, result.get(Constants.STATUS)); + + result = environmentService.createEnvironment(loginUser,environmentName,getConfig(),getDesc(),"test"); + logger.info(result.toString()); + Assert.assertEquals(Status.ENVIRONMENT_WORKER_GROUPS_IS_INVALID, result.get(Constants.STATUS)); + + Mockito.when(environmentMapper.queryByEnvironmentName(environmentName)).thenReturn(getEnvironment()); + result = environmentService.createEnvironment(loginUser,environmentName,getConfig(),getDesc(),workerGroups); + logger.info(result.toString()); + Assert.assertEquals(Status.ENVIRONMENT_NAME_EXISTS, result.get(Constants.STATUS)); + + Mockito.when(environmentMapper.insert(Mockito.any(Environment.class))).thenReturn(1); + Mockito.when(relationMapper.insert(Mockito.any(EnvironmentWorkerGroupRelation.class))).thenReturn(1); + result = environmentService.createEnvironment(loginUser,"testName","test","test",workerGroups); + logger.info(result.toString()); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + + } + + @Test + public void testCheckParams() { + Map result = environmentService.checkParams(environmentName,getConfig(),"test"); + Assert.assertEquals(Status.ENVIRONMENT_WORKER_GROUPS_IS_INVALID, result.get(Constants.STATUS)); + } + + @Test + public void testUpdateEnvironmentByCode() { + User loginUser = getGeneralUser(); + Map result = environmentService.updateEnvironmentByCode(loginUser,1L,environmentName,getConfig(),getDesc(),workerGroups); + logger.info(result.toString()); + Assert.assertEquals(Status.USER_NO_OPERATION_PERM, result.get(Constants.STATUS)); + + loginUser = getAdminUser(); + result = environmentService.updateEnvironmentByCode(loginUser,1L,environmentName,"",getDesc(),workerGroups); + logger.info(result.toString()); + Assert.assertEquals(Status.ENVIRONMENT_CONFIG_IS_NULL, result.get(Constants.STATUS)); + + result = environmentService.updateEnvironmentByCode(loginUser,1L,"",getConfig(),getDesc(),workerGroups); + logger.info(result.toString()); + Assert.assertEquals(Status.ENVIRONMENT_NAME_IS_NULL, result.get(Constants.STATUS)); + + result = environmentService.updateEnvironmentByCode(loginUser,1L,environmentName,getConfig(),getDesc(),"test"); + logger.info(result.toString()); + Assert.assertEquals(Status.ENVIRONMENT_WORKER_GROUPS_IS_INVALID, result.get(Constants.STATUS)); + + Mockito.when(environmentMapper.queryByEnvironmentName(environmentName)).thenReturn(getEnvironment()); + result = environmentService.updateEnvironmentByCode(loginUser,2L,environmentName,getConfig(),getDesc(),workerGroups); + logger.info(result.toString()); + Assert.assertEquals(Status.ENVIRONMENT_NAME_EXISTS, result.get(Constants.STATUS)); + + Mockito.when(environmentMapper.update(Mockito.any(Environment.class),Mockito.any(Wrapper.class))).thenReturn(1); + result = environmentService.updateEnvironmentByCode(loginUser,1L,"testName","test","test",workerGroups); + logger.info(result.toString()); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + + } + + @Test + public void testQueryAllEnvironmentList() { + Mockito.when(environmentMapper.queryAllEnvironmentList()).thenReturn(Lists.newArrayList(getEnvironment())); + Map result = environmentService.queryAllEnvironmentList(); + logger.info(result.toString()); + Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS)); + + List list = (List)(result.get(Constants.DATA_LIST)); + Assert.assertEquals(1,list.size()); + } + + @Test + public void testQueryEnvironmentListPaging() { + IPage page = new Page<>(1, 10); + page.setRecords(getList()); + page.setTotal(1L); + Mockito.when(environmentMapper.queryEnvironmentListPaging(Mockito.any(Page.class), Mockito.eq(environmentName))).thenReturn(page); + + Result result = environmentService.queryEnvironmentListPaging(1, 10, environmentName); + logger.info(result.toString()); + PageInfo pageInfo = (PageInfo) result.getData(); + Assert.assertTrue(CollectionUtils.isNotEmpty(pageInfo.getTotalList())); + } + + @Test + public void testQueryEnvironmentByName() { + Mockito.when(environmentMapper.queryByEnvironmentName(environmentName)).thenReturn(null); + Map result = environmentService.queryEnvironmentByName(environmentName); + logger.info(result.toString()); + Assert.assertEquals(Status.QUERY_ENVIRONMENT_BY_NAME_ERROR,result.get(Constants.STATUS)); + + Mockito.when(environmentMapper.queryByEnvironmentName(environmentName)).thenReturn(getEnvironment()); + result = environmentService.queryEnvironmentByName(environmentName); + logger.info(result.toString()); + Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS)); + } + + @Test + public void testQueryEnvironmentByCode() { + Mockito.when(environmentMapper.queryByEnvironmentCode(1L)).thenReturn(null); + Map result = environmentService.queryEnvironmentByCode(1L); + logger.info(result.toString()); + Assert.assertEquals(Status.QUERY_ENVIRONMENT_BY_CODE_ERROR,result.get(Constants.STATUS)); + + Mockito.when(environmentMapper.queryByEnvironmentCode(1L)).thenReturn(getEnvironment()); + result = environmentService.queryEnvironmentByCode(1L); + logger.info(result.toString()); + Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS)); + } + + @Test + public void testDeleteEnvironmentByCode() { + User loginUser = getGeneralUser(); + Map result = environmentService.deleteEnvironmentByCode(loginUser,1L); + logger.info(result.toString()); + Assert.assertEquals(Status.USER_NO_OPERATION_PERM, result.get(Constants.STATUS)); + + loginUser = getAdminUser(); + Mockito.when(taskDefinitionMapper.selectCount(Mockito.any(LambdaQueryWrapper.class))).thenReturn(1); + result = environmentService.deleteEnvironmentByCode(loginUser,1L); + logger.info(result.toString()); + Assert.assertEquals(Status.DELETE_ENVIRONMENT_RELATED_TASK_EXISTS, result.get(Constants.STATUS)); + + Mockito.when(taskDefinitionMapper.selectCount(Mockito.any(LambdaQueryWrapper.class))).thenReturn(0); + Mockito.when(environmentMapper.deleteByCode(1L)).thenReturn(1); + result = environmentService.deleteEnvironmentByCode(loginUser,1L); + logger.info(result.toString()); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + } + + @Test + public void testVerifyEnvironment() { + Map result = environmentService.verifyEnvironment(""); + logger.info(result.toString()); + Assert.assertEquals(Status.ENVIRONMENT_NAME_IS_NULL, result.get(Constants.STATUS)); + + Mockito.when(environmentMapper.queryByEnvironmentName(environmentName)).thenReturn(getEnvironment()); + result = environmentService.verifyEnvironment(environmentName); + logger.info(result.toString()); + Assert.assertEquals(Status.ENVIRONMENT_NAME_EXISTS, result.get(Constants.STATUS)); + } + + private Environment getEnvironment() { + Environment environment = new Environment(); + environment.setId(1); + environment.setCode(1L); + environment.setName(environmentName); + environment.setConfig(getConfig()); + environment.setDescription(getDesc()); + environment.setOperator(1); + return environment; + } + + /** + * create an environment description + */ + private String getDesc() { + return "create an environment to test "; + } + + /** + * create an environment config + */ + private String getConfig() { + return "export HADOOP_HOME=/opt/hadoop-2.6.5\n" + + "export HADOOP_CONF_DIR=/etc/hadoop/conf\n" + + "export SPARK_HOME1=/opt/soft/spark1\n" + + "export SPARK_HOME2=/opt/soft/spark2\n" + + "export PYTHON_HOME=/opt/soft/python\n" + + "export JAVA_HOME=/opt/java/jdk1.8.0_181-amd64\n" + + "export HIVE_HOME=/opt/soft/hive\n" + + "export FLINK_HOME=/opt/soft/flink\n" + + "export DATAX_HOME=/opt/soft/datax\n" + + "export YARN_CONF_DIR=\"/etc/hadoop/conf\"\n" + + "\n" + + "export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH\n" + + "\n" + + "export HADOOP_CLASSPATH=`hadoop classpath`\n" + + "\n" + + "#echo \"HADOOP_CLASSPATH=\"$HADOOP_CLASSPATH"; + } + + /** + * create general user + */ + private User getGeneralUser() { + User loginUser = new User(); + loginUser.setUserType(UserType.GENERAL_USER); + loginUser.setUserName(testUserName); + loginUser.setId(1); + return loginUser; + } + + /** + * create admin user + */ + private User getAdminUser() { + User loginUser = new User(); + loginUser.setUserType(UserType.ADMIN_USER); + loginUser.setUserName(testUserName); + loginUser.setId(1); + return loginUser; + } + + private List getList() { + List list = new ArrayList<>(); + list.add(getEnvironment()); + return list; + } +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/EnvironmentWorkerGroupRelationServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/EnvironmentWorkerGroupRelationServiceTest.java new file mode 100644 index 000000000..5a3026fd1 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/EnvironmentWorkerGroupRelationServiceTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.impl.EnvironmentWorkerGroupRelationServiceImpl; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation; +import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper; + +import java.util.Map; + +import org.assertj.core.util.Lists; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * environment service test + */ +@RunWith(MockitoJUnitRunner.class) +public class EnvironmentWorkerGroupRelationServiceTest { + + public static final Logger logger = LoggerFactory.getLogger(EnvironmentWorkerGroupRelationServiceTest.class); + + @InjectMocks + private EnvironmentWorkerGroupRelationServiceImpl relationService; + + @Mock + private EnvironmentWorkerGroupRelationMapper relationMapper; + + @Test + public void testQueryEnvironmentWorkerGroupRelation() { + Mockito.when(relationMapper.queryByEnvironmentCode(1L)).thenReturn(Lists.newArrayList(new EnvironmentWorkerGroupRelation())); + Map result = relationService.queryEnvironmentWorkerGroupRelation(1L); + logger.info(result.toString()); + Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS)); + } + + @Test + public void testQueryAllEnvironmentWorkerGroupRelationList() { + Mockito.when(relationMapper.selectList(Mockito.any())).thenReturn(Lists.newArrayList(new EnvironmentWorkerGroupRelation())); + Map result = relationService.queryAllEnvironmentWorkerGroupRelationList(); + logger.info(result.toString()); + Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS)); + } + +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java index b7d9fe182..6f7aeb244 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -153,7 +153,7 @@ public class ExecutorService2Test { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, 4); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP,-1L, 110, null, 4); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); @@ -171,13 +171,12 @@ public class ExecutorService2Test { null, "n1,n2", null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, null); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP,-1L, 110, null, null); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); } - /** * date error */ @@ -190,7 +189,7 @@ public class ExecutorService2Test { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, null); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP,-1L, 110, null, null); Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); verify(processService, times(0)).createCommand(any(Command.class)); } @@ -207,7 +206,7 @@ public class ExecutorService2Test { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, null); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP,-1L, 110, null, null); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); @@ -225,7 +224,7 @@ public class ExecutorService2Test { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, null); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP,-1L, 110, null, null); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(31)).createCommand(any(Command.class)); @@ -243,7 +242,7 @@ public class ExecutorService2Test { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, 4); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP,-1L, 110, null, 4); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(4)).createCommand(any(Command.class)); @@ -258,7 +257,7 @@ public class ExecutorService2Test { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, 4); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP,-1L, 110, null, 4); Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS); } diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml index fe1ed3aac..f4007fd3e 100644 --- a/dolphinscheduler-common/pom.xml +++ b/dolphinscheduler-common/pom.xml @@ -58,6 +58,13 @@ com.google.guava guava + provided + + + jsr305 + com.google.code.findbugs + + @@ -636,5 +643,10 @@ + + io.netty + netty-all + compile + diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index e2b8a0c0e..58c0608e7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -435,6 +435,8 @@ public final class Constants { */ public static final String DATASOURCE_PROPERTIES = "/datasource.properties"; + public static final String COMMON_TASK_TYPE = "common"; + public static final String DEFAULT = "Default"; public static final String USER = "user"; public static final String PASSWORD = "password"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java new file mode 100644 index 000000000..f24b3c154 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +import io.netty.channel.Channel; + +/** + * state event + */ +public class StateEvent { + + /** + * origin_pid-origin_task_id-process_instance_id-task_instance_id + */ + private String key; + + private StateEventType type; + + private ExecutionStatus executionStatus; + + private int taskInstanceId; + + private int processInstanceId; + + private String context; + + private Channel channel; + + public ExecutionStatus getExecutionStatus() { + return executionStatus; + } + + public void setExecutionStatus(ExecutionStatus executionStatus) { + this.executionStatus = executionStatus; + } + + public int getTaskInstanceId() { + return taskInstanceId; + } + + public int getProcessInstanceId() { + return processInstanceId; + } + + public void setProcessInstanceId(int processInstanceId) { + this.processInstanceId = processInstanceId; + } + + public String getContext() { + return context; + } + + public void setContext(String context) { + this.context = context; + } + + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; + } + + public Channel getChannel() { + return channel; + } + + public void setChannel(Channel channel) { + this.channel = channel; + } + + @Override + public String toString() { + return "State Event :" + + "key: " + key + + " type: " + type.toString() + + " executeStatus: " + executionStatus + + " task instance id: " + taskInstanceId + + " process instance id: " + processInstanceId + + " context: " + context + ; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public void setType(StateEventType type) { + this.type = type; + } + + public StateEventType getType() { + return this.type; + } +} diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/FuncUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java similarity index 53% rename from dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/FuncUtils.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java index e78b4ebec..bf93fbed8 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/FuncUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java @@ -15,33 +15,31 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.alert.utils; +package org.apache.dolphinscheduler.common.enums; -import org.apache.dolphinscheduler.common.utils.StringUtils; +import com.baomidou.mybatisplus.annotation.EnumValue; -public class FuncUtils { +public enum StateEventType { - private FuncUtils() { - throw new IllegalStateException(FuncUtils.class.getName()); + PROCESS_STATE_CHANGE(0, "process statechange"), + TASK_STATE_CHANGE(1, "task state change"), + PROCESS_TIMEOUT(2, "process timeout"), + TASK_TIMEOUT(3, "task timeout"); + + StateEventType(int code, String descp) { + this.code = code; + this.descp = descp; } - public static String mkString(Iterable list, String split) { + @EnumValue + private final int code; + private final String descp; - if (null == list || StringUtils.isEmpty(split)) { - return null; - } - - StringBuilder sb = new StringBuilder(); - boolean first = true; - for (String item : list) { - if (first) { - first = false; - } else { - sb.append(split); - } - sb.append(item); - } - return sb.toString(); + public int getCode() { + return code; } + public String getDescp() { + return descp; + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java index 2e9262dd6..fe8258c7d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java @@ -143,6 +143,11 @@ public class TaskNode { */ private String workerGroup; + /** + * environment code + */ + private Long environmentCode; + /** * task time out */ @@ -262,6 +267,7 @@ public class TaskNode { && Objects.equals(runFlag, taskNode.runFlag) && Objects.equals(dependence, taskNode.dependence) && Objects.equals(workerGroup, taskNode.workerGroup) + && Objects.equals(environmentCode, taskNode.environmentCode) && Objects.equals(conditionResult, taskNode.conditionResult) && CollectionUtils.equalLists(depList, taskNode.depList); } @@ -422,11 +428,20 @@ public class TaskNode { + ", conditionResult='" + conditionResult + '\'' + ", taskInstancePriority=" + taskInstancePriority + ", workerGroup='" + workerGroup + '\'' + + ", environmentCode=" + environmentCode + ", timeout='" + timeout + '\'' + ", delayTime=" + delayTime + '}'; } + public void setEnvironmentCode(Long environmentCode) { + this.environmentCode = environmentCode; + } + + public Long getEnvironmentCode() { + return this.environmentCode; + } + public String getSwitchResult() { return switchResult; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java index 686642dbd..80073d9c0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java @@ -30,6 +30,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -152,7 +153,7 @@ public abstract class AbstractParameters implements IParameters { ArrayNode paramsByJson = JSONUtils.parseArray(json); Iterator listIterator = paramsByJson.iterator(); while (listIterator.hasNext()) { - Map param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class); + Map param = JSONUtils.parseObject(listIterator.next().toString(), new TypeReference>() {}); allParams.add(param); } return allParams; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java index 59259a53e..bcdf4aab7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java @@ -251,9 +251,9 @@ public class SqlParameters extends AbstractParameters { sqlResultFormat.put(key, new ArrayList<>()); } for (Map info : sqlResult) { - for (String key : info.keySet()) { - sqlResultFormat.get(key).add(String.valueOf(info.get(key))); - } + info.forEach((key, value) -> { + sqlResultFormat.get(key).add(value); + }); } for (Property info : outProperty) { if (info.getType() == DataType.LIST) { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java index cba015182..95b87be84 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java @@ -14,15 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.entity; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; + +import java.util.Date; + import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; -import org.apache.dolphinscheduler.common.enums.*; - -import java.util.Date; /** * command @@ -33,7 +39,7 @@ public class Command { /** * id */ - @TableId(value="id", type=IdType.AUTO) + @TableId(value = "id", type = IdType.AUTO) private int id; /** @@ -114,6 +120,12 @@ public class Command { @TableField("worker_group") private String workerGroup; + /** + * environment code + */ + @TableField("environment_code") + private Long environmentCode; + public Command() { this.taskDependType = TaskDependType.TASK_POST; this.failureStrategy = FailureStrategy.CONTINUE; @@ -132,6 +144,7 @@ public class Command { int warningGroupId, Date scheduleTime, String workerGroup, + Long environmentCode, Priority processInstancePriority) { this.commandType = commandType; this.executorId = executorId; @@ -145,10 +158,10 @@ public class Command { this.startTime = new Date(); this.updateTime = new Date(); this.workerGroup = workerGroup; + this.environmentCode = environmentCode; this.processInstancePriority = processInstancePriority; } - public TaskDependType getTaskDependType() { return taskDependType; } @@ -181,7 +194,6 @@ public class Command { this.processDefinitionId = processDefinitionId; } - public FailureStrategy getFailureStrategy() { return failureStrategy; } @@ -262,6 +274,14 @@ public class Command { this.workerGroup = workerGroup; } + public Long getEnvironmentCode() { + return this.environmentCode; + } + + public void setEnvironmentCode(Long environmentCode) { + this.environmentCode = environmentCode; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -285,6 +305,11 @@ public class Command { if (workerGroup != null ? workerGroup.equals(command.workerGroup) : command.workerGroup == null) { return false; } + + if (environmentCode != null ? environmentCode.equals(command.environmentCode) : command.environmentCode == null) { + return false; + } + if (commandType != command.commandType) { return false; } @@ -332,26 +357,29 @@ public class Command { result = 31 * result + (processInstancePriority != null ? processInstancePriority.hashCode() : 0); result = 31 * result + (updateTime != null ? updateTime.hashCode() : 0); result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0); + result = 31 * result + (environmentCode != null ? environmentCode.hashCode() : 0); return result; } + @Override public String toString() { - return "Command{" + - "id=" + id + - ", commandType=" + commandType + - ", processDefinitionId=" + processDefinitionId + - ", executorId=" + executorId + - ", commandParam='" + commandParam + '\'' + - ", taskDependType=" + taskDependType + - ", failureStrategy=" + failureStrategy + - ", warningType=" + warningType + - ", warningGroupId=" + warningGroupId + - ", scheduleTime=" + scheduleTime + - ", startTime=" + startTime + - ", processInstancePriority=" + processInstancePriority + - ", updateTime=" + updateTime + - ", workerGroup='" + workerGroup + '\'' + - '}'; + return "Command{" + + "id=" + id + + ", commandType=" + commandType + + ", processDefinitionId=" + processDefinitionId + + ", executorId=" + executorId + + ", commandParam='" + commandParam + '\'' + + ", taskDependType=" + taskDependType + + ", failureStrategy=" + failureStrategy + + ", warningType=" + warningType + + ", warningGroupId=" + warningGroupId + + ", scheduleTime=" + scheduleTime + + ", startTime=" + startTime + + ", processInstancePriority=" + processInstancePriority + + ", updateTime=" + updateTime + + ", workerGroup='" + workerGroup + '\'' + + ", environmentCode='" + environmentCode + '\'' + + '}'; } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Environment.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Environment.java new file mode 100644 index 000000000..ad0f7148a --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Environment.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import java.util.Date; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; + +/** + * Environment + */ +@TableName("t_ds_environment") +public class Environment { + + @TableId(value = "id", type = IdType.AUTO) + private int id; + + /** + * environment code + */ + private Long code; + + /** + * environment name + */ + private String name; + + /** + * config content + */ + private String config; + + private String description; + + /** + * operator user id + */ + private Integer operator; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date createTime; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date updateTime; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Long getCode() { + return this.code; + } + + public void setCode(Long code) { + this.code = code; + } + + public String getConfig() { + return this.config; + } + + public void setConfig(String config) { + this.config = config; + } + + public String getDescription() { + return this.description; + } + + public void setDescription(String description) { + this.description = description; + } + + public Integer getOperator() { + return this.operator; + } + + public void setOperator(Integer operator) { + this.operator = operator; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } + + @Override + public String toString() { + return "Environment{" + + "id= " + id + + ", code= " + code + + ", name= " + name + + ", config= " + config + + ", description= " + description + + ", operator= " + operator + + ", createTime= " + createTime + + ", updateTime= " + updateTime + + "}"; + } + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/EnvironmentWorkerGroupRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/EnvironmentWorkerGroupRelation.java new file mode 100644 index 000000000..d1ac97203 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/EnvironmentWorkerGroupRelation.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import java.util.Date; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; + +/** + * EnvironmentWorkerGroupRelation + */ +@TableName("t_ds_environment_worker_group_relation") +public class EnvironmentWorkerGroupRelation { + + @TableId(value = "id", type = IdType.AUTO) + private int id; + + /** + * environment code + */ + private Long environmentCode; + + /** + * worker group id + */ + private String workerGroup; + + /** + * operator user id + */ + private Integer operator; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date createTime; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date updateTime; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getWorkerGroup() { + return workerGroup; + } + + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; + } + + public Long getEnvironmentCode() { + return this.environmentCode; + } + + public void setEnvironmentCode(Long environmentCode) { + this.environmentCode = environmentCode; + } + + public Integer getOperator() { + return this.operator; + } + + public void setOperator(Integer operator) { + this.operator = operator; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } + + @Override + public String toString() { + return "EnvironmentWorkerGroupRelation{" + + "id= " + id + + ", environmentCode= " + environmentCode + + ", workerGroup= " + workerGroup + + ", operator= " + operator + + ", createTime= " + createTime + + ", updateTime= " + updateTime + + "}"; + } + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java index 760bb23d9..6444ee566 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java @@ -14,15 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.entity; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; + +import java.util.Date; + import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.fasterxml.jackson.annotation.JsonFormat; -import org.apache.dolphinscheduler.common.enums.*; - -import java.util.Date; /** * command @@ -33,7 +39,7 @@ public class ErrorCommand { /** * id */ - @TableId(value="id", type = IdType.INPUT) + @TableId(value = "id", type = IdType.INPUT) private int id; /** @@ -79,13 +85,13 @@ public class ErrorCommand { /** * schedule time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") private Date scheduleTime; /** * start time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") private Date startTime; /** @@ -96,7 +102,7 @@ public class ErrorCommand { /** * update time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") private Date updateTime; /** @@ -109,9 +115,14 @@ public class ErrorCommand { */ private String workerGroup; + /** + * environment code + */ + private Long environmentCode; + public ErrorCommand(){} - public ErrorCommand(Command command, String message){ + public ErrorCommand(Command command, String message) { this.id = command.getId(); this.commandType = command.getCommandType(); this.executorId = command.getExecutorId(); @@ -124,6 +135,7 @@ public class ErrorCommand { this.failureStrategy = command.getFailureStrategy(); this.startTime = command.getStartTime(); this.updateTime = command.getUpdateTime(); + this.environmentCode = command.getEnvironmentCode(); this.processInstancePriority = command.getProcessInstancePriority(); this.message = message; } @@ -139,7 +151,7 @@ public class ErrorCommand { int warningGroupId, Date scheduleTime, Priority processInstancePriority, - String message){ + String message) { this.commandType = commandType; this.executorId = executorId; this.processDefinitionId = processDefinitionId; @@ -155,7 +167,6 @@ public class ErrorCommand { this.message = message; } - public TaskDependType getTaskDependType() { return taskDependType; } @@ -188,7 +199,6 @@ public class ErrorCommand { this.processDefinitionId = processDefinitionId; } - public FailureStrategy getFailureStrategy() { return failureStrategy; } @@ -277,24 +287,33 @@ public class ErrorCommand { this.message = message; } + public Long getEnvironmentCode() { + return this.environmentCode; + } + + public void setEnvironmentCode(Long environmentCode) { + this.environmentCode = environmentCode; + } + @Override public String toString() { - return "ErrorCommand{" + - "id=" + id + - ", commandType=" + commandType + - ", processDefinitionId=" + processDefinitionId + - ", executorId=" + executorId + - ", commandParam='" + commandParam + '\'' + - ", taskDependType=" + taskDependType + - ", failureStrategy=" + failureStrategy + - ", warningType=" + warningType + - ", warningGroupId=" + warningGroupId + - ", scheduleTime=" + scheduleTime + - ", startTime=" + startTime + - ", processInstancePriority=" + processInstancePriority + - ", updateTime=" + updateTime + - ", message='" + message + '\'' + - ", workerGroup='" + workerGroup + '\'' + - '}'; + return "ErrorCommand{" + + "id=" + id + + ", commandType=" + commandType + + ", processDefinitionId=" + processDefinitionId + + ", executorId=" + executorId + + ", commandParam='" + commandParam + '\'' + + ", taskDependType=" + taskDependType + + ", failureStrategy=" + failureStrategy + + ", warningType=" + warningType + + ", warningGroupId=" + warningGroupId + + ", scheduleTime=" + scheduleTime + + ", startTime=" + startTime + + ", processInstancePriority=" + processInstancePriority + + ", updateTime=" + updateTime + + ", message='" + message + '\'' + + ", workerGroup='" + workerGroup + '\'' + + ", environmentCode='" + environmentCode + '\'' + + '}'; } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index b24af661f..cb1eab69c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -226,6 +226,11 @@ public class ProcessInstance { */ private String workerGroup; + /** + * environment code + */ + private Long environmentCode; + /** * process timeout for warning */ @@ -505,6 +510,14 @@ public class ProcessInstance { this.executorName = executorName; } + public Long getEnvironmentCode() { + return this.environmentCode; + } + + public void setEnvironmentCode(Long environmentCode) { + this.environmentCode = environmentCode; + } + /** * add command to history * @@ -666,6 +679,8 @@ public class ProcessInstance { + ", workerGroup='" + workerGroup + '\'' + + ", environmentCode=" + + environmentCode + ", timeout=" + timeout + ", tenantId=" diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java index 74ed5c1ee..39b5bcda0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java @@ -139,6 +139,11 @@ public class Schedule { */ private String workerGroup; + /** + * environment code + */ + private Long environmentCode; + public int getWarningGroupId() { return warningGroupId; } @@ -286,6 +291,14 @@ public class Schedule { this.workerGroup = workerGroup; } + public Long getEnvironmentCode() { + return this.environmentCode; + } + + public void setEnvironmentCode(Long environmentCode) { + this.environmentCode = environmentCode; + } + @Override public String toString() { return "Schedule{" @@ -308,6 +321,7 @@ public class Schedule { + ", warningGroupId=" + warningGroupId + ", processInstancePriority=" + processInstancePriority + ", workerGroup='" + workerGroup + '\'' + + ", environmentCode='" + environmentCode + '\'' + '}'; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java index 08ca28d89..8f1d75284 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java @@ -128,6 +128,11 @@ public class TaskDefinition { */ private String workerGroup; + /** + * environment code + */ + private Long environmentCode; + /** * fail retry times */ @@ -395,6 +400,14 @@ public class TaskDefinition { this.delayTime = delayTime; } + public Long getEnvironmentCode() { + return this.environmentCode; + } + + public void setEnvironmentCode(Long environmentCode) { + this.environmentCode = environmentCode; + } + @Override public String toString() { return "TaskDefinition{" @@ -414,6 +427,7 @@ public class TaskDefinition { + ", userName='" + userName + '\'' + ", projectName='" + projectName + '\'' + ", workerGroup='" + workerGroup + '\'' + + ", environmentCode='" + environmentCode + '\'' + ", failRetryTimes=" + failRetryTimes + ", failRetryInterval=" + failRetryInterval + ", timeoutFlag=" + timeoutFlag diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java index 96851cc7b..41713fc64 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java @@ -53,6 +53,7 @@ public class TaskDefinitionLog extends TaskDefinition { this.setUserId(taskDefinition.getUserId()); this.setUserName(taskDefinition.getUserName()); this.setWorkerGroup(taskDefinition.getWorkerGroup()); + this.setEnvironmentCode(taskDefinition.getEnvironmentCode()); this.setProjectCode(taskDefinition.getProjectCode()); this.setProjectName(taskDefinition.getProjectName()); this.setResourceIds(taskDefinition.getResourceIds()); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 2be4ad659..47c6082f5 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -220,6 +220,15 @@ public class TaskInstance implements Serializable { */ private String workerGroup; + /** + * environment code + */ + private Long environmentCode; + + /** + * environment config + */ + private String environmentConfig; /** * executor id @@ -421,6 +430,22 @@ public class TaskInstance implements Serializable { this.appLink = appLink; } + public Long getEnvironmentCode() { + return this.environmentCode; + } + + public void setEnvironmentCode(Long environmentCode) { + this.environmentCode = environmentCode; + } + + public String getEnvironmentConfig() { + return this.environmentConfig; + } + + public void setEnvironmentConfig(String environmentConfig) { + this.environmentConfig = environmentConfig; + } + public DependentParameters getDependency() { if (this.dependency == null) { Map taskParamsMap = JSONUtils.toMap(this.getTaskParams(), String.class, Object.class); @@ -623,6 +648,8 @@ public class TaskInstance implements Serializable { + ", processInstancePriority=" + processInstancePriority + ", dependentResult='" + dependentResult + '\'' + ", workerGroup='" + workerGroup + '\'' + + ", environmentCode=" + environmentCode + + ", environmentConfig='" + environmentConfig + '\'' + ", executorId=" + executorId + ", executorName='" + executorName + '\'' + ", delayTime=" + delayTime diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java index 2d20a5b79..c784a23b7 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.dao.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; + import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.CommandCount; import org.apache.ibatis.annotations.Param; @@ -50,6 +52,10 @@ public interface CommandMapper extends BaseMapper { @Param("endTime") Date endTime, @Param("projectCodeArray") Long[] projectCodeArray); - + /** + * query command page + * @return + */ + IPage queryCommandPage(IPage page); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/EnvironmentMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/EnvironmentMapper.java new file mode 100644 index 000000000..5bde2a344 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/EnvironmentMapper.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.dao.entity.Environment; + +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; + +/** + * environment mapper interface + */ +public interface EnvironmentMapper extends BaseMapper { + + /** + * query environment by name + * + * @param name name + * @return environment + */ + Environment queryByEnvironmentName(@Param("environmentName") String name); + + /** + * query environment by code + * + * @param environmentCode environmentCode + * @return environment + */ + Environment queryByEnvironmentCode(@Param("environmentCode") Long environmentCode); + + /** + * query all environment list + * @return environment list + */ + List queryAllEnvironmentList(); + + /** + * environment page + * @param page page + * @param searchName searchName + * @return environment IPage + */ + IPage queryEnvironmentListPaging(IPage page, @Param("searchName") String searchName); + + /** + * delete environment by code + * + * @param code code + * @return int + */ + int deleteByCode(@Param("code") Long code); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapper.java new file mode 100644 index 000000000..44375368f --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapper.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation; + +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** + * environment worker group relation mapper interface + */ +public interface EnvironmentWorkerGroupRelationMapper extends BaseMapper { + + /** + * environment worker group relation by environmentCode + * + * @param environmentCode environmentCode + * @return EnvironmentWorkerGroupRelation list + */ + List queryByEnvironmentCode(@Param("environmentCode") Long environmentCode); + + /** + * environment worker group relation by workerGroupName + * + * @param workerGroupName workerGroupName + * @return EnvironmentWorkerGroupRelation list + */ + List queryByWorkerGroupName(@Param("workerGroupName") String workerGroupName); + + /** + * delete environment worker group relation by processCode + * + * @param environmentCode environmentCode + * @param workerGroupName workerGroupName + * @return int + */ + int deleteByCode(@Param("environmentCode") Long environmentCode, @Param("workerGroupName") String workerGroupName); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/CreateDolphinScheduler.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/CreateDolphinScheduler.java index 1c0f00256..14eceffa7 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/CreateDolphinScheduler.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/shell/CreateDolphinScheduler.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.upgrade.shell; import org.apache.dolphinscheduler.dao.upgrade.DolphinSchedulerManager; diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml index c0728f2e4..b3572ecd4 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml @@ -21,7 +21,7 @@ + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/EnvironmentMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/EnvironmentMapper.xml new file mode 100644 index 000000000..6747f40e1 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/EnvironmentMapper.xml @@ -0,0 +1,55 @@ + + + + + + + id, code, name, config, description, operator, create_time, update_time + + + + + + + delete from t_ds_environment where code = #{code} + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapper.xml new file mode 100644 index 000000000..7ea959d60 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapper.xml @@ -0,0 +1,40 @@ + + + + + + + id, environment_code, worker_group, operator, create_time, update_time + + + + + delete from t_ds_environment_worker_group_relation + WHERE environment_code = #{environmentCode} and worker_group = #{workerGroupName} + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/PluginDefineMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/PluginDefineMapper.xml index 0a105edcb..329d2f14a 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/PluginDefineMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/PluginDefineMapper.xml @@ -46,4 +46,4 @@ where id = #{id} - \ No newline at end of file + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index db5630199..f1b074db6 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -23,7 +23,7 @@ command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type, warning_group_id, schedule_time, command_start_time, global_params, flag, update_time, is_sub_process, executor_id, history_cmd, - process_instance_priority, worker_group, timeout, tenant_id, var_pool + process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool select p_f.name as process_definition_name, p.name as project_name,u.user_name, diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml index 36ff8b8ef..81de0a7bc 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml @@ -20,12 +20,12 @@ id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority, - worker_group, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, + worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time, update_time @@ -63,7 +63,7 @@