Compare commits
26 Commits
dependabot
...
dev
| Author | SHA1 | Date |
|---|---|---|
|
|
de50f43de6 | |
|
|
7bcec7115a | |
|
|
e6dade71bb | |
|
|
9abcbbac2e | |
|
|
0176f4bf61 | |
|
|
1f9660b80d | |
|
|
15eb1618b4 | |
|
|
cc40816f87 | |
|
|
ca98a4a144 | |
|
|
8a8b63cd96 | |
|
|
cd82f45d5e | |
|
|
3faa65ef0c | |
|
|
5657cb9aec | |
|
|
ebc4253d50 | |
|
|
257380467e | |
|
|
7382284b7d | |
|
|
48d526f275 | |
|
|
0643fe44a4 | |
|
|
b276c372d4 | |
|
|
3e851940e8 | |
|
|
86bdb826dc | |
|
|
a51b710b1c | |
|
|
99678c097c | |
|
|
29a0ea32c6 | |
|
|
4799b27e33 | |
|
|
fb11525e49 |
|
|
@ -397,21 +397,41 @@ apiServers="ds1"
|
|||
|
||||
### dolphinscheduler_env.sh [load environment variables configs]
|
||||
|
||||
When using shell to commit tasks, DS will load environment variables inside dolphinscheduler_env.sh into the host.
|
||||
Types of tasks involved are: Shell, Python, Spark, Flink, DataX, etc.
|
||||
When using shell to commit tasks, DolphinScheduler will export environment variables from `bin/env/dolphinscheduler_env.sh`. The
|
||||
mainly configuration including `JAVA_HOME`, mata database, registry center, and task configuration.
|
||||
|
||||
```bash
|
||||
export HADOOP_HOME=/opt/soft/hadoop
|
||||
export HADOOP_CONF_DIR=/opt/soft/hadoop/etc/hadoop
|
||||
export SPARK_HOME1=/opt/soft/spark1
|
||||
export SPARK_HOME2=/opt/soft/spark2
|
||||
export PYTHON_HOME=/opt/soft/python
|
||||
export JAVA_HOME=/opt/soft/java
|
||||
export HIVE_HOME=/opt/soft/hive
|
||||
export FLINK_HOME=/opt/soft/flink
|
||||
export DATAX_HOME=/opt/soft/datax/bin/datax.py
|
||||
# JAVA_HOME, will use it to start DolphinScheduler server
|
||||
export JAVA_HOME=${JAVA_HOME:-/opt/soft/java}
|
||||
|
||||
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME:$PATH
|
||||
# Database related configuration, set database type, username and password
|
||||
export DATABASE=${DATABASE:-postgresql}
|
||||
export SPRING_PROFILES_ACTIVE=${DATABASE}
|
||||
export SPRING_DATASOURCE_DRIVER_CLASS_NAME
|
||||
export SPRING_DATASOURCE_URL
|
||||
export SPRING_DATASOURCE_USERNAME
|
||||
export SPRING_DATASOURCE_PASSWORD
|
||||
|
||||
# DolphinScheduler server related configuration
|
||||
export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
|
||||
export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
|
||||
export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
|
||||
|
||||
# Registry center configuration, determines the type and link of the registry center
|
||||
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
|
||||
export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-localhost:2181}
|
||||
|
||||
# Tasks related configurations, need to change the configuration if you use the related tasks.
|
||||
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
|
||||
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
|
||||
export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
|
||||
export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
|
||||
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
|
||||
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
|
||||
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
|
||||
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
|
||||
|
||||
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
|
||||
```
|
||||
|
||||
### Services logback configs
|
||||
|
|
|
|||
|
|
@ -218,7 +218,7 @@ A: 1, in **the process definition list**, click the **Start** button.
|
|||
|
||||
## Q : Python task setting Python version
|
||||
|
||||
A: 1,**for the version after 1.0.3** only need to modify PYTHON_HOME in conf/env/.dolphinscheduler_env.sh
|
||||
A: 1,**for the version after 1.0.3** only need to modify PYTHON_HOME in `bin/env/dolphinscheduler_env.sh`
|
||||
|
||||
```
|
||||
export PYTHON_HOME=/bin/python
|
||||
|
|
|
|||
|
|
@ -73,10 +73,10 @@ sed -i 's/Defaults requirett/#Defaults requirett/g' /etc/sudoers
|
|||
datasource.properties: database connection information
|
||||
zookeeper.properties: information for connecting zk
|
||||
common.properties: Configuration information about the resource store (if hadoop is set up, please check if the core-site.xml and hdfs-site.xml configuration files exist).
|
||||
env/dolphinscheduler_env.sh: environment Variables
|
||||
dolphinscheduler_env.sh: environment Variables
|
||||
````
|
||||
|
||||
- Modify the `dolphinscheduler_env.sh` environment variable in the `conf/env` directory according to the machine configuration (the following is the example that all the used software install under `/opt/soft`)
|
||||
- Modify the `dolphinscheduler_env.sh` environment variable in the `bin/env/dolphinscheduler_env.sh` directory according to the machine configuration (the following is the example that all the used software install under `/opt/soft`)
|
||||
|
||||
```shell
|
||||
export HADOOP_HOME=/opt/soft/hadoop
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ If you are a new hand and want to experience DolphinScheduler functions, we reco
|
|||
|
||||
## Deployment Steps
|
||||
|
||||
Cluster deployment uses the same scripts and configuration files as [pseudo-cluster deployment](pseudo-cluster.md), so the preparation and deployment steps are the same as pseudo-cluster deployment. The difference is that [pseudo-cluster deployment](pseudo-cluster.md) is for one machine, while cluster deployment (Cluster) is for multiple machines. And steps of "Modify Configuration" are quite different between pseudo-cluster deployment and cluster deployment.
|
||||
Cluster deployment uses the same scripts and configuration files as [pseudo-cluster deployment](pseudo-cluster.md), so the preparation and deployment steps are the same as pseudo-cluster deployment. The difference is that pseudo-cluster deployment is for one machine, while cluster deployment (Cluster) is for multiple machines. And steps of "Modify Configuration" are quite different between pseudo-cluster deployment and cluster deployment.
|
||||
|
||||
### Prerequisites and DolphinScheduler Startup Environment Preparations
|
||||
|
||||
|
|
@ -32,8 +32,8 @@ apiServers="ds5"
|
|||
|
||||
## Start and Login DolphinScheduler
|
||||
|
||||
Same as pseudo-cluster.md](pseudo-cluster.md)
|
||||
Same as [pseudo-cluster](pseudo-cluster.md)
|
||||
|
||||
## Start and Stop Server
|
||||
|
||||
Same as pseudo-cluster.md](pseudo-cluster.md)
|
||||
Same as [pseudo-cluster](pseudo-cluster.md)
|
||||
|
|
@ -87,7 +87,13 @@ sh script/create-dolphinscheduler.sh
|
|||
|
||||
## Modify Configuration
|
||||
|
||||
After completing the preparation of the basic environment, you need to modify the configuration file according to your environment. The configuration file is in the path of `conf/config/install_config.conf`. Generally, you just need to modify the **INSTALL MACHINE, DolphinScheduler ENV, Database, Registry Server** part to complete the deployment, the following describes the parameters that must be modified:
|
||||
After completing the preparation of the basic environment, you need to modify the configuration file according to the
|
||||
environment you used. The configuration files are both in directory `bin/env` and named `install_env.sh` and `dolphinscheduler_env.sh`.
|
||||
|
||||
### Modify `install_env.sh`
|
||||
|
||||
File `install_env.sh` describes which machines will be installed DolphinScheduler and what server will be installed on
|
||||
each machine. You could find this file in the path `bin/env/install_env.sh` and the detail of the configuration as below.
|
||||
|
||||
```shell
|
||||
# ---------------------------------------------------------
|
||||
|
|
@ -105,29 +111,30 @@ installPath="~/dolphinscheduler"
|
|||
|
||||
# Deploy user, use the user you create in section **Configure machine SSH password-free login**
|
||||
deployUser="dolphinscheduler"
|
||||
```
|
||||
|
||||
# ---------------------------------------------------------
|
||||
# DolphinScheduler ENV
|
||||
# ---------------------------------------------------------
|
||||
# The path of JAVA_HOME, which JDK install path in section **Preparation**
|
||||
javaHome="/your/java/home/here"
|
||||
### Modify `dolphinscheduler_env.sh`
|
||||
|
||||
# ---------------------------------------------------------
|
||||
# Database
|
||||
# ---------------------------------------------------------
|
||||
# Database type, username, password, IP, port, metadata. For now `dbtype` supports `mysql` and `postgresql`
|
||||
dbtype="mysql"
|
||||
dbhost="localhost:3306"
|
||||
# Need to modify if you are not using `dolphinscheduler/dolphinscheduler` as your username and password
|
||||
username="dolphinscheduler"
|
||||
password="dolphinscheduler"
|
||||
dbname="dolphinscheduler"
|
||||
File `dolphinscheduler_env.sh` describes the database configuration of DolphinScheduler, which in the path `bin/env/dolphinscheduler_env.sh`
|
||||
and some tasks which need external dependencies or libraries such as `JAVA_HOME` and `SPARK_HOME`. You could ignore the
|
||||
task external dependencies if you do not use those tasks, but you have to change `JAVA_HOME`, registry center and database
|
||||
related configurations based on your environment.
|
||||
|
||||
# ---------------------------------------------------------
|
||||
# Registry Server
|
||||
# ---------------------------------------------------------
|
||||
# Registration center address, the address of ZooKeeper service
|
||||
registryServers="localhost:2181"
|
||||
```sh
|
||||
# JAVA_HOME, will use it to start DolphinScheduler server
|
||||
export JAVA_HOME=${JAVA_HOME:-/custom/path}
|
||||
|
||||
# Database related configuration, set database type, username and password
|
||||
export DATABASE=${DATABASE:-postgresql}
|
||||
export SPRING_PROFILES_ACTIVE=${DATABASE}
|
||||
export SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.postgresql.Driver
|
||||
export SPRING_DATASOURCE_URL="jdbc:postgresql://127.0.0.1:5432/dolphinscheduler"
|
||||
export SPRING_DATASOURCE_USERNAME="username"
|
||||
export SPRING_DATASOURCE_PASSWORD="password"
|
||||
|
||||
# Registry center configuration, determines the type and link of the registry center
|
||||
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
|
||||
export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-localhost:2181}
|
||||
```
|
||||
|
||||
## Initialize the Database
|
||||
|
|
@ -178,7 +185,7 @@ sh tools/bin/create-schema.sh
|
|||
Use **deployment user** you created above, running the following command to complete the deployment, and the server log will be stored in the logs folder.
|
||||
|
||||
```shell
|
||||
sh install.sh
|
||||
sh ./bin/install.sh
|
||||
```
|
||||
|
||||
> **_Note:_** For the first time deployment, there maybe occur five times of `sh: bin/dolphinscheduler-daemon.sh: No such file or directory` in the terminal,
|
||||
|
|
@ -214,7 +221,12 @@ sh ./bin/dolphinscheduler-daemon.sh start alert-server
|
|||
sh ./bin/dolphinscheduler-daemon.sh stop alert-server
|
||||
```
|
||||
|
||||
> **_Note:_**: Please refer to the section of "System Architecture Design" for service usage. Python gateway service is
|
||||
> **_Note1:_**: Each server have `dolphinscheduler_env.sh` file in path `<server-name>/conf/dolphinscheduler_env.sh` which
|
||||
> for micro-services need. It means that you could start all servers by command `<server-name>/bin/start.sh` with different
|
||||
> environment variable from `bin/env/dolphinscheduler_env.sh`. But it will use file `bin/env/dolphinscheduler_env.sh` overwrite
|
||||
> `<server-name>/conf/dolphinscheduler_env.sh` if you start server with command `/bin/dolphinscheduler-daemon.sh start <server-name>`.
|
||||
|
||||
> **_Note2:_**: Please refer to the section of "System Architecture Design" for service usage. Python gateway service is
|
||||
> started along with the api-server, and if you do not want to start Python gateway service please disabled it by changing
|
||||
> the yaml config `python-gateway.enabled : false` in api-server's configuration path `api-server/conf/application.yaml`
|
||||
|
||||
|
|
|
|||
|
|
@ -977,7 +977,7 @@ Configure the mail service port for `alert-server`, default value `empty`.
|
|||
|
||||
Configure the mail sender for `alert-server`, default value `empty`.
|
||||
|
||||
**`MAIL_USER=`**
|
||||
**`MAIL_USER`**
|
||||
|
||||
Configure the user name of the mail service for `alert-server`, default value `empty`.
|
||||
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ This example demonstrates how to import data from Hive into MySQL.
|
|||
|
||||
### Configure the DataX environment in DolphinScheduler
|
||||
|
||||
If you are using the DataX task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.
|
||||
If you are using the DataX task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `bin/env/dolphinscheduler_env.sh`.
|
||||
|
||||

|
||||
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ This is a common introductory case in the big data ecosystem, which often apply
|
|||
|
||||
#### Configure the flink environment in DolphinScheduler
|
||||
|
||||
If you are using the flink task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.
|
||||
If you are using the flink task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `bin/env/dolphinscheduler_env.sh`.
|
||||
|
||||

|
||||
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ This example is a common introductory type of MapReduce application, which used
|
|||
|
||||
#### Configure the MapReduce Environment in DolphinScheduler
|
||||
|
||||
If you are using the MapReduce task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.
|
||||
If you are using the MapReduce task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `bin/env/dolphinscheduler_env.sh`.
|
||||
|
||||

|
||||
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ This is a common introductory case in the big data ecosystem, which often apply
|
|||
|
||||
#### Configure the Spark Environment in DolphinScheduler
|
||||
|
||||
If you are using the Spark task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.
|
||||
If you are using the Spark task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `bin/env/dolphinscheduler_env.sh`.
|
||||
|
||||

|
||||
|
||||
|
|
|
|||
|
|
@ -380,21 +380,42 @@ apiServers="ds1"
|
|||
```
|
||||
|
||||
## 11.dolphinscheduler_env.sh [环境变量配置]
|
||||
通过类似shell方式提交任务的的时候,会加载该配置文件中的环境变量到主机中.
|
||||
涉及到的任务类型有: Shell任务、Python任务、Spark任务、Flink任务、Datax任务等等
|
||||
|
||||
通过类似shell方式提交任务的的时候,会加载该配置文件中的环境变量到主机中. 涉及到的 `JAVA_HOME`、元数据库、注册中心和任务类型配置,其中任务
|
||||
类型主要有: Shell任务、Python任务、Spark任务、Flink任务、Datax任务等等
|
||||
|
||||
```bash
|
||||
export HADOOP_HOME=/opt/soft/hadoop
|
||||
export HADOOP_CONF_DIR=/opt/soft/hadoop/etc/hadoop
|
||||
export SPARK_HOME1=/opt/soft/spark1
|
||||
export SPARK_HOME2=/opt/soft/spark2
|
||||
export PYTHON_HOME=/opt/soft/python
|
||||
export JAVA_HOME=/opt/soft/java
|
||||
export HIVE_HOME=/opt/soft/hive
|
||||
export FLINK_HOME=/opt/soft/flink
|
||||
export DATAX_HOME=/opt/soft/datax/bin/datax.py
|
||||
# JAVA_HOME, will use it to start DolphinScheduler server
|
||||
export JAVA_HOME=${JAVA_HOME:-/opt/soft/java}
|
||||
|
||||
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME:$PATH
|
||||
# Database related configuration, set database type, username and password
|
||||
export DATABASE=${DATABASE:-postgresql}
|
||||
export SPRING_PROFILES_ACTIVE=${DATABASE}
|
||||
export SPRING_DATASOURCE_DRIVER_CLASS_NAME
|
||||
export SPRING_DATASOURCE_URL
|
||||
export SPRING_DATASOURCE_USERNAME
|
||||
export SPRING_DATASOURCE_PASSWORD
|
||||
|
||||
# DolphinScheduler server related configuration
|
||||
export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
|
||||
export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
|
||||
export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
|
||||
|
||||
# Registry center configuration, determines the type and link of the registry center
|
||||
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
|
||||
export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-localhost:2181}
|
||||
|
||||
# Tasks related configurations, need to change the configuration if you use the related tasks.
|
||||
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
|
||||
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
|
||||
export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
|
||||
export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
|
||||
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
|
||||
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
|
||||
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
|
||||
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
|
||||
|
||||
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
|
||||
```
|
||||
|
||||
## 12.各服务日志配置文件
|
||||
|
|
|
|||
|
|
@ -203,7 +203,7 @@ A: 1,在 **流程定义列表**,点击 **启动** 按钮
|
|||
|
||||
## Q:Python 任务设置 Python 版本
|
||||
|
||||
A: 只需要修改 conf/env/dolphinscheduler_env.sh 中的 PYTHON_HOME
|
||||
A: 只需要修改 `bin/env/dolphinscheduler_env.sh` 中的 PYTHON_HOME
|
||||
|
||||
```
|
||||
export PYTHON_HOME=/bin/python
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ sed -i 's/Defaults requirett/#Defaults requirett/g' /etc/sudoers
|
|||
datasource.properties 中的数据库连接信息.
|
||||
zookeeper.properties 中的连接zk的信息.
|
||||
common.properties 中关于资源存储的配置信息(如果设置了hadoop,请检查是否存在core-site.xml和hdfs-site.xml配置文件).
|
||||
env/dolphinscheduler_env.sh 中的环境变量
|
||||
dolphinscheduler_env.sh 中的环境变量
|
||||
````
|
||||
|
||||
- 根据机器配置,修改 conf/env 目录下的 `dolphinscheduler_env.sh` 环境变量(以相关用到的软件都安装在/opt/soft下为例)
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
## 部署步骤
|
||||
|
||||
集群部署(Cluster)使用的脚本和配置文件与[伪集群部署](pseudo-cluster.md)中的配置一样,所以所需要的步骤也与[伪集群部署](pseudo-cluster.md)大致一样。区别就是[伪集群部署](pseudo-cluster.md)针对的是一台机器,而集群部署(Cluster)需要针对多台机器,且两者“修改相关配置”步骤区别较大
|
||||
集群部署(Cluster)使用的脚本和配置文件与[伪集群部署](pseudo-cluster.md)中的配置一样,所以所需要的步骤也与伪集群部署大致一样。区别就是伪集群部署针对的是一台机器,而集群部署(Cluster)需要针对多台机器,且两者“修改相关配置”步骤区别较大
|
||||
|
||||
### 前置准备工作 && 准备 DolphinScheduler 启动环境
|
||||
|
||||
|
|
@ -14,7 +14,7 @@
|
|||
|
||||
### 修改相关配置
|
||||
|
||||
这个是与[伪集群部署](pseudo-cluster.md)差异较大的一步,因为部署脚本会通过 `scp` 的方式将安装需要的资源传输到各个机器上,所以这一步我们仅需要修改运行`install.sh`脚本的所在机器的配置即可。配置文件在路径在`conf/config/install_config.conf`下,此处我们仅需修改**INSTALL MACHINE**,**DolphinScheduler ENV、Database、Registry Server**与[伪集群部署](pseudo-cluster.md)保持一致,下面对必须修改参数进行说明
|
||||
这个是与[伪集群部署](pseudo-cluster.md)差异较大的一步,因为部署脚本会通过 `scp` 的方式将安装需要的资源传输到各个机器上,所以这一步我们仅需要修改运行`install.sh`脚本的所在机器的配置即可。配置文件在路径在`conf/config/install_config.conf`下,此处我们仅需修改**INSTALL MACHINE**,**DolphinScheduler ENV、Database、Registry Server**与伪集群部署保持一致,下面对必须修改参数进行说明
|
||||
|
||||
```shell
|
||||
# ---------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -87,47 +87,51 @@ sh script/create-dolphinscheduler.sh
|
|||
|
||||
## 修改相关配置
|
||||
|
||||
完成了基础环境的准备后,在运行部署命令前,还需要根据环境修改配置文件。配置文件在路径在`conf/config/install_config.conf`下,一般部署只需要修改**INSTALL MACHINE、DolphinScheduler ENV、Database、Registry Server**部分即可完成部署,下面对必须修改参数进行说明
|
||||
完成基础环境的准备后,需要根据你的机器环境修改配置文件。配置文件可以在目录 `bin/env` 中找到,他们分别是 并命名为 `install_env.sh` 和 `dolphinscheduler_env.sh`。
|
||||
|
||||
### 修改 `install_env.sh` 文件
|
||||
|
||||
文件 `install_env.sh` 描述了哪些机器将被安装 DolphinScheduler 以及每台机器对应安装哪些服务。您可以在路径 `bin/env/install_env.sh` 中找到此文件,配置详情如下。
|
||||
|
||||
```shell
|
||||
# ---------------------------------------------------------
|
||||
# INSTALL MACHINE
|
||||
# ---------------------------------------------------------
|
||||
# 因为是在单节点上部署master、worker、API server,所以服务器的IP均为机器IP或者localhost
|
||||
# Due to the master, worker, and API server being deployed on a single node, the IP of the server is the machine IP or localhost
|
||||
ips="localhost"
|
||||
masters="localhost"
|
||||
workers="localhost:default"
|
||||
alertServer="localhost"
|
||||
apiServers="localhost"
|
||||
|
||||
# DolphinScheduler安装路径,如果不存在会创建
|
||||
# DolphinScheduler installation path, it will auto-create if not exists
|
||||
installPath="~/dolphinscheduler"
|
||||
|
||||
# 部署用户,填写在 **配置用户免密及权限** 中创建的用户
|
||||
# Deploy user, use the user you create in section **Configure machine SSH password-free login**
|
||||
deployUser="dolphinscheduler"
|
||||
```
|
||||
|
||||
# ---------------------------------------------------------
|
||||
# DolphinScheduler ENV
|
||||
# ---------------------------------------------------------
|
||||
# JAVA_HOME 的路径,是在 **前置准备工作** 安装的JDK中 JAVA_HOME 所在的位置
|
||||
javaHome="/your/java/home/here"
|
||||
### 修改 `dolphinscheduler_env.sh` 文件
|
||||
|
||||
# ---------------------------------------------------------
|
||||
# Database
|
||||
# ---------------------------------------------------------
|
||||
# 数据库的类型,用户名,密码,IP,端口,元数据库db。其中dbtype目前支持 mysql 和 postgresql
|
||||
dbtype="mysql"
|
||||
dbhost="localhost:3306"
|
||||
# 如果你不是以 dolphinscheduler/dolphinscheduler 作为用户名和密码的,需要进行修改
|
||||
username="dolphinscheduler"
|
||||
password="dolphinscheduler"
|
||||
dbname="dolphinscheduler"
|
||||
文件 `dolphinscheduler_env.sh` 描述了 DolphinScheduler 的数据库配置,一些任务类型外部依赖路径或库文件,注册中心,其中 `JAVA_HOME`
|
||||
和 `SPARK_HOME`都是在这里定义的,其路径是 `bin/env/dolphinscheduler_env.sh`。如果您不使用某些任务类型,您可以忽略任务外部依赖项,
|
||||
但您必须根据您的环境更改 `JAVA_HOME`、注册中心和数据库相关配置。
|
||||
|
||||
# ---------------------------------------------------------
|
||||
# Registry Server
|
||||
# ---------------------------------------------------------
|
||||
# 注册中心地址,zookeeper服务的地址
|
||||
registryServers="localhost:2181"
|
||||
```sh
|
||||
# JAVA_HOME, will use it to start DolphinScheduler server
|
||||
export JAVA_HOME=${JAVA_HOME:-/custom/path}
|
||||
|
||||
# Database related configuration, set database type, username and password
|
||||
export DATABASE=${DATABASE:-postgresql}
|
||||
export SPRING_PROFILES_ACTIVE=${DATABASE}
|
||||
export SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.postgresql.Driver
|
||||
export SPRING_DATASOURCE_URL="jdbc:postgresql://127.0.0.1:5432/dolphinscheduler"
|
||||
export SPRING_DATASOURCE_USERNAME="username"
|
||||
export SPRING_DATASOURCE_PASSWORD="password"
|
||||
|
||||
# Registry center configuration, determines the type and link of the registry center
|
||||
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
|
||||
export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-localhost:2181}
|
||||
```
|
||||
|
||||
## 初始化数据库
|
||||
|
|
@ -178,7 +182,7 @@ sh tools/bin/create-schema.sh
|
|||
使用上面创建的**部署用户**运行以下命令完成部署,部署后的运行日志将存放在 logs 文件夹内
|
||||
|
||||
```shell
|
||||
sh install.sh
|
||||
sh ./bin/install.sh
|
||||
```
|
||||
|
||||
> **_注意:_** 第一次部署的话,可能出现 5 次`sh: bin/dolphinscheduler-daemon.sh: No such file or directory`相关信息,次为非重要信息直接忽略即可
|
||||
|
|
@ -213,7 +217,13 @@ sh ./bin/dolphinscheduler-daemon.sh start alert-server
|
|||
sh ./bin/dolphinscheduler-daemon.sh stop alert-server
|
||||
```
|
||||
|
||||
> **_注意:_**:服务用途请具体参见《系统架构设计》小节
|
||||
> **_注意1:_**: 每个服务在路径 `<server-name>/conf/dolphinscheduler_env.sh` 中都有 `dolphinscheduler_env.sh` 文件,这是可以为微
|
||||
> 服务需求提供便利。意味着您可以基于不同的环境变量来启动各个服务,只需要在对应服务中配置 `bin/env/dolphinscheduler_env.sh` 然后通过 `<server-name>/bin/start.sh`
|
||||
> 命令启动即可。但是如果您使用命令 `/bin/dolphinscheduler-daemon.sh start <server-name>` 启动服务器,它将会用文件 `bin/env/dolphinscheduler_env.sh`
|
||||
> 覆盖 `<server-name>/conf/dolphinscheduler_env.sh` 然后启动服务,目的是为了减少用户修改配置的成本.
|
||||
|
||||
> **_注意2:_**:服务用途请具体参见《系统架构设计》小节。Python gateway service 默认与 api-server 一起启动,如果您不想启动 Python gateway service
|
||||
> 请通过更改 api-server 配置文件 `api-server/conf/application.yaml` 中的 `python-gateway.enabled : false` 来禁用它。
|
||||
|
||||
[jdk]: https://www.oracle.com/technetwork/java/javase/downloads/index.html
|
||||
[zookeeper]: https://zookeeper.apache.org/releases.html
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ DataX 任务类型,用于执行 DataX 程序。对于 DataX 节点,worker
|
|||
|
||||
### 在 DolphinScheduler 中配置 DataX 环境
|
||||
|
||||
若生产环境中要是使用到 DataX 任务类型,则需要先配置好所需的环境。配置文件如下:`/dolphinscheduler/conf/env/dolphinscheduler_env.sh`。
|
||||
若生产环境中要是使用到 DataX 任务类型,则需要先配置好所需的环境。配置文件如下:`bin/env/dolphinscheduler_env.sh`。
|
||||
|
||||

|
||||
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点,worker
|
|||
|
||||
#### 在 DolphinScheduler 中配置 flink 环境
|
||||
|
||||
若生产环境中要是使用到 flink 任务类型,则需要先配置好所需的环境。配置文件如下:`/dolphinscheduler/conf/env/dolphinscheduler_env.sh`。
|
||||
若生产环境中要是使用到 flink 任务类型,则需要先配置好所需的环境。配置文件如下:`bin/env/dolphinscheduler_env.sh`。
|
||||
|
||||

|
||||
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ MapReduce(MR) 任务类型,用于执行 MapReduce 程序。对于 MapReduce
|
|||
|
||||
#### 在 DolphinScheduler 中配置 MapReduce 环境
|
||||
|
||||
若生产环境中要是使用到 MapReduce 任务类型,则需要先配置好所需的环境。配置文件如下:`/dolphinscheduler/conf/env/dolphinscheduler_env.sh`。
|
||||
若生产环境中要是使用到 MapReduce 任务类型,则需要先配置好所需的环境。配置文件如下:`bin/env/dolphinscheduler_env.sh`。
|
||||
|
||||

|
||||
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ Spark 任务类型,用于执行 Spark 程序。对于 Spark 节点,worker
|
|||
|
||||
#### 在 DolphinScheduler 中配置 Spark 环境
|
||||
|
||||
若生产环境中要是使用到 Spark 任务类型,则需要先配置好所需的环境。配置文件如下:`/dolphinscheduler/conf/env/dolphinscheduler_env.sh`。
|
||||
若生产环境中要是使用到 Spark 任务类型,则需要先配置好所需的环境。配置文件如下:`bin/env/dolphinscheduler_env.sh`。
|
||||
|
||||

|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@
|
|||
## 4. 数据库升级
|
||||
- 将`./tools/conf/application.yaml`中的username和password改成你设定数据库用户名和密码
|
||||
|
||||
- 如果选择 MySQL,请修改`./tools/bin/dolphinscheduler_env.sh`中的如下配置, 还需要手动添加 [[ mysql-connector-java 驱动 jar ](https://downloads.MySQL.com/archives/c-j/)] 包到 lib 目录(`./tools/lib`)下,这里下载的是mysql-connector-java-8.0.16.jar
|
||||
- 如果选择 MySQL,请修改`./tools/bin/dolphinscheduler_env.sh`中的如下配置, 还需要手动添加 [ mysql-connector-java 驱动 jar ](https://downloads.MySQL.com/archives/c-j/) 包到 lib 目录(`./tools/lib`)下,这里下载的是mysql-connector-java-8.0.16.jar
|
||||
|
||||
```shell
|
||||
export DATABASE=${DATABASE:-mysql}
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@
|
|||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>${basedir}/../../script/env</directory>
|
||||
<outputDirectory>bin</outputDirectory>
|
||||
<outputDirectory>conf</outputDirectory>
|
||||
<includes>
|
||||
<include>dolphinscheduler_env.sh</include>
|
||||
</includes>
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@
|
|||
BIN_DIR=$(dirname $0)
|
||||
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
|
||||
|
||||
source "$BIN_DIR/dolphinscheduler_env.sh"
|
||||
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
|
||||
|
||||
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms1g -Xmx1g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}
|
||||
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@
|
|||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>${basedir}/../script/env</directory>
|
||||
<outputDirectory>bin</outputDirectory>
|
||||
<outputDirectory>conf</outputDirectory>
|
||||
<includes>
|
||||
<include>dolphinscheduler_env.sh</include>
|
||||
</includes>
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@
|
|||
BIN_DIR=$(dirname $0)
|
||||
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
|
||||
|
||||
source "$BIN_DIR/dolphinscheduler_env.sh"
|
||||
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
|
||||
|
||||
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms1g -Xmx1g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,18 +17,31 @@
|
|||
|
||||
package org.apache.dolphinscheduler.api;
|
||||
|
||||
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.boot.web.servlet.ServletComponentScan;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.event.EventListener;
|
||||
|
||||
@ServletComponentScan
|
||||
@SpringBootApplication
|
||||
@ComponentScan("org.apache.dolphinscheduler")
|
||||
public class ApiApplicationServer {
|
||||
|
||||
@Autowired
|
||||
private TaskPluginManager taskPluginManager;
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ApiApplicationServer.class);
|
||||
}
|
||||
|
||||
@EventListener
|
||||
public void run(ApplicationReadyEvent readyEvent) {
|
||||
// install task plugin
|
||||
taskPluginManager.installPlugin();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -215,10 +215,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
|
|||
Map<String, Object> result = new HashMap<>();
|
||||
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
|
||||
// check process definition exists
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
|
||||
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
|
||||
// check process definition online
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineCode);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode));
|
||||
} else if (!checkSubProcessDefinitionValid(processDefinition)){
|
||||
// check sub process definition online
|
||||
putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
|
||||
|
|
@ -488,7 +488,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
|
|||
command.setProcessInstanceId(instanceId);
|
||||
|
||||
if (!processService.verifyIsNeedCreateCommand(command)) {
|
||||
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionCode);
|
||||
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -482,7 +482,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
|
|||
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
|
||||
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
|
||||
} else {
|
||||
Tenant tenant = tenantMapper.queryById(processDefinition.getTenantId());
|
||||
if (tenant != null) {
|
||||
|
|
@ -576,7 +576,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
|
|||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
|
||||
// check process definition exists
|
||||
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
|
||||
return result;
|
||||
}
|
||||
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
|
||||
|
|
@ -699,7 +699,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
|
|||
}
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
|
||||
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
@ -711,7 +711,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
|
|||
|
||||
// check process definition is already online
|
||||
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, code);
|
||||
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, String.valueOf(code));
|
||||
return result;
|
||||
}
|
||||
// check process instances is already running
|
||||
|
|
@ -777,7 +777,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
|
|||
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
|
||||
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
|
||||
return result;
|
||||
}
|
||||
switch (releaseState) {
|
||||
|
|
@ -1341,7 +1341,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
|
|||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
|
||||
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
|
||||
logger.info("process define not exists");
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
|
||||
return result;
|
||||
}
|
||||
DagData dagData = processService.genDagData(processDefinition);
|
||||
|
|
@ -1487,7 +1487,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
|
|||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
|
||||
if (null == processDefinition || projectCode != processDefinition.getProjectCode()) {
|
||||
logger.info("process define not exists");
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
|
||||
return result;
|
||||
}
|
||||
DAG<String, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(processDefinition);
|
||||
|
|
@ -1897,7 +1897,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
|
|||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
|
||||
|
||||
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
|
||||
} else {
|
||||
if (processDefinition.getVersion() == version) {
|
||||
putMsg(result, Status.MAIN_TABLE_USING_VERSION);
|
||||
|
|
@ -2085,7 +2085,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
|
|||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
|
||||
// check process definition exists
|
||||
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
|
||||
return result;
|
||||
}
|
||||
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
|
||||
|
|
@ -2186,7 +2186,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
|
|||
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
|
||||
if (processDefinition == null) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
|
||||
return result;
|
||||
}
|
||||
Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code);
|
||||
|
|
|
|||
|
|
@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
|
|||
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
|
||||
|
|
@ -149,6 +150,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
|
|||
@Autowired
|
||||
private TaskPluginManager taskPluginManager;
|
||||
|
||||
@Autowired
|
||||
private ScheduleMapper scheduleMapper;
|
||||
|
||||
/**
|
||||
* return top n SUCCESS process instance order by running time which started between startTime and endTime
|
||||
*/
|
||||
|
|
@ -472,7 +476,17 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
|
|||
processInstance.getName(), processInstance.getState().toString(), "update");
|
||||
return result;
|
||||
}
|
||||
setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout);
|
||||
|
||||
//
|
||||
Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
|
||||
String timezoneId = null;
|
||||
if (commandParamMap == null || StringUtils.isBlank(commandParamMap.get(Constants.SCHEDULE_TIMEZONE))) {
|
||||
timezoneId = loginUser.getTimeZone();
|
||||
} else {
|
||||
timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE);
|
||||
}
|
||||
|
||||
setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout, timezoneId);
|
||||
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
|
||||
if (taskDefinitionLogs.isEmpty()) {
|
||||
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
|
||||
|
|
@ -538,7 +552,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
|
|||
/**
|
||||
* update process instance attributes
|
||||
*/
|
||||
private void setProcessInstance(ProcessInstance processInstance, String tenantCode, String scheduleTime, String globalParams, int timeout) {
|
||||
private void setProcessInstance(ProcessInstance processInstance, String tenantCode, String scheduleTime, String globalParams, int timeout, String timezone) {
|
||||
Date schedule = processInstance.getScheduleTime();
|
||||
if (scheduleTime != null) {
|
||||
schedule = DateUtils.getScheduleDate(scheduleTime);
|
||||
|
|
@ -546,7 +560,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
|
|||
processInstance.setScheduleTime(schedule);
|
||||
List<Property> globalParamList = JSONUtils.toList(globalParams, Property.class);
|
||||
Map<String, String> globalParamMap = globalParamList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
|
||||
globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule);
|
||||
globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule, timezone);
|
||||
processInstance.setTimeout(timeout);
|
||||
processInstance.setTenantCode(tenantCode);
|
||||
processInstance.setGlobalParams(globalParams);
|
||||
|
|
@ -672,9 +686,14 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
|
|||
return result;
|
||||
}
|
||||
|
||||
Map<String, String> commandParam = JSONUtils.toMap(processInstance.getCommandParam());
|
||||
String timezone = null;
|
||||
if (commandParam != null) {
|
||||
timezone = commandParam.get(Constants.SCHEDULE_TIMEZONE);
|
||||
}
|
||||
Map<String, String> timeParams = BusinessTimeUtils
|
||||
.getBusinessTime(processInstance.getCmdTypeIfComplement(),
|
||||
processInstance.getScheduleTime());
|
||||
processInstance.getScheduleTime(), timezone);
|
||||
String userDefinedParams = processInstance.getGlobalParams();
|
||||
// global params
|
||||
List<Property> globalParams = new ArrayList<>();
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
|
|||
}
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
|
||||
if (processDefinition == null) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
|
||||
return result;
|
||||
}
|
||||
if (processDefinition.getProjectCode() != projectCode) {
|
||||
|
|
@ -122,7 +122,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
|
|||
.collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelation -> processTaskRelation));
|
||||
if (!preTaskCodeMap.isEmpty()) {
|
||||
if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) {
|
||||
putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode);
|
||||
putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, String.valueOf(processDefinitionCode));
|
||||
return result;
|
||||
}
|
||||
if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) {
|
||||
|
|
@ -202,12 +202,12 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
|
|||
}
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
|
||||
if (processDefinition == null) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
|
||||
return result;
|
||||
}
|
||||
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
|
||||
if (null == taskDefinition) {
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
|
||||
return result;
|
||||
}
|
||||
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
|
||||
|
|
@ -305,7 +305,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
|
|||
}
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode());
|
||||
if (processDefinition == null) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, upstreamList.get(0).getProcessDefinitionCode());
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(upstreamList.get(0).getProcessDefinitionCode()));
|
||||
return result;
|
||||
}
|
||||
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
|
||||
|
|
@ -364,7 +364,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
|
|||
}
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(downstreamList.get(0).getProcessDefinitionCode());
|
||||
if (processDefinition == null) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, downstreamList.get(0).getProcessDefinitionCode());
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(downstreamList.get(0).getProcessDefinitionCode()));
|
||||
return result;
|
||||
}
|
||||
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
|
||||
|
|
@ -468,7 +468,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
|
|||
}
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
|
||||
if (processDefinition == null) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
|
||||
return result;
|
||||
}
|
||||
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
|
||||
|
|
|
|||
|
|
@ -257,7 +257,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
|
|||
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode());
|
||||
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionCode());
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(schedule.getProcessDefinitionCode()));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
@ -306,7 +306,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
|
|||
}
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
|
||||
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionCode());
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
|
||||
return result;
|
||||
}
|
||||
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
|
||||
|
|
@ -336,7 +336,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
|
|||
if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
|
||||
logger.info("not release process definition id: {} , name : {}",
|
||||
subProcessDefinition.getId(), subProcessDefinition.getName());
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, subProcessDefinition.getId());
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId()));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
@ -406,7 +406,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
|
|||
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode);
|
||||
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
@ -618,7 +618,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
|
|||
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
|
||||
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -180,11 +180,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|||
}
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
|
||||
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
|
||||
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
|
||||
return result;
|
||||
}
|
||||
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
|
||||
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionCode);
|
||||
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, String.valueOf(processDefinitionCode));
|
||||
return result;
|
||||
}
|
||||
TaskDefinitionLog taskDefinition = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
|
||||
|
|
@ -314,7 +314,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|||
}
|
||||
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
|
||||
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
|
||||
return result;
|
||||
}
|
||||
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
|
||||
|
|
@ -406,7 +406,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|||
}
|
||||
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
|
||||
if (taskDefinition == null) {
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
|
||||
return null;
|
||||
}
|
||||
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
|
||||
|
|
@ -557,7 +557,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|||
}
|
||||
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
|
||||
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
|
||||
return result;
|
||||
}
|
||||
TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version);
|
||||
|
|
@ -618,7 +618,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|||
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
|
||||
|
||||
if (taskDefinition == null) {
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
|
||||
} else {
|
||||
if (taskDefinition.getVersion() == version) {
|
||||
putMsg(result, Status.MAIN_TABLE_USING_VERSION);
|
||||
|
|
@ -645,7 +645,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|||
|
||||
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
|
||||
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
|
||||
} else {
|
||||
result.put(Constants.DATA_LIST, taskDefinition);
|
||||
putMsg(result, Status.SUCCESS);
|
||||
|
|
@ -752,12 +752,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|||
}
|
||||
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(code);
|
||||
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, code);
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code));
|
||||
return result;
|
||||
}
|
||||
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, taskDefinition.getVersion());
|
||||
if (taskDefinitionLog == null) {
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, code);
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code));
|
||||
return result;
|
||||
}
|
||||
switch (releaseState) {
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ import java.util.regex.Pattern;
|
|||
*/
|
||||
public class RegexUtils {
|
||||
|
||||
private static final String LINUX_USERNAME_PATTERN = "[a-z_][a-z\\d_]{0,30}";
|
||||
private static final String LINUX_USERNAME_PATTERN = "^[a-zA-Z0-9_].{0,30}";
|
||||
|
||||
private RegexUtils() {
|
||||
}
|
||||
|
|
|
|||
|
|
@ -48,6 +48,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
|
|||
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
|
||||
|
|
@ -121,6 +122,10 @@ public class ProcessInstanceServiceTest {
|
|||
@Mock
|
||||
TaskPluginManager taskPluginManager;
|
||||
|
||||
@Mock
|
||||
ScheduleMapper scheduleMapper;
|
||||
|
||||
|
||||
private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
|
||||
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
|
||||
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
|
||||
|
|
|
|||
|
|
@ -28,10 +28,10 @@ public class RegexUtilsTest {
|
|||
@Test
|
||||
public void testIsValidLinuxUserName() {
|
||||
String name1 = "10000";
|
||||
Assert.assertFalse(RegexUtils.isValidLinuxUserName(name1));
|
||||
Assert.assertTrue(RegexUtils.isValidLinuxUserName(name1));
|
||||
|
||||
String name2 = "00hayden";
|
||||
Assert.assertFalse(RegexUtils.isValidLinuxUserName(name2));
|
||||
Assert.assertTrue(RegexUtils.isValidLinuxUserName(name2));
|
||||
|
||||
String name3 = "hayde123456789123456789123456789";
|
||||
Assert.assertFalse(RegexUtils.isValidLinuxUserName(name3));
|
||||
|
|
@ -44,6 +44,12 @@ public class RegexUtilsTest {
|
|||
|
||||
String name6 = "hayden";
|
||||
Assert.assertTrue(RegexUtils.isValidLinuxUserName(name6));
|
||||
|
||||
String name7 = "00hayden_0";
|
||||
Assert.assertTrue(RegexUtils.isValidLinuxUserName(name2));
|
||||
|
||||
String name8 = "00hayden.8";
|
||||
Assert.assertTrue(RegexUtils.isValidLinuxUserName(name2));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -120,7 +120,7 @@ public final class Constants {
|
|||
/**
|
||||
* environment properties default path
|
||||
*/
|
||||
public static final String ENV_PATH = "env/dolphinscheduler_env.sh";
|
||||
public static final String ENV_PATH = "dolphinscheduler_env.sh";
|
||||
|
||||
/**
|
||||
* resource.view.suffixs
|
||||
|
|
@ -817,4 +817,9 @@ public final class Constants {
|
|||
public static final String LIMITS_CPU = "limitsCpu";
|
||||
public static final String LIMITS_MEMORY = "limitsMemory";
|
||||
public static final String K8S_LOCAL_TEST_CLUSTER = "ds_null_k8s";
|
||||
|
||||
/**
|
||||
* schedule timezone
|
||||
*/
|
||||
public static final String SCHEDULE_TIMEZONE = "schedule_timezone";
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ public class CommonUtils {
|
|||
|
||||
private static final Base64 BASE64 = new Base64();
|
||||
|
||||
private CommonUtils() {
|
||||
protected CommonUtils() {
|
||||
throw new UnsupportedOperationException("Construct CommonUtils");
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -88,7 +88,7 @@ public class ParameterUtils {
|
|||
* @return curing user define parameters
|
||||
*/
|
||||
public static String curingGlobalParams(Map<String, String> globalParamMap, List<Property> globalParamList,
|
||||
CommandType commandType, Date scheduleTime) {
|
||||
CommandType commandType, Date scheduleTime, String timezone) {
|
||||
|
||||
if (globalParamList == null || globalParamList.isEmpty()) {
|
||||
return null;
|
||||
|
|
@ -101,7 +101,7 @@ public class ParameterUtils {
|
|||
Map<String, String> allParamMap = new HashMap<>();
|
||||
//If it is a complement, a complement time needs to be passed in, according to the task type
|
||||
Map<String, String> timeParams = BusinessTimeUtils.
|
||||
getBusinessTime(commandType, scheduleTime);
|
||||
getBusinessTime(commandType, scheduleTime, timezone);
|
||||
|
||||
if (timeParams != null) {
|
||||
allParamMap.putAll(timeParams);
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ public class BusinessTimeUtils {
|
|||
* @param runTime run time or schedule time
|
||||
* @return business time
|
||||
*/
|
||||
public static Map<String, String> getBusinessTime(CommandType commandType, Date runTime) {
|
||||
public static Map<String, String> getBusinessTime(CommandType commandType, Date runTime, String timezone) {
|
||||
Date businessDate = runTime;
|
||||
Map<String, String> result = new HashMap<>();
|
||||
switch (commandType) {
|
||||
|
|
@ -71,9 +71,9 @@ public class BusinessTimeUtils {
|
|||
break;
|
||||
}
|
||||
Date businessCurrentDate = addDays(businessDate, 1);
|
||||
result.put(Constants.PARAMETER_CURRENT_DATE, format(businessCurrentDate, PARAMETER_FORMAT_DATE, null));
|
||||
result.put(Constants.PARAMETER_BUSINESS_DATE, format(businessDate, PARAMETER_FORMAT_DATE, null));
|
||||
result.put(Constants.PARAMETER_DATETIME, format(businessCurrentDate, PARAMETER_FORMAT_TIME, null));
|
||||
result.put(Constants.PARAMETER_CURRENT_DATE, format(businessCurrentDate, PARAMETER_FORMAT_DATE, timezone));
|
||||
result.put(Constants.PARAMETER_BUSINESS_DATE, format(businessDate, PARAMETER_FORMAT_DATE, timezone));
|
||||
result.put(Constants.PARAMETER_DATETIME, format(businessCurrentDate, PARAMETER_FORMAT_TIME, timezone));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ sudo.enable=true
|
|||
#dolphin.scheduler.network.priority.strategy=default
|
||||
|
||||
# system env path
|
||||
#dolphinscheduler.env.path=env/dolphinscheduler_env.sh
|
||||
#dolphinscheduler.env.path=dolphinscheduler_env.sh
|
||||
|
||||
# development state
|
||||
development.state=false
|
||||
|
|
|
|||
|
|
@ -41,8 +41,9 @@ public class CommonUtilsTest {
|
|||
private static final Logger logger = LoggerFactory.getLogger(CommonUtilsTest.class);
|
||||
@Test
|
||||
public void getSystemEnvPath() {
|
||||
logger.info(CommonUtils.getSystemEnvPath());
|
||||
Assert.assertTrue(true);
|
||||
String envPath;
|
||||
envPath = CommonUtils.getSystemEnvPath();
|
||||
Assert.assertEquals("/etc/profile", envPath);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -100,22 +100,22 @@ public class ParameterUtilsTest {
|
|||
Date scheduleTime = DateUtils.stringToDate("2019-12-20 00:00:00");
|
||||
|
||||
//test globalParamList is null
|
||||
String result = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
|
||||
String result = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
|
||||
Assert.assertNull(result);
|
||||
Assert.assertNull(ParameterUtils.curingGlobalParams(null, null, CommandType.START_CURRENT_TASK_PROCESS, null));
|
||||
Assert.assertNull(ParameterUtils.curingGlobalParams(globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime));
|
||||
Assert.assertNull(ParameterUtils.curingGlobalParams(null, null, CommandType.START_CURRENT_TASK_PROCESS, null, null));
|
||||
Assert.assertNull(ParameterUtils.curingGlobalParams(globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null));
|
||||
|
||||
//test globalParamList is not null
|
||||
Property property = new Property("testGlobalParam", Direct.IN, DataType.VARCHAR, "testGlobalParam");
|
||||
globalParamList.add(property);
|
||||
|
||||
String result2 = ParameterUtils.curingGlobalParams(null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
|
||||
String result2 = ParameterUtils.curingGlobalParams(null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
|
||||
Assert.assertEquals(result2, JSONUtils.toJsonString(globalParamList));
|
||||
|
||||
String result3 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null);
|
||||
String result3 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null, null);
|
||||
Assert.assertEquals(result3, JSONUtils.toJsonString(globalParamList));
|
||||
|
||||
String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
|
||||
String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
|
||||
Assert.assertEquals(result4, JSONUtils.toJsonString(globalParamList));
|
||||
|
||||
//test var $ startsWith
|
||||
|
|
@ -130,7 +130,7 @@ public class ParameterUtilsTest {
|
|||
globalParamList.add(property3);
|
||||
globalParamList.add(property4);
|
||||
|
||||
String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
|
||||
String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
|
||||
Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList));
|
||||
|
||||
Property testStartParamProperty = new Property("testStartParam", Direct.IN, DataType.VARCHAR, "");
|
||||
|
|
@ -150,7 +150,7 @@ public class ParameterUtilsTest {
|
|||
}
|
||||
}
|
||||
|
||||
String result6 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
|
||||
String result6 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
|
||||
Assert.assertTrue(result6.contains("20191220"));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
|
|||
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
|
||||
|
|
@ -39,6 +40,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.dolphinscheduler.spi.utils.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -534,19 +536,7 @@ public class DagHelper {
|
|||
public static boolean haveConditionsAfterNode(String parentNodeCode,
|
||||
DAG<String, TaskNode, TaskNodeRelation> dag
|
||||
) {
|
||||
boolean result = false;
|
||||
Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeCode);
|
||||
if (CollectionUtils.isEmpty(subsequentNodes)) {
|
||||
return result;
|
||||
}
|
||||
for (String nodeCode : subsequentNodes) {
|
||||
TaskNode taskNode = dag.getNode(nodeCode);
|
||||
List<String> preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class);
|
||||
if (preTasksList.contains(parentNodeCode) && taskNode.isConditionsTask()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
return haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_CONDITIONS);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -565,19 +555,38 @@ public class DagHelper {
|
|||
return false;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* is there have blocking node after the parent node
|
||||
*/
|
||||
public static boolean haveBlockingAfterNode(String parentNodeCode,
|
||||
DAG<String,TaskNode,TaskNodeRelation> dag) {
|
||||
return haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_BLOCKING);
|
||||
}
|
||||
|
||||
/**
|
||||
* is there have all node after the parent node
|
||||
*/
|
||||
public static boolean haveAllNodeAfterNode(String parentNodeCode,
|
||||
DAG<String,TaskNode,TaskNodeRelation> dag) {
|
||||
return haveSubAfterNode(parentNodeCode, dag, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether there is a specified type of child node after the parent node
|
||||
*/
|
||||
public static boolean haveSubAfterNode(String parentNodeCode,
|
||||
DAG<String,TaskNode,TaskNodeRelation> dag, String filterNodeType) {
|
||||
Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeCode);
|
||||
if (CollectionUtils.isEmpty(subsequentNodes)) {
|
||||
return false;
|
||||
}
|
||||
if (StringUtils.isBlank(filterNodeType)){
|
||||
return true;
|
||||
}
|
||||
for (String nodeName : subsequentNodes) {
|
||||
TaskNode taskNode = dag.getNode(nodeName);
|
||||
List<String> preTaskList = JSONUtils.toList(taskNode.getPreTasks(),String.class);
|
||||
if (preTaskList.contains(parentNodeCode) && taskNode.isBlockingTask()) {
|
||||
if (taskNode.getType().equalsIgnoreCase(filterNodeType)){
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
|
|||
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessData;
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
|
||||
|
|
@ -48,6 +49,57 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
|||
* dag helper test
|
||||
*/
|
||||
public class DagHelperTest {
|
||||
|
||||
@Test
|
||||
public void testHaveSubAfterNode(){
|
||||
String parentNodeCode = "5293789969856";
|
||||
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
|
||||
TaskNodeRelation relation = new TaskNodeRelation();
|
||||
relation.setStartNode("5293789969856");
|
||||
relation.setEndNode("5293789969857");
|
||||
taskNodeRelations.add(relation);
|
||||
|
||||
TaskNodeRelation relationNext = new TaskNodeRelation();
|
||||
relationNext.setStartNode("5293789969856");
|
||||
relationNext.setEndNode("5293789969858");
|
||||
taskNodeRelations.add(relationNext);
|
||||
|
||||
List<TaskNode> taskNodes = new ArrayList<>();
|
||||
TaskNode node = new TaskNode();
|
||||
node.setCode(5293789969856L);
|
||||
node.setType("SHELL");
|
||||
|
||||
TaskNode subNode = new TaskNode();
|
||||
subNode.setCode(5293789969857L);
|
||||
subNode.setType("BLOCKING");
|
||||
subNode.setPreTasks("[5293789969856]");
|
||||
|
||||
TaskNode subNextNode = new TaskNode();
|
||||
subNextNode.setCode(5293789969858L);
|
||||
subNextNode.setType("CONDITIONS");
|
||||
subNextNode.setPreTasks("[5293789969856]");
|
||||
|
||||
taskNodes.add(node);
|
||||
taskNodes.add(subNode);
|
||||
taskNodes.add(subNextNode);
|
||||
|
||||
ProcessDag processDag = new ProcessDag();
|
||||
processDag.setEdges(taskNodeRelations);
|
||||
processDag.setNodes(taskNodes);
|
||||
DAG<String,TaskNode,TaskNodeRelation> dag = DagHelper.buildDagGraph(processDag);
|
||||
boolean canSubmit = DagHelper.haveAllNodeAfterNode(parentNodeCode, dag);
|
||||
Assert.assertTrue(canSubmit);
|
||||
|
||||
boolean haveBlocking = DagHelper.haveBlockingAfterNode(parentNodeCode, dag);
|
||||
Assert.assertTrue(haveBlocking);
|
||||
|
||||
boolean haveConditions = DagHelper.haveConditionsAfterNode(parentNodeCode, dag);
|
||||
Assert.assertTrue(haveConditions);
|
||||
|
||||
boolean dependent = DagHelper.haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_DEPENDENT);
|
||||
Assert.assertFalse(dependent);
|
||||
}
|
||||
|
||||
/**
|
||||
* test task node can submit
|
||||
*
|
||||
|
|
@ -280,9 +332,16 @@ public class DagHelperTest {
|
|||
}
|
||||
|
||||
/**
|
||||
* process:
|
||||
* 1->2->3->5->7
|
||||
* 4->3->6
|
||||
* 2->8->5->7
|
||||
* 1->2->8->5->7
|
||||
* DAG graph:
|
||||
* 4 -> -> 6
|
||||
* \ /
|
||||
* 1 -> 2 -> 3 -> 5 -> 7
|
||||
* \ /
|
||||
* -> 8 ->
|
||||
*
|
||||
* @return dag
|
||||
* @throws JsonProcessingException if error throws JsonProcessingException
|
||||
|
|
@ -377,9 +436,12 @@ public class DagHelperTest {
|
|||
}
|
||||
|
||||
/**
|
||||
* 1->2->3->5->7
|
||||
* 4->3->6
|
||||
* 2->8->5->7
|
||||
* DAG graph:
|
||||
* 2
|
||||
* ↑
|
||||
* 0->1(switch)
|
||||
* ↓
|
||||
* 4
|
||||
*
|
||||
* @return dag
|
||||
* @throws JsonProcessingException if error throws JsonProcessingException
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ sudo.enable=true
|
|||
# network IP gets priority, default: inner outer
|
||||
#dolphin.scheduler.network.priority.strategy=default
|
||||
# system env path
|
||||
#dolphinscheduler.env.path=env/dolphinscheduler_env.sh
|
||||
#dolphinscheduler.env.path=dolphinscheduler_env.sh
|
||||
# development state
|
||||
development.state=false
|
||||
# rpc port
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@
|
|||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>${basedir}/../script/env</directory>
|
||||
<outputDirectory>bin</outputDirectory>
|
||||
<outputDirectory>conf</outputDirectory>
|
||||
<includes>
|
||||
<include>dolphinscheduler_env.sh</include>
|
||||
</includes>
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@
|
|||
BIN_DIR=$(dirname $0)
|
||||
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
|
||||
|
||||
source "$BIN_DIR/dolphinscheduler_env.sh"
|
||||
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
|
||||
|
||||
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn2g -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}
|
||||
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
|
|||
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
|
||||
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
|
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
|
|
@ -68,6 +69,9 @@ public class MasterServer implements IStoppable {
|
|||
@Autowired
|
||||
private MasterRegistryClient masterRegistryClient;
|
||||
|
||||
@Autowired
|
||||
private TaskPluginManager taskPluginManager;
|
||||
|
||||
@Autowired
|
||||
private MasterSchedulerService masterSchedulerService;
|
||||
|
||||
|
|
@ -131,6 +135,9 @@ public class MasterServer implements IStoppable {
|
|||
|
||||
this.nettyRemotingServer.start();
|
||||
|
||||
// install task plugin
|
||||
this.taskPluginManager.installPlugin();
|
||||
|
||||
// self tolerant
|
||||
this.masterRegistryClient.init();
|
||||
this.masterRegistryClient.start();
|
||||
|
|
|
|||
|
|
@ -50,6 +50,9 @@ public class FailoverExecuteThread extends Thread {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
// when startup, wait 10s for ready
|
||||
ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * 10);
|
||||
|
||||
logger.info("failover execute thread started");
|
||||
while (Stopper.isRunning()) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutor
|
|||
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
|
||||
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
|
||||
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
|
||||
|
|
@ -102,9 +101,6 @@ public class MasterSchedulerService extends Thread {
|
|||
@Autowired
|
||||
private StateWheelExecuteThread stateWheelExecuteThread;
|
||||
|
||||
@Autowired
|
||||
private TaskPluginManager taskPluginManager;
|
||||
|
||||
/**
|
||||
* constructor of MasterSchedulerService
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -456,9 +456,9 @@ public class WorkflowExecuteThread {
|
|||
retryTaskInstance(taskInstance);
|
||||
} else if (taskInstance.getState().typeIsFailure()) {
|
||||
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
|
||||
if (taskInstance.isConditionsTask()
|
||||
|| DagHelper.haveConditionsAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
|
||||
|| DagHelper.haveBlockingAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
|
||||
// There are child nodes and the failure policy is: CONTINUE
|
||||
if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
|
||||
&& processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
|
||||
submitPostNode(Long.toString(taskInstance.getTaskCode()));
|
||||
} else {
|
||||
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
|
||||
|
|
@ -948,7 +948,7 @@ public class WorkflowExecuteThread {
|
|||
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
|
||||
processDefinition.getGlobalParamMap(),
|
||||
processDefinition.getGlobalParamList(),
|
||||
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
|
||||
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)));
|
||||
processService.updateProcessInstance(processInstance);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ public class ParamsTest {
|
|||
// start process
|
||||
Map<String,String> timeParams = BusinessTimeUtils
|
||||
.getBusinessTime(CommandType.START_PROCESS,
|
||||
new Date());
|
||||
new Date(), null);
|
||||
|
||||
command = ParameterUtils.convertParameterPlaceholders(command, timeParams);
|
||||
|
||||
|
|
@ -57,7 +57,7 @@ public class ParamsTest {
|
|||
// complement data
|
||||
timeParams = BusinessTimeUtils
|
||||
.getBusinessTime(CommandType.COMPLEMENT_DATA,
|
||||
calendar.getTime());
|
||||
calendar.getTime(), null);
|
||||
command = ParameterUtils.convertParameterPlaceholders(command, timeParams);
|
||||
logger.info("complement data : {}",command);
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
"""Task sql."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import Dict, Optional
|
||||
|
||||
|
|
@ -24,6 +25,8 @@ from pydolphinscheduler.constants import TaskType
|
|||
from pydolphinscheduler.core.database import Database
|
||||
from pydolphinscheduler.core.task import Task
|
||||
|
||||
log = logging.getLogger(__file__)
|
||||
|
||||
|
||||
class SqlType:
|
||||
"""SQL type, for now it just contain `SELECT` and `NO_SELECT`."""
|
||||
|
|
@ -61,6 +64,7 @@ class Sql(Task):
|
|||
name: str,
|
||||
datasource_name: str,
|
||||
sql: str,
|
||||
sql_type: Optional[int] = None,
|
||||
pre_statements: Optional[str] = None,
|
||||
post_statements: Optional[str] = None,
|
||||
display_rows: Optional[int] = 10,
|
||||
|
|
@ -69,6 +73,7 @@ class Sql(Task):
|
|||
):
|
||||
super().__init__(name, TaskType.SQL, *args, **kwargs)
|
||||
self.sql = sql
|
||||
self.param_sql_type = sql_type
|
||||
self.datasource_name = datasource_name
|
||||
self.pre_statements = pre_statements or []
|
||||
self.post_statements = post_statements or []
|
||||
|
|
@ -76,9 +81,24 @@ class Sql(Task):
|
|||
|
||||
@property
|
||||
def sql_type(self) -> int:
|
||||
"""Judgement sql type, use regexp to check which type of the sql is."""
|
||||
"""Judgement sql type, it will return the SQL type for type `SELECT` or `NOT_SELECT`.
|
||||
|
||||
If `param_sql_type` dot not specific, will use regexp to check
|
||||
which type of the SQL is. But if `param_sql_type` is specific
|
||||
will use the parameter overwrites the regexp way
|
||||
"""
|
||||
if (
|
||||
self.param_sql_type == SqlType.SELECT
|
||||
or self.param_sql_type == SqlType.NOT_SELECT
|
||||
):
|
||||
log.info(
|
||||
"The sql type is specified by a parameter, with value %s",
|
||||
self.param_sql_type,
|
||||
)
|
||||
return self.param_sql_type
|
||||
pattern_select_str = (
|
||||
"^(?!(.* |)insert |(.* |)delete |(.* |)drop |(.* |)update |(.* |)alter ).*"
|
||||
"^(?!(.* |)insert |(.* |)delete |(.* |)drop "
|
||||
"|(.* |)update |(.* |)alter |(.* |)create ).*"
|
||||
)
|
||||
pattern_select = re.compile(pattern_select_str, re.IGNORECASE)
|
||||
if pattern_select.match(self.sql) is None:
|
||||
|
|
|
|||
|
|
@ -26,24 +26,38 @@ from pydolphinscheduler.tasks.sql import Sql, SqlType
|
|||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"sql, sql_type",
|
||||
"sql, param_sql_type, sql_type",
|
||||
[
|
||||
("select 1", SqlType.SELECT),
|
||||
(" select 1", SqlType.SELECT),
|
||||
(" select 1 ", SqlType.SELECT),
|
||||
(" select 'insert' ", SqlType.SELECT),
|
||||
(" select 'insert ' ", SqlType.SELECT),
|
||||
("with tmp as (select 1) select * from tmp ", SqlType.SELECT),
|
||||
("insert into table_name(col1, col2) value (val1, val2)", SqlType.NOT_SELECT),
|
||||
("select 1", None, SqlType.SELECT),
|
||||
(" select 1", None, SqlType.SELECT),
|
||||
(" select 1 ", None, SqlType.SELECT),
|
||||
(" select 'insert' ", None, SqlType.SELECT),
|
||||
(" select 'insert ' ", None, SqlType.SELECT),
|
||||
("with tmp as (select 1) select * from tmp ", None, SqlType.SELECT),
|
||||
(
|
||||
"insert into table_name(select, col2) value ('select', val2)",
|
||||
"insert into table_name(col1, col2) value (val1, val2)",
|
||||
None,
|
||||
SqlType.NOT_SELECT,
|
||||
),
|
||||
("update table_name SET col1=val1 where col1=val2", SqlType.NOT_SELECT),
|
||||
("update table_name SET col1='select' where col1=val2", SqlType.NOT_SELECT),
|
||||
("delete from table_name where id < 10", SqlType.NOT_SELECT),
|
||||
("delete from table_name where id < 10", SqlType.NOT_SELECT),
|
||||
("alter table table_name add column col1 int", SqlType.NOT_SELECT),
|
||||
(
|
||||
"insert into table_name(select, col2) value ('select', val2)",
|
||||
None,
|
||||
SqlType.NOT_SELECT,
|
||||
),
|
||||
("update table_name SET col1=val1 where col1=val2", None, SqlType.NOT_SELECT),
|
||||
(
|
||||
"update table_name SET col1='select' where col1=val2",
|
||||
None,
|
||||
SqlType.NOT_SELECT,
|
||||
),
|
||||
("delete from table_name where id < 10", None, SqlType.NOT_SELECT),
|
||||
("delete from table_name where id < 10", None, SqlType.NOT_SELECT),
|
||||
("alter table table_name add column col1 int", None, SqlType.NOT_SELECT),
|
||||
("create table table_name2 (col1 int)", None, SqlType.NOT_SELECT),
|
||||
("create table table_name2 (col1 int)", SqlType.SELECT, SqlType.SELECT),
|
||||
("select 1", SqlType.NOT_SELECT, SqlType.NOT_SELECT),
|
||||
("create table table_name2 (col1 int)", SqlType.NOT_SELECT, SqlType.NOT_SELECT),
|
||||
("select 1", SqlType.SELECT, SqlType.SELECT),
|
||||
],
|
||||
)
|
||||
@patch(
|
||||
|
|
@ -54,11 +68,13 @@ from pydolphinscheduler.tasks.sql import Sql, SqlType
|
|||
"pydolphinscheduler.core.database.Database.get_database_info",
|
||||
return_value=({"id": 1, "type": "mock_type"}),
|
||||
)
|
||||
def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
|
||||
def test_get_sql_type(
|
||||
mock_datasource, mock_code_version, sql, param_sql_type, sql_type
|
||||
):
|
||||
"""Test property sql_type could return correct type."""
|
||||
name = "test_get_sql_type"
|
||||
datasource_name = "test_datasource"
|
||||
task = Sql(name, datasource_name, sql)
|
||||
task = Sql(name, datasource_name, sql, sql_type=param_sql_type)
|
||||
assert (
|
||||
sql_type == task.sql_type
|
||||
), f"Sql {sql} expect sql type is {sql_type} but got {task.sql_type}"
|
||||
|
|
|
|||
|
|
@ -27,6 +27,8 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
|
|||
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
|
||||
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
|
||||
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
|
||||
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
|
||||
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
|
||||
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
|
||||
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
|
|
@ -399,6 +401,13 @@ public class ProcessServiceImpl implements ProcessService {
|
|||
public int createCommand(Command command) {
|
||||
int result = 0;
|
||||
if (command != null) {
|
||||
// add command timezone
|
||||
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode());
|
||||
Map<String, String> commandParams = JSONUtils.toMap(command.getCommandParam());
|
||||
if (commandParams != null && schedule != null) {
|
||||
commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
|
||||
command.setCommandParam(JSONUtils.toJsonString(commandParams));
|
||||
}
|
||||
result = commandMapper.insert(command);
|
||||
}
|
||||
return result;
|
||||
|
|
@ -771,11 +780,17 @@ public class ProcessServiceImpl implements ProcessService {
|
|||
setGlobalParamIfCommanded(processDefinition, cmdParam);
|
||||
|
||||
// curing global params
|
||||
Map<String, String> commandParamMap = JSONUtils.toMap(command.getCommandParam());
|
||||
String timezoneId = null;
|
||||
if (commandParamMap != null) {
|
||||
timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE);
|
||||
}
|
||||
|
||||
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
|
||||
processDefinition.getGlobalParamMap(),
|
||||
processDefinition.getGlobalParamList(),
|
||||
getCommandTypeIfComplement(processInstance, command),
|
||||
processInstance.getScheduleTime()));
|
||||
processInstance.getScheduleTime(), timezoneId));
|
||||
|
||||
// set process instance priority
|
||||
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
|
||||
|
|
@ -801,14 +816,22 @@ public class ProcessServiceImpl implements ProcessService {
|
|||
}
|
||||
startParamMap.putAll(fatherParamMap);
|
||||
// set start param into global params
|
||||
Map<String, String> globalMap = processDefinition.getGlobalParamMap();
|
||||
List<Property> globalParamList = processDefinition.getGlobalParamList();
|
||||
if (startParamMap.size() > 0
|
||||
&& processDefinition.getGlobalParamMap() != null) {
|
||||
for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
|
||||
&& globalMap != null) {
|
||||
for (Map.Entry<String, String> param : globalMap.entrySet()) {
|
||||
String val = startParamMap.get(param.getKey());
|
||||
if (val != null) {
|
||||
param.setValue(val);
|
||||
}
|
||||
}
|
||||
for (Entry<String, String> startParam : startParamMap.entrySet()) {
|
||||
if (!globalMap.containsKey(startParam.getKey())) {
|
||||
globalMap.put(startParam.getKey(), startParam.getValue());
|
||||
globalParamList.add(new Property(startParam.getKey(), IN, VARCHAR, startParam.getValue()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -909,12 +932,15 @@ public class ProcessServiceImpl implements ProcessService {
|
|||
setGlobalParamIfCommanded(processDefinition, cmdParam);
|
||||
}
|
||||
|
||||
// time zone
|
||||
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
|
||||
|
||||
// Recalculate global parameters after rerun.
|
||||
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
|
||||
processDefinition.getGlobalParamMap(),
|
||||
processDefinition.getGlobalParamList(),
|
||||
commandTypeIfComplement,
|
||||
processInstance.getScheduleTime()));
|
||||
processInstance.getScheduleTime(), timezoneId));
|
||||
processInstance.setProcessDefinition(processDefinition);
|
||||
}
|
||||
//reset command parameter
|
||||
|
|
@ -1092,10 +1118,14 @@ public class ProcessServiceImpl implements ProcessService {
|
|||
&& Flag.NO == processInstance.getIsSubProcess()) {
|
||||
processInstance.setScheduleTime(complementDate.get(0));
|
||||
}
|
||||
|
||||
// time zone
|
||||
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
|
||||
|
||||
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
|
||||
processDefinition.getGlobalParamMap(),
|
||||
processDefinition.getGlobalParamList(),
|
||||
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
|
||||
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), timezoneId));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1181,7 +1211,7 @@ public class ProcessServiceImpl implements ProcessService {
|
|||
private String joinVarPool(String parentValPool, String subValPool) {
|
||||
List<Property> parentValPools = Lists.newArrayList(JSONUtils.toList(parentValPool, Property.class));
|
||||
parentValPools = parentValPools.stream().filter(valPool -> valPool.getDirect() == Direct.OUT).collect(Collectors.toList());
|
||||
|
||||
|
||||
List<Property> subValPools = Lists.newArrayList(JSONUtils.toList(subValPool, Property.class));
|
||||
|
||||
Set<String> parentValPoolKeys = parentValPools.stream().map(Property::getProp).collect(toSet());
|
||||
|
|
|
|||
|
|
@ -40,8 +40,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
|
|
@ -86,8 +84,7 @@ public class TaskPluginManager {
|
|||
return taskChannel.parseParameters(parametersNode);
|
||||
}
|
||||
|
||||
@EventListener
|
||||
public void installPlugin(ApplicationReadyEvent readyEvent) {
|
||||
public void installPlugin() {
|
||||
final Set<String> names = new HashSet<>();
|
||||
|
||||
ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> {
|
||||
|
|
|
|||
|
|
@ -58,6 +58,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
|
|||
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
|
||||
|
|
@ -155,6 +156,9 @@ public class ProcessServiceTest {
|
|||
@Mock
|
||||
private DqComparisonTypeMapper dqComparisonTypeMapper;
|
||||
|
||||
@Mock
|
||||
private ScheduleMapper scheduleMapper;
|
||||
|
||||
@Test
|
||||
public void testCreateSubCommand() {
|
||||
ProcessInstance parentInstance = new ProcessInstance();
|
||||
|
|
|
|||
|
|
@ -69,23 +69,13 @@
|
|||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>${basedir}/../script/env</directory>
|
||||
<outputDirectory>bin</outputDirectory>
|
||||
<outputDirectory>conf</outputDirectory>
|
||||
<includes>
|
||||
<include>dolphinscheduler_env.sh</include>
|
||||
</includes>
|
||||
<fileMode>0755</fileMode>
|
||||
<directoryMode>0755</directoryMode>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>${basedir}/../script/env</directory>
|
||||
<outputDirectory>dist-bin</outputDirectory>
|
||||
<includes>
|
||||
<include>dolphinscheduler_env.sh</include>
|
||||
</includes>
|
||||
<fileMode>0755</fileMode>
|
||||
<directoryMode>0755</directoryMode>
|
||||
</fileSet>
|
||||
|
||||
<fileSet>
|
||||
<directory>${basedir}/../dolphinscheduler-dao/src/main/resources</directory>
|
||||
<includes>
|
||||
|
|
@ -93,6 +83,13 @@
|
|||
</includes>
|
||||
<outputDirectory>conf</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>${basedir}/../dolphinscheduler-common/src/main/resources</directory>
|
||||
<includes>
|
||||
<include>common.properties</include>
|
||||
</includes>
|
||||
<outputDirectory>conf</outputDirectory>
|
||||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>${basedir}/../dolphinscheduler-ui-next/dist</directory>
|
||||
<outputDirectory>./ui</outputDirectory>
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@
|
|||
BIN_DIR=$(dirname $0)
|
||||
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
|
||||
|
||||
source "$BIN_DIR/dolphinscheduler_env.sh"
|
||||
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
|
||||
|
||||
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms1g -Xmx1g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@
|
|||
BIN_DIR=$(dirname $0)
|
||||
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
|
||||
|
||||
source "$BIN_DIR/dolphinscheduler_env.sh"
|
||||
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
|
||||
|
||||
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms1g -Xmx1g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}
|
||||
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@
|
|||
</fileSet>
|
||||
<fileSet>
|
||||
<directory>${basedir}/../script/env</directory>
|
||||
<outputDirectory>bin</outputDirectory>
|
||||
<outputDirectory>conf</outputDirectory>
|
||||
<includes>
|
||||
<include>dolphinscheduler_env.sh</include>
|
||||
</includes>
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@
|
|||
BIN_DIR=$(dirname $0)
|
||||
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/../..; pwd)}
|
||||
|
||||
source "$DOLPHINSCHEDULER_HOME/tools/bin/dolphinscheduler_env.sh"
|
||||
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
|
||||
|
||||
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms1g -Xmx1g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}
|
||||
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ import {
|
|||
} from '@vicons/antd'
|
||||
import { useRoute } from 'vue-router'
|
||||
import { useUserStore } from '@/store/user/user'
|
||||
import { timezoneList } from '@/utils/timezone'
|
||||
import { timezoneList } from '@/common/timezone'
|
||||
import type { UserInfoRes } from '@/service/modules/users/types'
|
||||
|
||||
export function useDataList() {
|
||||
|
|
|
|||
|
|
@ -1037,7 +1037,7 @@ const security = {
|
|||
user: {
|
||||
user_manage: 'User Manage',
|
||||
create_user: 'Create User',
|
||||
update_user: 'Update User',
|
||||
edit_user: 'Edit User',
|
||||
delete_user: 'Delete User',
|
||||
delete_confirm: 'Are you sure to delete?',
|
||||
delete_confirm_tip:
|
||||
|
|
|
|||
|
|
@ -1026,7 +1026,7 @@ const security = {
|
|||
user: {
|
||||
user_manage: '用户管理',
|
||||
create_user: '创建用户',
|
||||
update_user: '更新用户',
|
||||
edit_user: '编辑用户',
|
||||
delete_user: '删除用户',
|
||||
delete_confirm: '确定删除吗?',
|
||||
project: '项目',
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
import type { ITaskState } from '@/utils/types'
|
||||
import type { ITaskState } from '@/common/types'
|
||||
|
||||
interface CodeReq {
|
||||
projectCode: number
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ interface AlertGroupIdReq {
|
|||
|
||||
interface UserReq {
|
||||
email: string
|
||||
tenantId: number
|
||||
tenantId: number | null
|
||||
userName: string
|
||||
userPassword: string
|
||||
phone?: string
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { timezoneList } from '@/utils/timezone'
|
||||
import { timezoneList } from '@/common/timezone'
|
||||
|
||||
type Timezone = typeof timezoneList[number]
|
||||
|
||||
|
|
|
|||
|
|
@ -15,17 +15,17 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export const copy = (text: string): boolean => {
|
||||
const range = document.createRange()
|
||||
const node = document.createTextNode(text)
|
||||
document.body.append(node)
|
||||
range.selectNode(node)
|
||||
window.getSelection()?.addRange(range)
|
||||
const copy = (text: string): boolean => {
|
||||
const inp = document.createElement('input')
|
||||
document.body.appendChild(inp)
|
||||
inp.value = text
|
||||
inp.select()
|
||||
let result = false
|
||||
try {
|
||||
result = document.execCommand('copy')
|
||||
} catch (err) {}
|
||||
window.getSelection()?.removeAllRanges()
|
||||
document.body.removeChild(node)
|
||||
inp.remove()
|
||||
return result
|
||||
}
|
||||
|
||||
export default copy
|
||||
|
|
|
|||
|
|
@ -20,13 +20,17 @@ import regex from './regex'
|
|||
import truncateText from './truncate-text'
|
||||
import log from './log'
|
||||
import downloadFile from './downloadFile'
|
||||
import copy from './clipboard'
|
||||
import removeUselessChildren from './tree-format'
|
||||
|
||||
const utils = {
|
||||
mapping,
|
||||
regex,
|
||||
truncateText,
|
||||
log,
|
||||
downloadFile
|
||||
downloadFile,
|
||||
copy,
|
||||
removeUselessChildren
|
||||
}
|
||||
|
||||
export default utils
|
||||
|
|
|
|||
|
|
@ -15,9 +15,9 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export function removeUselessChildren(
|
||||
const removeUselessChildren = (
|
||||
list: { children?: []; dirctory?: boolean; disabled?: boolean }[]
|
||||
) {
|
||||
) => {
|
||||
if (!list.length) return
|
||||
list.forEach((item) => {
|
||||
if (item.dirctory && item.children?.length === 0) item.disabled = true
|
||||
|
|
@ -29,3 +29,5 @@ export function removeUselessChildren(
|
|||
removeUselessChildren(item.children)
|
||||
})
|
||||
}
|
||||
|
||||
export default removeUselessChildren
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@
|
|||
* @param {string} text
|
||||
* Each Chinese character is equal to two chars
|
||||
*/
|
||||
export default function truncateText(text: string, n: number) {
|
||||
const truncateText = (text: string, n: number) => {
|
||||
const exp = /[\u4E00-\u9FA5]/
|
||||
let res = ''
|
||||
let len = text.length
|
||||
|
|
@ -49,3 +49,5 @@ export default function truncateText(text: string, n: number) {
|
|||
}
|
||||
return res
|
||||
}
|
||||
|
||||
export default truncateText
|
||||
|
|
@ -24,7 +24,7 @@ import TableAction from './components/table-action'
|
|||
import _ from 'lodash'
|
||||
import { format } from 'date-fns'
|
||||
import { TableColumns } from 'naive-ui/es/data-table/src/interface'
|
||||
import { parseTime } from '@/utils/common'
|
||||
import { parseTime } from '@/common/common'
|
||||
|
||||
export function useTable(viewRuleEntry = (unusedRuleJson: string): void => {}) {
|
||||
const { t } = useI18n()
|
||||
|
|
|
|||
|
|
@ -24,12 +24,12 @@ import {
|
|||
COLUMN_WIDTH_CONFIG,
|
||||
calculateTableWidth,
|
||||
DefaultTableWidth
|
||||
} from '@/utils/column-width-config'
|
||||
} from '@/common/column-width-config'
|
||||
import type {
|
||||
ResultItem,
|
||||
ResultListRes
|
||||
} from '@/service/modules/data-quality/types'
|
||||
import { parseTime } from '@/utils/common'
|
||||
import { parseTime } from '@/common/common'
|
||||
|
||||
export function useTable() {
|
||||
const { t } = useI18n()
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import { useColumns } from './use-columns'
|
|||
import { useTable } from './use-table'
|
||||
import styles from './index.module.scss'
|
||||
import type { TableColumns } from './types'
|
||||
import { DefaultTableWidth } from '@/common/column-width-config'
|
||||
|
||||
const list = defineComponent({
|
||||
name: 'list',
|
||||
|
|
@ -39,7 +40,10 @@ const list = defineComponent({
|
|||
const { t } = useI18n()
|
||||
const showDetailModal = ref(false)
|
||||
const selectId = ref()
|
||||
const columns = ref({ columns: [] as TableColumns, tableWidth: 0 })
|
||||
const columns = ref({
|
||||
columns: [] as TableColumns,
|
||||
tableWidth: DefaultTableWidth
|
||||
})
|
||||
const { data, changePage, changePageSize, deleteRecord, updateList } =
|
||||
useTable()
|
||||
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ import {
|
|||
COLUMN_WIDTH_CONFIG,
|
||||
calculateTableWidth,
|
||||
DefaultTableWidth
|
||||
} from '@/utils/column-width-config'
|
||||
} from '@/common/column-width-config'
|
||||
import type { TableColumns } from './types'
|
||||
|
||||
export function useColumns(onCallback: Function) {
|
||||
|
|
@ -108,6 +108,7 @@ export function useColumns(onCallback: Function) {
|
|||
{
|
||||
circle: true,
|
||||
type: 'info',
|
||||
size: 'small',
|
||||
onClick: () => void onCallback(rowData.id, 'edit')
|
||||
},
|
||||
{
|
||||
|
|
@ -134,6 +135,7 @@ export function useColumns(onCallback: Function) {
|
|||
{
|
||||
circle: true,
|
||||
type: 'error',
|
||||
size: 'small',
|
||||
class: 'btn-delete'
|
||||
},
|
||||
{
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import { reactive, ref } from 'vue'
|
|||
import { useAsyncState } from '@vueuse/core'
|
||||
import { queryAuditLogListPaging } from '@/service/modules/audit'
|
||||
import { format } from 'date-fns'
|
||||
import { parseTime } from '@/utils/common'
|
||||
import { parseTime } from '@/common/common'
|
||||
import type { AuditListRes } from '@/service/modules/audit/types'
|
||||
|
||||
export function useTable() {
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import { useI18n } from 'vue-i18n'
|
|||
import { useAsyncState } from '@vueuse/core'
|
||||
import ButtonLink from '@/components/button-link'
|
||||
import { queryProjectListPaging } from '@/service/modules/projects'
|
||||
import { parseTime } from '@/utils/common'
|
||||
import { parseTime } from '@/common/common'
|
||||
import { deleteProject } from '@/service/modules/projects'
|
||||
import { format } from 'date-fns'
|
||||
import { useRouter } from 'vue-router'
|
||||
|
|
@ -36,7 +36,7 @@ import {
|
|||
COLUMN_WIDTH_CONFIG,
|
||||
calculateTableWidth,
|
||||
DefaultTableWidth
|
||||
} from '@/utils/column-width-config'
|
||||
} from '@/common/column-width-config'
|
||||
import type { Router } from 'vue-router'
|
||||
import type { ProjectRes } from '@/service/modules/projects/types'
|
||||
import { DeleteOutlined, EditOutlined } from '@vicons/antd'
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ import { NIcon } from 'naive-ui'
|
|||
import { useRelationCustomParams, useDependentTimeout } from '.'
|
||||
import { useTaskNodeStore } from '@/store/project/task-node'
|
||||
import { queryAllProjectList } from '@/service/modules/projects'
|
||||
import { tasksState } from '@/utils/common'
|
||||
import { tasksState } from '@/common/common'
|
||||
import {
|
||||
queryProcessDefinitionList,
|
||||
getTasksByDefinitionList
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import { ref, onMounted, watch } from 'vue'
|
|||
import { useI18n } from 'vue-i18n'
|
||||
import { queryResourceByProgramType } from '@/service/modules/resources'
|
||||
import { useTaskNodeStore } from '@/store/project/task-node'
|
||||
import { removeUselessChildren } from '@/utils/tree-format'
|
||||
import utils from '@/utils'
|
||||
import type { IJsonItem, ProgramType, IMainJar } from '../types'
|
||||
|
||||
export function useMainJar(model: { [field: string]: any }): IJsonItem {
|
||||
|
|
@ -37,7 +37,7 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem {
|
|||
type: 'FILE',
|
||||
programType
|
||||
})
|
||||
removeUselessChildren(res)
|
||||
utils.removeUselessChildren(res)
|
||||
mainJarOptions.value = res || []
|
||||
taskStore.updateMainJar(programType, res)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import { ref, onMounted } from 'vue'
|
|||
import { useI18n } from 'vue-i18n'
|
||||
import { queryResourceList } from '@/service/modules/resources'
|
||||
import { useTaskNodeStore } from '@/store/project/task-node'
|
||||
import { removeUselessChildren } from '@/utils/tree-format'
|
||||
import utils from '@/utils'
|
||||
import type { IJsonItem, IResource } from '../types'
|
||||
|
||||
export function useResources(): IJsonItem {
|
||||
|
|
@ -38,7 +38,7 @@ export function useResources(): IJsonItem {
|
|||
if (resourcesLoading.value) return
|
||||
resourcesLoading.value = true
|
||||
const res = await queryResourceList({ type: 'FILE' })
|
||||
removeUselessChildren(res)
|
||||
utils.removeUselessChildren(res)
|
||||
resourcesOptions.value = res || []
|
||||
resourcesLoading.value = false
|
||||
taskStore.updateResource(res)
|
||||
|
|
|
|||
|
|
@ -66,7 +66,8 @@ export function useTaskGroup(
|
|||
span: 12,
|
||||
name: t('project.node.task_group_name'),
|
||||
props: {
|
||||
loading
|
||||
loading,
|
||||
clearable: true
|
||||
},
|
||||
options
|
||||
},
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ export type {
|
|||
WorkflowInstance
|
||||
} from '@/views/projects/workflow/components/dag/types'
|
||||
export type { IResource, ProgramType, IMainJar } from '@/store/project/types'
|
||||
export type { ITaskState } from '@/utils/types'
|
||||
export type { ITaskState } from '@/common/types'
|
||||
|
||||
type SourceType = 'MYSQL' | 'HDFS' | 'HIVE'
|
||||
type ModelType = 'import' | 'export'
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ import {
|
|||
COLUMN_WIDTH_CONFIG,
|
||||
calculateTableWidth,
|
||||
DefaultTableWidth
|
||||
} from '@/utils/column-width-config'
|
||||
} from '@/common/column-width-config'
|
||||
import type {
|
||||
TaskDefinitionItem,
|
||||
TaskDefinitionRes
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ import Card from '@/components/card'
|
|||
import LogModal from '@/components/log-modal'
|
||||
import { useAsyncState } from '@vueuse/core'
|
||||
import { queryLog } from '@/service/modules/log'
|
||||
import { stateType } from '@/utils/common'
|
||||
import { stateType } from '@/common/common'
|
||||
import styles from './index.module.scss'
|
||||
|
||||
const TaskInstance = defineComponent({
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
import { ITaskState } from '@/utils/types'
|
||||
import { ITaskState } from '@/common/types'
|
||||
|
||||
export type { Router } from 'vue-router'
|
||||
export type { TaskInstancesRes } from '@/service/modules/task-instances/types'
|
||||
|
|
|
|||
|
|
@ -32,12 +32,12 @@ import {
|
|||
} from '@vicons/antd'
|
||||
import { format } from 'date-fns'
|
||||
import { useRoute, useRouter } from 'vue-router'
|
||||
import { parseTime, tasksState } from '@/utils/common'
|
||||
import { parseTime, tasksState } from '@/common/common'
|
||||
import {
|
||||
COLUMN_WIDTH_CONFIG,
|
||||
calculateTableWidth,
|
||||
DefaultTableWidth
|
||||
} from '@/utils/column-width-config'
|
||||
} from '@/common/column-width-config'
|
||||
import type { Router, TaskInstancesRes, IRecord, ITaskState } from './types'
|
||||
|
||||
export function useTable() {
|
||||
|
|
@ -155,7 +155,8 @@ export function useTable() {
|
|||
{
|
||||
title: t('project.task.host'),
|
||||
key: 'host',
|
||||
...COLUMN_WIDTH_CONFIG['name']
|
||||
...COLUMN_WIDTH_CONFIG['name'],
|
||||
render: (row: IRecord) => row.host || '-'
|
||||
},
|
||||
{
|
||||
title: t('project.task.operation'),
|
||||
|
|
@ -206,6 +207,7 @@ export function useTable() {
|
|||
circle: true,
|
||||
type: 'info',
|
||||
size: 'small',
|
||||
disabled: !row.host,
|
||||
onClick: () => handleLog(row)
|
||||
},
|
||||
{
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ import { defineComponent, onMounted, PropType, inject, ref } from 'vue'
|
|||
import { useI18n } from 'vue-i18n'
|
||||
import { useRoute } from 'vue-router'
|
||||
import styles from './menu.module.scss'
|
||||
import { uuid } from '@/utils/common'
|
||||
import { uuid } from '@/common/common'
|
||||
import { IWorkflowTaskInstance } from './types'
|
||||
|
||||
const props = {
|
||||
|
|
|
|||
|
|
@ -209,7 +209,7 @@ export default defineComponent({
|
|||
<NSwitch v-model:value={formValue.value.timeoutFlag} />
|
||||
</NFormItem>
|
||||
{formValue.value.timeoutFlag && (
|
||||
<NFormItem label=' ' path='timeout'>
|
||||
<NFormItem showLabel={false} path='timeout'>
|
||||
<NInputNumber
|
||||
v-model:value={formValue.value.timeout}
|
||||
show-button={false}
|
||||
|
|
@ -255,14 +255,14 @@ export default defineComponent({
|
|||
/>
|
||||
</NFormItem>
|
||||
{props.definition && !props.instance && (
|
||||
<NFormItem path='timeoutFlag'>
|
||||
<NFormItem path='timeoutFlag' showLabel={false}>
|
||||
<NCheckbox v-model:checked={formValue.value.release}>
|
||||
{t('project.dag.online_directly')}
|
||||
</NCheckbox>
|
||||
</NFormItem>
|
||||
)}
|
||||
{props.instance && (
|
||||
<NFormItem path='sync'>
|
||||
<NFormItem path='sync' showLabel={false}>
|
||||
<NCheckbox v-model:checked={formValue.value.sync}>
|
||||
{t('project.dag.update_directly')}
|
||||
</NCheckbox>
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import { defineComponent, onMounted, PropType, ref, computed } from 'vue'
|
|||
import { useI18n } from 'vue-i18n'
|
||||
import { listAlertGroupById } from '@/service/modules/alert-group'
|
||||
import { queryAllWorkerGroups } from '@/service/modules/worker-groups'
|
||||
import { runningType, warningTypeList } from '@/utils/common'
|
||||
import { runningType, warningTypeList } from '@/common/common'
|
||||
import { IStartupParam } from './types'
|
||||
import styles from './startup.module.scss'
|
||||
|
||||
|
|
|
|||
|
|
@ -351,7 +351,7 @@ export default defineComponent({
|
|||
onEdit={editTask}
|
||||
onCopyTask={copyTask}
|
||||
onRemoveTasks={removeTasks}
|
||||
onViewLog={viewLog}
|
||||
onViewLog={handleViewLog}
|
||||
/>
|
||||
{!!props.definition && (
|
||||
<StartModal
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@
|
|||
*/
|
||||
|
||||
import { TaskType } from '@/views/projects/task/constants/task-type'
|
||||
export type { ITaskState } from '@/utils/types'
|
||||
export type { ITaskState } from '@/common/types'
|
||||
|
||||
export interface ProcessDefinition {
|
||||
id: number
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@
|
|||
import { render, h, ref } from 'vue'
|
||||
import { useRoute } from 'vue-router'
|
||||
import { useI18n } from 'vue-i18n'
|
||||
import { tasksState } from '@/utils/common'
|
||||
import { tasksState } from '@/common/common'
|
||||
import { NODE, NODE_STATUS_MARKUP } from './dag-config'
|
||||
import { queryTaskListByProcessId } from '@/service/modules/process-instances'
|
||||
import NodeStatus from '@/views/projects/workflow/components/dag/dag-node-status'
|
||||
|
|
@ -79,9 +79,12 @@ export function useNodeStatus(options: Options) {
|
|||
if (taskList.value) {
|
||||
taskList.value.forEach((taskInstance: any) => {
|
||||
setNodeStatus(taskInstance.taskCode, taskInstance.state, taskInstance)
|
||||
nodeStore.updateDependentResult(
|
||||
JSON.parse(taskInstance.dependentResult)
|
||||
)
|
||||
|
||||
if (taskInstance.dependentResult) {
|
||||
nodeStore.updateDependentResult(
|
||||
JSON.parse(taskInstance.dependentResult)
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { copy } from '@/utils/clipboard'
|
||||
import utils from '@/utils'
|
||||
import { useMessage } from 'naive-ui'
|
||||
import { useI18n } from 'vue-i18n'
|
||||
|
||||
|
|
@ -26,7 +26,7 @@ export function useTextCopy() {
|
|||
const { t } = useI18n()
|
||||
const message = useMessage()
|
||||
const copyText = (text: string) => {
|
||||
if (copy(text)) {
|
||||
if (utils.copy(text)) {
|
||||
message.success(t('project.dag.copy_success'))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -186,8 +186,12 @@ export default defineComponent({
|
|||
})
|
||||
|
||||
watch(
|
||||
() => props.row,
|
||||
() => getStartParamsList(props.row.code)
|
||||
() => props.show,
|
||||
() => {
|
||||
if (props.show) {
|
||||
getStartParamsList(props.row.code)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
return {
|
||||
|
|
@ -381,7 +385,7 @@ export default defineComponent({
|
|||
) : (
|
||||
<NSpace vertical>
|
||||
{this.startParamsList.map((item, index) => (
|
||||
<NSpace class={styles.startup} key={index}>
|
||||
<NSpace class={styles.startup} key={Date.now() + index}>
|
||||
<NInput
|
||||
pair
|
||||
separator=':'
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ import {
|
|||
NPopover
|
||||
} from 'naive-ui'
|
||||
import { ArrowDownOutlined, ArrowUpOutlined } from '@vicons/antd'
|
||||
import { timezoneList } from '@/utils/timezone'
|
||||
import { timezoneList } from '@/common/timezone'
|
||||
import Crontab from '@/components/crontab'
|
||||
|
||||
const props = {
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import _ from 'lodash'
|
||||
import _, { cloneDeep } from 'lodash'
|
||||
import { reactive, SetupContext } from 'vue'
|
||||
import { useI18n } from 'vue-i18n'
|
||||
import { useRoute, useRouter } from 'vue-router'
|
||||
|
|
@ -35,7 +35,7 @@ import {
|
|||
updateSchedule,
|
||||
previewSchedule
|
||||
} from '@/service/modules/schedules'
|
||||
import { parseTime } from '@/utils/common'
|
||||
import { parseTime } from '@/common/common'
|
||||
import { EnvironmentItem } from '@/service/modules/environment/types'
|
||||
import { ITimingState } from './types'
|
||||
|
||||
|
|
@ -56,6 +56,10 @@ export function useModal(
|
|||
schedulePreviewList: []
|
||||
})
|
||||
|
||||
const cachedStartParams = {} as {
|
||||
[key: string]: { prop: string; value: string }[]
|
||||
}
|
||||
|
||||
const resetImportForm = () => {
|
||||
state.importForm.name = ''
|
||||
state.importForm.file = ''
|
||||
|
|
@ -237,9 +241,14 @@ export function useModal(
|
|||
}
|
||||
|
||||
const getStartParamsList = (code: number) => {
|
||||
if (cachedStartParams[code]) {
|
||||
variables.startParamsList = cloneDeep(cachedStartParams[code])
|
||||
return
|
||||
}
|
||||
queryProcessDefinitionByCode(code, variables.projectCode).then(
|
||||
(res: any) => {
|
||||
variables.startParamsList = res.processDefinition.globalParamList
|
||||
cachedStartParams[code] = cloneDeep(variables.startParamsList)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
|
|
|||
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue