Compare commits

..

26 Commits

Author SHA1 Message Date
Jiajie Zhong de50f43de6
[common] Make dolphinscheduler_env.sh work when start server (#9726)
* [common] Make dolphinscheduler_env.sh work

* Change dist tarball `dolphinscheduler_env.sh` location
  from `bin/` to `conf/`, which users could finish their
  change configuration operation in one single directory.
  and we only need to add `$DOLPHINSCHEDULER_HOME/conf`
  when we start our sever instead of adding both
  `$DOLPHINSCHEDULER_HOME/conf` and `$DOLPHINSCHEDULER_HOME/bin`
* Change the `start.sh`'s path of `dolphinscheduler_env.sh`
* Change the setting order of `dolphinscheduler_env.sh`
* `bin/env/dolphinscheduler_env.sh` will overwrite the `<server>/conf/dolphinscheduler_env.sh`
when start the server using `bin/dolphinsceduler_daemon.sh` or `bin/install.sh`
* Change the related docs
2022-04-25 15:35:43 +08:00
WangJPLeo 7bcec7115a
[Fix-9717] The failure policy of the task flow takes effect (#9718)
* Failure policy takes effect.

* Coverage on New Code

* correct description logic

* Compatible with all scenarios

* clearer logic

Co-authored-by: WangJPLeo <wangjipeng@whaleops.com>
2022-04-25 15:29:18 +08:00
Tq e6dade71bb
fix process instance global param not include to task instance when master executing (#9730) 2022-04-25 14:05:40 +08:00
songjianet 9abcbbac2e
[Fix][UI Next][V1.0.0-Beta] Fix the bug that the tenant is 0 when editing a user. (#9739) 2022-04-25 14:05:27 +08:00
caishunfeng 0176f4bf61
[Bug-9737][Api] fix task plugin load in api (#9744)
* fix task plugin load in api

* task plugin loading by event
2022-04-25 13:26:34 +08:00
Amy0104 1f9660b80d
[Fix][UI][V1.0.0-Beta] Set the default value of host to '-' and Disable log button without host data. (#9742) 2022-04-25 12:27:08 +08:00
songjianet 15eb1618b4
[Fix][UI Next][V1.0.0-Beta] Fix citation errors. (#9741) 2022-04-25 11:37:21 +08:00
Paul Zhang cc40816f87
[Bug][Script] Fix the type of variable workersGroupMap is not supported in bash 3.x (#9614) 2022-04-25 11:32:09 +08:00
labbomb ca98a4a144
[Bug]Fixed the problem of no request from right click viewing log (#9728)
* The utils configuration files are centrally managed under common

* [Bug]Fixed the problem of no request from right click viewing log
2022-04-25 10:26:52 +08:00
陈家名 8a8b63cd96
[Improve][python] Support create table syntax and custom sql type param (#9673) 2022-04-25 10:17:20 +08:00
Devosend cd82f45d5e
[Bug][UI][V1.0.0-Beta] Fix action buttons not displayed on one line bug (#9700) 2022-04-25 10:04:04 +08:00
Devosend 3faa65ef0c
[Bug] [UI][V1.0.0-Beta] Fix task group name can't clear bug (#9708) 2022-04-25 10:03:11 +08:00
caishunfeng 5657cb9aec
[Bug-9719][Master] fix failover fail because task plugins has not been loaded (#9720) 2022-04-24 20:34:21 +08:00
gaojun2048 ebc4253d50
[fix][Service] BusinessTime should format with schedule timezone (#9714)
* BusinessTime should format with schedule timezone

* fix test error

* fix test error

* fix test error
2022-04-24 19:21:21 +08:00
Amy0104 257380467e
[Fix][UI Next][V1.0.0-Beta] Set the timout label to not show. (#9710) 2022-04-24 17:56:10 +08:00
labbomb 7382284b7d
[Feature]The utils configuration files are centrally managed under common (#9706) 2022-04-24 17:46:47 +08:00
Devosend 48d526f275
[Fix][UI Next][V1.0.0-Beta] Fix bug where route is error in file manage root (#9697) 2022-04-24 15:29:50 +08:00
Devosend 0643fe44a4
[Fix][UI Next][V1.0.0-Beta] Fix success logo is not display bug (#9694) 2022-04-24 15:28:26 +08:00
labbomb b276c372d4
[Feature]Unified exposure method class (#9698) 2022-04-24 15:27:11 +08:00
Amy0104 3e851940e8
[Fix][UI Next][V1.0.0-Beta] Fix the startup parameter display error. (#9692)
* [Fix][UI Next][V1.0.0-Beta] Fix the startup parameter display error.

* [Fix][UI Next][V1.0.0-Beta] Change the key of the startup parameter item.
2022-04-24 14:15:53 +08:00
songjianet 86bdb826dc
[Fix][UI Next][V1.0.0-Beta] Change update user to edit user. (#9683) 2022-04-24 14:14:12 +08:00
Tq a51b710b1c
fix alert msg and change primitive to String to avoid wrong format (#9689) 2022-04-24 13:29:27 +08:00
Devosend 99678c097c
[Fix][UI Next][V1.0.0-Beta] Fix bug where name copy is invalid (#9684) 2022-04-24 11:56:58 +08:00
Mr.An 29a0ea32c6
[Fix] Support more generic tenant code when create tenant (#9634) 2022-04-23 18:41:03 +08:00
mans2singh 4799b27e33
[hotfix][docker] Removed extra equals in mail user (#9677) 2022-04-23 12:32:31 +08:00
yimaixinchen fb11525e49
[chore] Correct the java doc in funtion DagHelperTest generateDag2 (#9602)
Co-authored-by: Jiajie Zhong <zhongjiajie955@gmail.com>
2022-04-23 10:31:48 +08:00
137 changed files with 695 additions and 367 deletions

View File

@ -397,21 +397,41 @@ apiServers="ds1"
### dolphinscheduler_env.sh [load environment variables configs]
When using shell to commit tasks, DS will load environment variables inside dolphinscheduler_env.sh into the host.
Types of tasks involved are: Shell, Python, Spark, Flink, DataX, etc.
When using shell to commit tasks, DolphinScheduler will export environment variables from `bin/env/dolphinscheduler_env.sh`. The
mainly configuration including `JAVA_HOME`, mata database, registry center, and task configuration.
```bash
export HADOOP_HOME=/opt/soft/hadoop
export HADOOP_CONF_DIR=/opt/soft/hadoop/etc/hadoop
export SPARK_HOME1=/opt/soft/spark1
export SPARK_HOME2=/opt/soft/spark2
export PYTHON_HOME=/opt/soft/python
export JAVA_HOME=/opt/soft/java
export HIVE_HOME=/opt/soft/hive
export FLINK_HOME=/opt/soft/flink
export DATAX_HOME=/opt/soft/datax/bin/datax.py
# JAVA_HOME, will use it to start DolphinScheduler server
export JAVA_HOME=${JAVA_HOME:-/opt/soft/java}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME:$PATH
# Database related configuration, set database type, username and password
export DATABASE=${DATABASE:-postgresql}
export SPRING_PROFILES_ACTIVE=${DATABASE}
export SPRING_DATASOURCE_DRIVER_CLASS_NAME
export SPRING_DATASOURCE_URL
export SPRING_DATASOURCE_USERNAME
export SPRING_DATASOURCE_PASSWORD
# DolphinScheduler server related configuration
export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
# Registry center configuration, determines the type and link of the registry center
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-localhost:2181}
# Tasks related configurations, need to change the configuration if you use the related tasks.
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
```
### Services logback configs

View File

@ -218,7 +218,7 @@ A: 1, in **the process definition list**, click the **Start** button.
## Q : Python task setting Python version
A: 1**for the version after 1.0.3** only need to modify PYTHON_HOME in conf/env/.dolphinscheduler_env.sh
A: 1**for the version after 1.0.3** only need to modify PYTHON_HOME in `bin/env/dolphinscheduler_env.sh`
```
export PYTHON_HOME=/bin/python

View File

@ -73,10 +73,10 @@ sed -i 's/Defaults requirett/#Defaults requirett/g' /etc/sudoers
datasource.properties: database connection information
zookeeper.properties: information for connecting zk
common.properties: Configuration information about the resource store (if hadoop is set up, please check if the core-site.xml and hdfs-site.xml configuration files exist).
env/dolphinscheduler_env.sh: environment Variables
dolphinscheduler_env.sh: environment Variables
````
- Modify the `dolphinscheduler_env.sh` environment variable in the `conf/env` directory according to the machine configuration (the following is the example that all the used software install under `/opt/soft`)
- Modify the `dolphinscheduler_env.sh` environment variable in the `bin/env/dolphinscheduler_env.sh` directory according to the machine configuration (the following is the example that all the used software install under `/opt/soft`)
```shell
export HADOOP_HOME=/opt/soft/hadoop

View File

@ -6,7 +6,7 @@ If you are a new hand and want to experience DolphinScheduler functions, we reco
## Deployment Steps
Cluster deployment uses the same scripts and configuration files as [pseudo-cluster deployment](pseudo-cluster.md), so the preparation and deployment steps are the same as pseudo-cluster deployment. The difference is that [pseudo-cluster deployment](pseudo-cluster.md) is for one machine, while cluster deployment (Cluster) is for multiple machines. And steps of "Modify Configuration" are quite different between pseudo-cluster deployment and cluster deployment.
Cluster deployment uses the same scripts and configuration files as [pseudo-cluster deployment](pseudo-cluster.md), so the preparation and deployment steps are the same as pseudo-cluster deployment. The difference is that pseudo-cluster deployment is for one machine, while cluster deployment (Cluster) is for multiple machines. And steps of "Modify Configuration" are quite different between pseudo-cluster deployment and cluster deployment.
### Prerequisites and DolphinScheduler Startup Environment Preparations
@ -32,8 +32,8 @@ apiServers="ds5"
## Start and Login DolphinScheduler
Same as pseudo-cluster.md](pseudo-cluster.md)
Same as [pseudo-cluster](pseudo-cluster.md)
## Start and Stop Server
Same as pseudo-cluster.md](pseudo-cluster.md)
Same as [pseudo-cluster](pseudo-cluster.md)

View File

@ -87,7 +87,13 @@ sh script/create-dolphinscheduler.sh
## Modify Configuration
After completing the preparation of the basic environment, you need to modify the configuration file according to your environment. The configuration file is in the path of `conf/config/install_config.conf`. Generally, you just need to modify the **INSTALL MACHINE, DolphinScheduler ENV, Database, Registry Server** part to complete the deployment, the following describes the parameters that must be modified:
After completing the preparation of the basic environment, you need to modify the configuration file according to the
environment you used. The configuration files are both in directory `bin/env` and named `install_env.sh` and `dolphinscheduler_env.sh`.
### Modify `install_env.sh`
File `install_env.sh` describes which machines will be installed DolphinScheduler and what server will be installed on
each machine. You could find this file in the path `bin/env/install_env.sh` and the detail of the configuration as below.
```shell
# ---------------------------------------------------------
@ -105,29 +111,30 @@ installPath="~/dolphinscheduler"
# Deploy user, use the user you create in section **Configure machine SSH password-free login**
deployUser="dolphinscheduler"
```
# ---------------------------------------------------------
# DolphinScheduler ENV
# ---------------------------------------------------------
# The path of JAVA_HOME, which JDK install path in section **Preparation**
javaHome="/your/java/home/here"
### Modify `dolphinscheduler_env.sh`
# ---------------------------------------------------------
# Database
# ---------------------------------------------------------
# Database type, username, password, IP, port, metadata. For now `dbtype` supports `mysql` and `postgresql`
dbtype="mysql"
dbhost="localhost:3306"
# Need to modify if you are not using `dolphinscheduler/dolphinscheduler` as your username and password
username="dolphinscheduler"
password="dolphinscheduler"
dbname="dolphinscheduler"
File `dolphinscheduler_env.sh` describes the database configuration of DolphinScheduler, which in the path `bin/env/dolphinscheduler_env.sh`
and some tasks which need external dependencies or libraries such as `JAVA_HOME` and `SPARK_HOME`. You could ignore the
task external dependencies if you do not use those tasks, but you have to change `JAVA_HOME`, registry center and database
related configurations based on your environment.
# ---------------------------------------------------------
# Registry Server
# ---------------------------------------------------------
# Registration center address, the address of ZooKeeper service
registryServers="localhost:2181"
```sh
# JAVA_HOME, will use it to start DolphinScheduler server
export JAVA_HOME=${JAVA_HOME:-/custom/path}
# Database related configuration, set database type, username and password
export DATABASE=${DATABASE:-postgresql}
export SPRING_PROFILES_ACTIVE=${DATABASE}
export SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.postgresql.Driver
export SPRING_DATASOURCE_URL="jdbc:postgresql://127.0.0.1:5432/dolphinscheduler"
export SPRING_DATASOURCE_USERNAME="username"
export SPRING_DATASOURCE_PASSWORD="password"
# Registry center configuration, determines the type and link of the registry center
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-localhost:2181}
```
## Initialize the Database
@ -178,7 +185,7 @@ sh tools/bin/create-schema.sh
Use **deployment user** you created above, running the following command to complete the deployment, and the server log will be stored in the logs folder.
```shell
sh install.sh
sh ./bin/install.sh
```
> **_Note:_** For the first time deployment, there maybe occur five times of `sh: bin/dolphinscheduler-daemon.sh: No such file or directory` in the terminal,
@ -214,7 +221,12 @@ sh ./bin/dolphinscheduler-daemon.sh start alert-server
sh ./bin/dolphinscheduler-daemon.sh stop alert-server
```
> **_Note:_**: Please refer to the section of "System Architecture Design" for service usage. Python gateway service is
> **_Note1:_**: Each server have `dolphinscheduler_env.sh` file in path `<server-name>/conf/dolphinscheduler_env.sh` which
> for micro-services need. It means that you could start all servers by command `<server-name>/bin/start.sh` with different
> environment variable from `bin/env/dolphinscheduler_env.sh`. But it will use file `bin/env/dolphinscheduler_env.sh` overwrite
> `<server-name>/conf/dolphinscheduler_env.sh` if you start server with command `/bin/dolphinscheduler-daemon.sh start <server-name>`.
> **_Note2:_**: Please refer to the section of "System Architecture Design" for service usage. Python gateway service is
> started along with the api-server, and if you do not want to start Python gateway service please disabled it by changing
> the yaml config `python-gateway.enabled : false` in api-server's configuration path `api-server/conf/application.yaml`

View File

@ -977,7 +977,7 @@ Configure the mail service port for `alert-server`, default value `empty`.
Configure the mail sender for `alert-server`, default value `empty`.
**`MAIL_USER=`**
**`MAIL_USER`**
Configure the user name of the mail service for `alert-server`, default value `empty`.

View File

@ -40,7 +40,7 @@ This example demonstrates how to import data from Hive into MySQL.
### Configure the DataX environment in DolphinScheduler
If you are using the DataX task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.
If you are using the DataX task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `bin/env/dolphinscheduler_env.sh`.
![datax_task01](/img/tasks/demo/datax_task01.png)

View File

@ -46,7 +46,7 @@ This is a common introductory case in the big data ecosystem, which often apply
#### Configure the flink environment in DolphinScheduler
If you are using the flink task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.
If you are using the flink task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `bin/env/dolphinscheduler_env.sh`.
![demo-flink-simple](/img/tasks/demo/flink_task01.png)

View File

@ -54,7 +54,7 @@ This example is a common introductory type of MapReduce application, which used
#### Configure the MapReduce Environment in DolphinScheduler
If you are using the MapReduce task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.
If you are using the MapReduce task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `bin/env/dolphinscheduler_env.sh`.
![mr_configure](/img/tasks/demo/mr_task01.png)

View File

@ -45,7 +45,7 @@ This is a common introductory case in the big data ecosystem, which often apply
#### Configure the Spark Environment in DolphinScheduler
If you are using the Spark task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.
If you are using the Spark task type in a production environment, it is necessary to configure the required environment first. The following is the configuration file: `bin/env/dolphinscheduler_env.sh`.
![spark_configure](/img/tasks/demo/spark_task01.png)

View File

@ -380,21 +380,42 @@ apiServers="ds1"
```
## 11.dolphinscheduler_env.sh [环境变量配置]
通过类似shell方式提交任务的的时候,会加载该配置文件中的环境变量到主机中.
涉及到的任务类型有: Shell任务、Python任务、Spark任务、Flink任务、Datax任务等等
通过类似shell方式提交任务的的时候,会加载该配置文件中的环境变量到主机中. 涉及到的 `JAVA_HOME`、元数据库、注册中心和任务类型配置,其中任务
类型主要有: Shell任务、Python任务、Spark任务、Flink任务、Datax任务等等
```bash
export HADOOP_HOME=/opt/soft/hadoop
export HADOOP_CONF_DIR=/opt/soft/hadoop/etc/hadoop
export SPARK_HOME1=/opt/soft/spark1
export SPARK_HOME2=/opt/soft/spark2
export PYTHON_HOME=/opt/soft/python
export JAVA_HOME=/opt/soft/java
export HIVE_HOME=/opt/soft/hive
export FLINK_HOME=/opt/soft/flink
export DATAX_HOME=/opt/soft/datax/bin/datax.py
# JAVA_HOME, will use it to start DolphinScheduler server
export JAVA_HOME=${JAVA_HOME:-/opt/soft/java}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME:$PATH
# Database related configuration, set database type, username and password
export DATABASE=${DATABASE:-postgresql}
export SPRING_PROFILES_ACTIVE=${DATABASE}
export SPRING_DATASOURCE_DRIVER_CLASS_NAME
export SPRING_DATASOURCE_URL
export SPRING_DATASOURCE_USERNAME
export SPRING_DATASOURCE_PASSWORD
# DolphinScheduler server related configuration
export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
# Registry center configuration, determines the type and link of the registry center
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-localhost:2181}
# Tasks related configurations, need to change the configuration if you use the related tasks.
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
```
## 12.各服务日志配置文件

View File

@ -203,7 +203,7 @@ A 1在 **流程定义列表**,点击 **启动** 按钮
## QPython 任务设置 Python 版本
A 只需要修改 conf/env/dolphinscheduler_env.sh 中的 PYTHON_HOME
A 只需要修改 `bin/env/dolphinscheduler_env.sh` 中的 PYTHON_HOME
```
export PYTHON_HOME=/bin/python

View File

@ -71,7 +71,7 @@ sed -i 's/Defaults requirett/#Defaults requirett/g' /etc/sudoers
datasource.properties 中的数据库连接信息.
zookeeper.properties 中的连接zk的信息.
common.properties 中关于资源存储的配置信息(如果设置了hadoop,请检查是否存在core-site.xml和hdfs-site.xml配置文件).
env/dolphinscheduler_env.sh 中的环境变量
dolphinscheduler_env.sh 中的环境变量
````
- 根据机器配置,修改 conf/env 目录下的 `dolphinscheduler_env.sh` 环境变量(以相关用到的软件都安装在/opt/soft下为例)

View File

@ -6,7 +6,7 @@
## 部署步骤
集群部署(Cluster)使用的脚本和配置文件与[伪集群部署](pseudo-cluster.md)中的配置一样,所以所需要的步骤也与[伪集群部署](pseudo-cluster.md)大致一样。区别就是[伪集群部署](pseudo-cluster.md)针对的是一台机器,而集群部署(Cluster)需要针对多台机器,且两者“修改相关配置”步骤区别较大
集群部署(Cluster)使用的脚本和配置文件与[伪集群部署](pseudo-cluster.md)中的配置一样,所以所需要的步骤也与伪集群部署大致一样。区别就是伪集群部署针对的是一台机器,而集群部署(Cluster)需要针对多台机器,且两者“修改相关配置”步骤区别较大
### 前置准备工作 && 准备 DolphinScheduler 启动环境
@ -14,7 +14,7 @@
### 修改相关配置
这个是与[伪集群部署](pseudo-cluster.md)差异较大的一步,因为部署脚本会通过 `scp` 的方式将安装需要的资源传输到各个机器上,所以这一步我们仅需要修改运行`install.sh`脚本的所在机器的配置即可。配置文件在路径在`conf/config/install_config.conf`下,此处我们仅需修改**INSTALL MACHINE****DolphinScheduler ENV、Database、Registry Server**与[伪集群部署](pseudo-cluster.md)保持一致,下面对必须修改参数进行说明
这个是与[伪集群部署](pseudo-cluster.md)差异较大的一步,因为部署脚本会通过 `scp` 的方式将安装需要的资源传输到各个机器上,所以这一步我们仅需要修改运行`install.sh`脚本的所在机器的配置即可。配置文件在路径在`conf/config/install_config.conf`下,此处我们仅需修改**INSTALL MACHINE****DolphinScheduler ENV、Database、Registry Server**与伪集群部署保持一致,下面对必须修改参数进行说明
```shell
# ---------------------------------------------------------

View File

@ -87,47 +87,51 @@ sh script/create-dolphinscheduler.sh
## 修改相关配置
完成了基础环境的准备后,在运行部署命令前,还需要根据环境修改配置文件。配置文件在路径在`conf/config/install_config.conf`下,一般部署只需要修改**INSTALL MACHINE、DolphinScheduler ENV、Database、Registry Server**部分即可完成部署,下面对必须修改参数进行说明
完成基础环境的准备后,需要根据你的机器环境修改配置文件。配置文件可以在目录 `bin/env` 中找到,他们分别是 并命名为 `install_env.sh``dolphinscheduler_env.sh`
### 修改 `install_env.sh` 文件
文件 `install_env.sh` 描述了哪些机器将被安装 DolphinScheduler 以及每台机器对应安装哪些服务。您可以在路径 `bin/env/install_env.sh` 中找到此文件,配置详情如下。
```shell
# ---------------------------------------------------------
# INSTALL MACHINE
# ---------------------------------------------------------
# 因为是在单节点上部署master、worker、API server所以服务器的IP均为机器IP或者localhost
# Due to the master, worker, and API server being deployed on a single node, the IP of the server is the machine IP or localhost
ips="localhost"
masters="localhost"
workers="localhost:default"
alertServer="localhost"
apiServers="localhost"
# DolphinScheduler安装路径,如果不存在会创建
# DolphinScheduler installation path, it will auto-create if not exists
installPath="~/dolphinscheduler"
# 部署用户,填写在 **配置用户免密及权限** 中创建的用户
# Deploy user, use the user you create in section **Configure machine SSH password-free login**
deployUser="dolphinscheduler"
```
# ---------------------------------------------------------
# DolphinScheduler ENV
# ---------------------------------------------------------
# JAVA_HOME 的路径,是在 **前置准备工作** 安装的JDK中 JAVA_HOME 所在的位置
javaHome="/your/java/home/here"
### 修改 `dolphinscheduler_env.sh` 文件
# ---------------------------------------------------------
# Database
# ---------------------------------------------------------
# 数据库的类型用户名密码IP端口元数据库db。其中dbtype目前支持 mysql 和 postgresql
dbtype="mysql"
dbhost="localhost:3306"
# 如果你不是以 dolphinscheduler/dolphinscheduler 作为用户名和密码的,需要进行修改
username="dolphinscheduler"
password="dolphinscheduler"
dbname="dolphinscheduler"
文件 `dolphinscheduler_env.sh` 描述了 DolphinScheduler 的数据库配置,一些任务类型外部依赖路径或库文件,注册中心,其中 `JAVA_HOME`
`SPARK_HOME`都是在这里定义的,其路径是 `bin/env/dolphinscheduler_env.sh`。如果您不使用某些任务类型,您可以忽略任务外部依赖项,
但您必须根据您的环境更改 `JAVA_HOME`、注册中心和数据库相关配置。
# ---------------------------------------------------------
# Registry Server
# ---------------------------------------------------------
# 注册中心地址zookeeper服务的地址
registryServers="localhost:2181"
```sh
# JAVA_HOME, will use it to start DolphinScheduler server
export JAVA_HOME=${JAVA_HOME:-/custom/path}
# Database related configuration, set database type, username and password
export DATABASE=${DATABASE:-postgresql}
export SPRING_PROFILES_ACTIVE=${DATABASE}
export SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.postgresql.Driver
export SPRING_DATASOURCE_URL="jdbc:postgresql://127.0.0.1:5432/dolphinscheduler"
export SPRING_DATASOURCE_USERNAME="username"
export SPRING_DATASOURCE_PASSWORD="password"
# Registry center configuration, determines the type and link of the registry center
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-localhost:2181}
```
## 初始化数据库
@ -178,7 +182,7 @@ sh tools/bin/create-schema.sh
使用上面创建的**部署用户**运行以下命令完成部署,部署后的运行日志将存放在 logs 文件夹内
```shell
sh install.sh
sh ./bin/install.sh
```
> **_注意:_** 第一次部署的话,可能出现 5 次`sh: bin/dolphinscheduler-daemon.sh: No such file or directory`相关信息,次为非重要信息直接忽略即可
@ -213,7 +217,13 @@ sh ./bin/dolphinscheduler-daemon.sh start alert-server
sh ./bin/dolphinscheduler-daemon.sh stop alert-server
```
> **_注意:_**:服务用途请具体参见《系统架构设计》小节
> **_注意1:_**: 每个服务在路径 `<server-name>/conf/dolphinscheduler_env.sh` 中都有 `dolphinscheduler_env.sh` 文件,这是可以为微
> 服务需求提供便利。意味着您可以基于不同的环境变量来启动各个服务,只需要在对应服务中配置 `bin/env/dolphinscheduler_env.sh` 然后通过 `<server-name>/bin/start.sh`
> 命令启动即可。但是如果您使用命令 `/bin/dolphinscheduler-daemon.sh start <server-name>` 启动服务器,它将会用文件 `bin/env/dolphinscheduler_env.sh`
> 覆盖 `<server-name>/conf/dolphinscheduler_env.sh` 然后启动服务,目的是为了减少用户修改配置的成本.
> **_注意2:_**服务用途请具体参见《系统架构设计》小节。Python gateway service 默认与 api-server 一起启动,如果您不想启动 Python gateway service
> 请通过更改 api-server 配置文件 `api-server/conf/application.yaml` 中的 `python-gateway.enabled : false` 来禁用它。
[jdk]: https://www.oracle.com/technetwork/java/javase/downloads/index.html
[zookeeper]: https://zookeeper.apache.org/releases.html

View File

@ -40,7 +40,7 @@ DataX 任务类型,用于执行 DataX 程序。对于 DataX 节点worker
### 在 DolphinScheduler 中配置 DataX 环境
若生产环境中要是使用到 DataX 任务类型,则需要先配置好所需的环境。配置文件如下:`/dolphinscheduler/conf/env/dolphinscheduler_env.sh`。
若生产环境中要是使用到 DataX 任务类型,则需要先配置好所需的环境。配置文件如下:`bin/env/dolphinscheduler_env.sh`。
![datax_task01](/img/tasks/demo/datax_task01.png)

View File

@ -46,7 +46,7 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点worker
#### 在 DolphinScheduler 中配置 flink 环境
若生产环境中要是使用到 flink 任务类型,则需要先配置好所需的环境。配置文件如下:`/dolphinscheduler/conf/env/dolphinscheduler_env.sh`。
若生产环境中要是使用到 flink 任务类型,则需要先配置好所需的环境。配置文件如下:`bin/env/dolphinscheduler_env.sh`。
![flink-configure](/img/tasks/demo/flink_task01.png)

View File

@ -54,7 +54,7 @@ MapReduce(MR) 任务类型,用于执行 MapReduce 程序。对于 MapReduce
#### 在 DolphinScheduler 中配置 MapReduce 环境
若生产环境中要是使用到 MapReduce 任务类型,则需要先配置好所需的环境。配置文件如下:`/dolphinscheduler/conf/env/dolphinscheduler_env.sh`。
若生产环境中要是使用到 MapReduce 任务类型,则需要先配置好所需的环境。配置文件如下:`bin/env/dolphinscheduler_env.sh`。
![mr_configure](/img/tasks/demo/mr_task01.png)

View File

@ -46,7 +46,7 @@ Spark 任务类型,用于执行 Spark 程序。对于 Spark 节点worker
#### 在 DolphinScheduler 中配置 Spark 环境
若生产环境中要是使用到 Spark 任务类型,则需要先配置好所需的环境。配置文件如下:`/dolphinscheduler/conf/env/dolphinscheduler_env.sh`。
若生产环境中要是使用到 Spark 任务类型,则需要先配置好所需的环境。配置文件如下:`bin/env/dolphinscheduler_env.sh`。
![spark_configure](/img/tasks/demo/spark_task01.png)

View File

@ -15,7 +15,7 @@
## 4. 数据库升级
- 将`./tools/conf/application.yaml`中的username和password改成你设定数据库用户名和密码
- 如果选择 MySQL请修改`./tools/bin/dolphinscheduler_env.sh`中的如下配置, 还需要手动添加 [[ mysql-connector-java 驱动 jar ](https://downloads.MySQL.com/archives/c-j/)] 包到 lib 目录(`./tools/lib`这里下载的是mysql-connector-java-8.0.16.jar
- 如果选择 MySQL请修改`./tools/bin/dolphinscheduler_env.sh`中的如下配置, 还需要手动添加 [ mysql-connector-java 驱动 jar ](https://downloads.MySQL.com/archives/c-j/) 包到 lib 目录(`./tools/lib`这里下载的是mysql-connector-java-8.0.16.jar
```shell
export DATABASE=${DATABASE:-mysql}

View File

@ -41,7 +41,7 @@
</fileSet>
<fileSet>
<directory>${basedir}/../../script/env</directory>
<outputDirectory>bin</outputDirectory>
<outputDirectory>conf</outputDirectory>
<includes>
<include>dolphinscheduler_env.sh</include>
</includes>

View File

@ -19,7 +19,7 @@
BIN_DIR=$(dirname $0)
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
source "$BIN_DIR/dolphinscheduler_env.sh"
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms1g -Xmx1g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}

View File

@ -41,7 +41,7 @@
</fileSet>
<fileSet>
<directory>${basedir}/../script/env</directory>
<outputDirectory>bin</outputDirectory>
<outputDirectory>conf</outputDirectory>
<includes>
<include>dolphinscheduler_env.sh</include>
</includes>

View File

@ -19,7 +19,7 @@
BIN_DIR=$(dirname $0)
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
source "$BIN_DIR/dolphinscheduler_env.sh"
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms1g -Xmx1g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}

View File

@ -17,18 +17,31 @@
package org.apache.dolphinscheduler.api;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.event.EventListener;
@ServletComponentScan
@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
public class ApiApplicationServer {
@Autowired
private TaskPluginManager taskPluginManager;
public static void main(String[] args) {
SpringApplication.run(ApiApplicationServer.class);
}
@EventListener
public void run(ApplicationReadyEvent readyEvent) {
// install task plugin
taskPluginManager.installPlugin();
}
}

View File

@ -215,10 +215,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Map<String, Object> result = new HashMap<>();
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
// check process definition exists
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
// check process definition online
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode));
} else if (!checkSubProcessDefinitionValid(processDefinition)){
// check sub process definition online
putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
@ -488,7 +488,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setProcessInstanceId(instanceId);
if (!processService.verifyIsNeedCreateCommand(command)) {
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionCode);
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
return result;
}

View File

@ -482,7 +482,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
} else {
Tenant tenant = tenantMapper.queryById(processDefinition.getTenantId());
if (tenant != null) {
@ -576,7 +576,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
// check process definition exists
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
@ -699,7 +699,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
@ -711,7 +711,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// check process definition is already online
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, code);
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, String.valueOf(code));
return result;
}
// check process instances is already running
@ -777,7 +777,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
switch (releaseState) {
@ -1341,7 +1341,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
logger.info("process define not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
DagData dagData = processService.genDagData(processDefinition);
@ -1487,7 +1487,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (null == processDefinition || projectCode != processDefinition.getProjectCode()) {
logger.info("process define not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
DAG<String, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(processDefinition);
@ -1897,7 +1897,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
} else {
if (processDefinition.getVersion() == version) {
putMsg(result, Status.MAIN_TABLE_USING_VERSION);
@ -2085,7 +2085,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
// check process definition exists
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
@ -2186,7 +2186,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code);

View File

@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
@ -149,6 +150,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private ScheduleMapper scheduleMapper;
/**
* return top n SUCCESS process instance order by running time which started between startTime and endTime
*/
@ -472,7 +476,17 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.getName(), processInstance.getState().toString(), "update");
return result;
}
setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout);
//
Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
String timezoneId = null;
if (commandParamMap == null || StringUtils.isBlank(commandParamMap.get(Constants.SCHEDULE_TIMEZONE))) {
timezoneId = loginUser.getTimeZone();
} else {
timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE);
}
setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout, timezoneId);
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
if (taskDefinitionLogs.isEmpty()) {
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
@ -538,7 +552,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* update process instance attributes
*/
private void setProcessInstance(ProcessInstance processInstance, String tenantCode, String scheduleTime, String globalParams, int timeout) {
private void setProcessInstance(ProcessInstance processInstance, String tenantCode, String scheduleTime, String globalParams, int timeout, String timezone) {
Date schedule = processInstance.getScheduleTime();
if (scheduleTime != null) {
schedule = DateUtils.getScheduleDate(scheduleTime);
@ -546,7 +560,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.setScheduleTime(schedule);
List<Property> globalParamList = JSONUtils.toList(globalParams, Property.class);
Map<String, String> globalParamMap = globalParamList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule);
globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule, timezone);
processInstance.setTimeout(timeout);
processInstance.setTenantCode(tenantCode);
processInstance.setGlobalParams(globalParams);
@ -672,9 +686,14 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
Map<String, String> commandParam = JSONUtils.toMap(processInstance.getCommandParam());
String timezone = null;
if (commandParam != null) {
timezone = commandParam.get(Constants.SCHEDULE_TIMEZONE);
}
Map<String, String> timeParams = BusinessTimeUtils
.getBusinessTime(processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
processInstance.getScheduleTime(), timezone);
String userDefinedParams = processInstance.getGlobalParams();
// global params
List<Property> globalParams = new ArrayList<>();

View File

@ -107,7 +107,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
return result;
}
if (processDefinition.getProjectCode() != projectCode) {
@ -122,7 +122,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
.collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelation -> processTaskRelation));
if (!preTaskCodeMap.isEmpty()) {
if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) {
putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode);
putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, String.valueOf(processDefinitionCode));
return result;
}
if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) {
@ -202,12 +202,12 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (null == taskDefinition) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
return result;
}
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
@ -305,7 +305,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode());
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, upstreamList.get(0).getProcessDefinitionCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(upstreamList.get(0).getProcessDefinitionCode()));
return result;
}
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
@ -364,7 +364,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(downstreamList.get(0).getProcessDefinitionCode());
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, downstreamList.get(0).getProcessDefinitionCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(downstreamList.get(0).getProcessDefinitionCode()));
return result;
}
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
@ -468,7 +468,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
return result;
}
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);

View File

@ -257,7 +257,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(schedule.getProcessDefinitionCode()));
return result;
}
@ -306,7 +306,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
return result;
}
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
@ -336,7 +336,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.info("not release process definition id: {} , name : {}",
subProcessDefinition.getId(), subProcessDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, subProcessDefinition.getId());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId()));
return result;
}
}
@ -406,7 +406,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
return result;
}
@ -618,7 +618,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
return result;
}

View File

@ -180,11 +180,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
return result;
}
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionCode);
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, String.valueOf(processDefinitionCode));
return result;
}
TaskDefinitionLog taskDefinition = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
@ -314,7 +314,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
return result;
}
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
@ -406,7 +406,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
return null;
}
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
@ -557,7 +557,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
return result;
}
TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version);
@ -618,7 +618,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
} else {
if (taskDefinition.getVersion() == version) {
putMsg(result, Status.MAIN_TABLE_USING_VERSION);
@ -645,7 +645,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
} else {
result.put(Constants.DATA_LIST, taskDefinition);
putMsg(result, Status.SUCCESS);
@ -752,12 +752,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(code);
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, code);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, taskDefinition.getVersion());
if (taskDefinitionLog == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, code);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
switch (releaseState) {

View File

@ -26,7 +26,7 @@ import java.util.regex.Pattern;
*/
public class RegexUtils {
private static final String LINUX_USERNAME_PATTERN = "[a-z_][a-z\\d_]{0,30}";
private static final String LINUX_USERNAME_PATTERN = "^[a-zA-Z0-9_].{0,30}";
private RegexUtils() {
}

View File

@ -48,6 +48,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
@ -121,6 +122,10 @@ public class ProcessInstanceServiceTest {
@Mock
TaskPluginManager taskPluginManager;
@Mock
ScheduleMapper scheduleMapper;
private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";

View File

@ -28,10 +28,10 @@ public class RegexUtilsTest {
@Test
public void testIsValidLinuxUserName() {
String name1 = "10000";
Assert.assertFalse(RegexUtils.isValidLinuxUserName(name1));
Assert.assertTrue(RegexUtils.isValidLinuxUserName(name1));
String name2 = "00hayden";
Assert.assertFalse(RegexUtils.isValidLinuxUserName(name2));
Assert.assertTrue(RegexUtils.isValidLinuxUserName(name2));
String name3 = "hayde123456789123456789123456789";
Assert.assertFalse(RegexUtils.isValidLinuxUserName(name3));
@ -44,6 +44,12 @@ public class RegexUtilsTest {
String name6 = "hayden";
Assert.assertTrue(RegexUtils.isValidLinuxUserName(name6));
String name7 = "00hayden_0";
Assert.assertTrue(RegexUtils.isValidLinuxUserName(name2));
String name8 = "00hayden.8";
Assert.assertTrue(RegexUtils.isValidLinuxUserName(name2));
}
@Test

View File

@ -120,7 +120,7 @@ public final class Constants {
/**
* environment properties default path
*/
public static final String ENV_PATH = "env/dolphinscheduler_env.sh";
public static final String ENV_PATH = "dolphinscheduler_env.sh";
/**
* resource.view.suffixs
@ -817,4 +817,9 @@ public final class Constants {
public static final String LIMITS_CPU = "limitsCpu";
public static final String LIMITS_MEMORY = "limitsMemory";
public static final String K8S_LOCAL_TEST_CLUSTER = "ds_null_k8s";
/**
* schedule timezone
*/
public static final String SCHEDULE_TIMEZONE = "schedule_timezone";
}

View File

@ -40,7 +40,7 @@ public class CommonUtils {
private static final Base64 BASE64 = new Base64();
private CommonUtils() {
protected CommonUtils() {
throw new UnsupportedOperationException("Construct CommonUtils");
}

View File

@ -88,7 +88,7 @@ public class ParameterUtils {
* @return curing user define parameters
*/
public static String curingGlobalParams(Map<String, String> globalParamMap, List<Property> globalParamList,
CommandType commandType, Date scheduleTime) {
CommandType commandType, Date scheduleTime, String timezone) {
if (globalParamList == null || globalParamList.isEmpty()) {
return null;
@ -101,7 +101,7 @@ public class ParameterUtils {
Map<String, String> allParamMap = new HashMap<>();
//If it is a complement, a complement time needs to be passed in, according to the task type
Map<String, String> timeParams = BusinessTimeUtils.
getBusinessTime(commandType, scheduleTime);
getBusinessTime(commandType, scheduleTime, timezone);
if (timeParams != null) {
allParamMap.putAll(timeParams);

View File

@ -45,7 +45,7 @@ public class BusinessTimeUtils {
* @param runTime run time or schedule time
* @return business time
*/
public static Map<String, String> getBusinessTime(CommandType commandType, Date runTime) {
public static Map<String, String> getBusinessTime(CommandType commandType, Date runTime, String timezone) {
Date businessDate = runTime;
Map<String, String> result = new HashMap<>();
switch (commandType) {
@ -71,9 +71,9 @@ public class BusinessTimeUtils {
break;
}
Date businessCurrentDate = addDays(businessDate, 1);
result.put(Constants.PARAMETER_CURRENT_DATE, format(businessCurrentDate, PARAMETER_FORMAT_DATE, null));
result.put(Constants.PARAMETER_BUSINESS_DATE, format(businessDate, PARAMETER_FORMAT_DATE, null));
result.put(Constants.PARAMETER_DATETIME, format(businessCurrentDate, PARAMETER_FORMAT_TIME, null));
result.put(Constants.PARAMETER_CURRENT_DATE, format(businessCurrentDate, PARAMETER_FORMAT_DATE, timezone));
result.put(Constants.PARAMETER_BUSINESS_DATE, format(businessDate, PARAMETER_FORMAT_DATE, timezone));
result.put(Constants.PARAMETER_DATETIME, format(businessCurrentDate, PARAMETER_FORMAT_TIME, timezone));
return result;
}
}

View File

@ -83,7 +83,7 @@ sudo.enable=true
#dolphin.scheduler.network.priority.strategy=default
# system env path
#dolphinscheduler.env.path=env/dolphinscheduler_env.sh
#dolphinscheduler.env.path=dolphinscheduler_env.sh
# development state
development.state=false

View File

@ -41,8 +41,9 @@ public class CommonUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(CommonUtilsTest.class);
@Test
public void getSystemEnvPath() {
logger.info(CommonUtils.getSystemEnvPath());
Assert.assertTrue(true);
String envPath;
envPath = CommonUtils.getSystemEnvPath();
Assert.assertEquals("/etc/profile", envPath);
}
@Test

View File

@ -100,22 +100,22 @@ public class ParameterUtilsTest {
Date scheduleTime = DateUtils.stringToDate("2019-12-20 00:00:00");
//test globalParamList is null
String result = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
String result = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertNull(result);
Assert.assertNull(ParameterUtils.curingGlobalParams(null, null, CommandType.START_CURRENT_TASK_PROCESS, null));
Assert.assertNull(ParameterUtils.curingGlobalParams(globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime));
Assert.assertNull(ParameterUtils.curingGlobalParams(null, null, CommandType.START_CURRENT_TASK_PROCESS, null, null));
Assert.assertNull(ParameterUtils.curingGlobalParams(globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null));
//test globalParamList is not null
Property property = new Property("testGlobalParam", Direct.IN, DataType.VARCHAR, "testGlobalParam");
globalParamList.add(property);
String result2 = ParameterUtils.curingGlobalParams(null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
String result2 = ParameterUtils.curingGlobalParams(null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertEquals(result2, JSONUtils.toJsonString(globalParamList));
String result3 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null);
String result3 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null, null);
Assert.assertEquals(result3, JSONUtils.toJsonString(globalParamList));
String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertEquals(result4, JSONUtils.toJsonString(globalParamList));
//test var $ startsWith
@ -130,7 +130,7 @@ public class ParameterUtilsTest {
globalParamList.add(property3);
globalParamList.add(property4);
String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList));
Property testStartParamProperty = new Property("testStartParam", Direct.IN, DataType.VARCHAR, "");
@ -150,7 +150,7 @@ public class ParameterUtilsTest {
}
}
String result6 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
String result6 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertTrue(result6.contains("20191220"));
}

View File

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
@ -39,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -534,19 +536,7 @@ public class DagHelper {
public static boolean haveConditionsAfterNode(String parentNodeCode,
DAG<String, TaskNode, TaskNodeRelation> dag
) {
boolean result = false;
Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeCode);
if (CollectionUtils.isEmpty(subsequentNodes)) {
return result;
}
for (String nodeCode : subsequentNodes) {
TaskNode taskNode = dag.getNode(nodeCode);
List<String> preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class);
if (preTasksList.contains(parentNodeCode) && taskNode.isConditionsTask()) {
return true;
}
}
return result;
return haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_CONDITIONS);
}
/**
@ -565,19 +555,38 @@ public class DagHelper {
return false;
}
/**
* is there have blocking node after the parent node
*/
public static boolean haveBlockingAfterNode(String parentNodeCode,
DAG<String,TaskNode,TaskNodeRelation> dag) {
return haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_BLOCKING);
}
/**
* is there have all node after the parent node
*/
public static boolean haveAllNodeAfterNode(String parentNodeCode,
DAG<String,TaskNode,TaskNodeRelation> dag) {
return haveSubAfterNode(parentNodeCode, dag, null);
}
/**
* Whether there is a specified type of child node after the parent node
*/
public static boolean haveSubAfterNode(String parentNodeCode,
DAG<String,TaskNode,TaskNodeRelation> dag, String filterNodeType) {
Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeCode);
if (CollectionUtils.isEmpty(subsequentNodes)) {
return false;
}
if (StringUtils.isBlank(filterNodeType)){
return true;
}
for (String nodeName : subsequentNodes) {
TaskNode taskNode = dag.getNode(nodeName);
List<String> preTaskList = JSONUtils.toList(taskNode.getPreTasks(),String.class);
if (preTaskList.contains(parentNodeCode) && taskNode.isBlockingTask()) {
if (taskNode.getType().equalsIgnoreCase(filterNodeType)){
return true;
}
}

View File

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
@ -48,6 +49,57 @@ import com.fasterxml.jackson.core.JsonProcessingException;
* dag helper test
*/
public class DagHelperTest {
@Test
public void testHaveSubAfterNode(){
String parentNodeCode = "5293789969856";
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
TaskNodeRelation relation = new TaskNodeRelation();
relation.setStartNode("5293789969856");
relation.setEndNode("5293789969857");
taskNodeRelations.add(relation);
TaskNodeRelation relationNext = new TaskNodeRelation();
relationNext.setStartNode("5293789969856");
relationNext.setEndNode("5293789969858");
taskNodeRelations.add(relationNext);
List<TaskNode> taskNodes = new ArrayList<>();
TaskNode node = new TaskNode();
node.setCode(5293789969856L);
node.setType("SHELL");
TaskNode subNode = new TaskNode();
subNode.setCode(5293789969857L);
subNode.setType("BLOCKING");
subNode.setPreTasks("[5293789969856]");
TaskNode subNextNode = new TaskNode();
subNextNode.setCode(5293789969858L);
subNextNode.setType("CONDITIONS");
subNextNode.setPreTasks("[5293789969856]");
taskNodes.add(node);
taskNodes.add(subNode);
taskNodes.add(subNextNode);
ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(taskNodes);
DAG<String,TaskNode,TaskNodeRelation> dag = DagHelper.buildDagGraph(processDag);
boolean canSubmit = DagHelper.haveAllNodeAfterNode(parentNodeCode, dag);
Assert.assertTrue(canSubmit);
boolean haveBlocking = DagHelper.haveBlockingAfterNode(parentNodeCode, dag);
Assert.assertTrue(haveBlocking);
boolean haveConditions = DagHelper.haveConditionsAfterNode(parentNodeCode, dag);
Assert.assertTrue(haveConditions);
boolean dependent = DagHelper.haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_DEPENDENT);
Assert.assertFalse(dependent);
}
/**
* test task node can submit
*
@ -280,9 +332,16 @@ public class DagHelperTest {
}
/**
* process:
* 1->2->3->5->7
* 4->3->6
* 2->8->5->7
* 1->2->8->5->7
* DAG graph:
* 4 -> -> 6
* \ /
* 1 -> 2 -> 3 -> 5 -> 7
* \ /
* -> 8 ->
*
* @return dag
* @throws JsonProcessingException if error throws JsonProcessingException
@ -377,9 +436,12 @@ public class DagHelperTest {
}
/**
* 1->2->3->5->7
* 4->3->6
* 2->8->5->7
* DAG graph:
* 2
*
* 0->1(switch)
*
* 4
*
* @return dag
* @throws JsonProcessingException if error throws JsonProcessingException

View File

@ -76,7 +76,7 @@ sudo.enable=true
# network IP gets priority, default: inner outer
#dolphin.scheduler.network.priority.strategy=default
# system env path
#dolphinscheduler.env.path=env/dolphinscheduler_env.sh
#dolphinscheduler.env.path=dolphinscheduler_env.sh
# development state
development.state=false
# rpc port

View File

@ -41,7 +41,7 @@
</fileSet>
<fileSet>
<directory>${basedir}/../script/env</directory>
<outputDirectory>bin</outputDirectory>
<outputDirectory>conf</outputDirectory>
<includes>
<include>dolphinscheduler_env.sh</include>
</includes>

View File

@ -19,7 +19,7 @@
BIN_DIR=$(dirname $0)
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
source "$BIN_DIR/dolphinscheduler_env.sh"
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn2g -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}

View File

@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.PostConstruct;
@ -68,6 +69,9 @@ public class MasterServer implements IStoppable {
@Autowired
private MasterRegistryClient masterRegistryClient;
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private MasterSchedulerService masterSchedulerService;
@ -131,6 +135,9 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer.start();
// install task plugin
this.taskPluginManager.installPlugin();
// self tolerant
this.masterRegistryClient.init();
this.masterRegistryClient.start();

View File

@ -50,6 +50,9 @@ public class FailoverExecuteThread extends Thread {
@Override
public void run() {
// when startup, wait 10s for ready
ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * 10);
logger.info("failover execute thread started");
while (Stopper.isRunning()) {
try {

View File

@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutor
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections4.CollectionUtils;
@ -102,9 +101,6 @@ public class MasterSchedulerService extends Thread {
@Autowired
private StateWheelExecuteThread stateWheelExecuteThread;
@Autowired
private TaskPluginManager taskPluginManager;
/**
* constructor of MasterSchedulerService
*/

View File

@ -456,9 +456,9 @@ public class WorkflowExecuteThread {
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().typeIsFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
if (taskInstance.isConditionsTask()
|| DagHelper.haveConditionsAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
|| DagHelper.haveBlockingAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
// There are child nodes and the failure policy is: CONTINUE
if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
&& processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
@ -948,7 +948,7 @@ public class WorkflowExecuteThread {
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)));
processService.updateProcessInstance(processInstance);
}
}

View File

@ -43,7 +43,7 @@ public class ParamsTest {
// start process
Map<String,String> timeParams = BusinessTimeUtils
.getBusinessTime(CommandType.START_PROCESS,
new Date());
new Date(), null);
command = ParameterUtils.convertParameterPlaceholders(command, timeParams);
@ -57,7 +57,7 @@ public class ParamsTest {
// complement data
timeParams = BusinessTimeUtils
.getBusinessTime(CommandType.COMPLEMENT_DATA,
calendar.getTime());
calendar.getTime(), null);
command = ParameterUtils.convertParameterPlaceholders(command, timeParams);
logger.info("complement data : {}",command);

View File

@ -17,6 +17,7 @@
"""Task sql."""
import logging
import re
from typing import Dict, Optional
@ -24,6 +25,8 @@ from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.database import Database
from pydolphinscheduler.core.task import Task
log = logging.getLogger(__file__)
class SqlType:
"""SQL type, for now it just contain `SELECT` and `NO_SELECT`."""
@ -61,6 +64,7 @@ class Sql(Task):
name: str,
datasource_name: str,
sql: str,
sql_type: Optional[int] = None,
pre_statements: Optional[str] = None,
post_statements: Optional[str] = None,
display_rows: Optional[int] = 10,
@ -69,6 +73,7 @@ class Sql(Task):
):
super().__init__(name, TaskType.SQL, *args, **kwargs)
self.sql = sql
self.param_sql_type = sql_type
self.datasource_name = datasource_name
self.pre_statements = pre_statements or []
self.post_statements = post_statements or []
@ -76,9 +81,24 @@ class Sql(Task):
@property
def sql_type(self) -> int:
"""Judgement sql type, use regexp to check which type of the sql is."""
"""Judgement sql type, it will return the SQL type for type `SELECT` or `NOT_SELECT`.
If `param_sql_type` dot not specific, will use regexp to check
which type of the SQL is. But if `param_sql_type` is specific
will use the parameter overwrites the regexp way
"""
if (
self.param_sql_type == SqlType.SELECT
or self.param_sql_type == SqlType.NOT_SELECT
):
log.info(
"The sql type is specified by a parameter, with value %s",
self.param_sql_type,
)
return self.param_sql_type
pattern_select_str = (
"^(?!(.* |)insert |(.* |)delete |(.* |)drop |(.* |)update |(.* |)alter ).*"
"^(?!(.* |)insert |(.* |)delete |(.* |)drop "
"|(.* |)update |(.* |)alter |(.* |)create ).*"
)
pattern_select = re.compile(pattern_select_str, re.IGNORECASE)
if pattern_select.match(self.sql) is None:

View File

@ -26,24 +26,38 @@ from pydolphinscheduler.tasks.sql import Sql, SqlType
@pytest.mark.parametrize(
"sql, sql_type",
"sql, param_sql_type, sql_type",
[
("select 1", SqlType.SELECT),
(" select 1", SqlType.SELECT),
(" select 1 ", SqlType.SELECT),
(" select 'insert' ", SqlType.SELECT),
(" select 'insert ' ", SqlType.SELECT),
("with tmp as (select 1) select * from tmp ", SqlType.SELECT),
("insert into table_name(col1, col2) value (val1, val2)", SqlType.NOT_SELECT),
("select 1", None, SqlType.SELECT),
(" select 1", None, SqlType.SELECT),
(" select 1 ", None, SqlType.SELECT),
(" select 'insert' ", None, SqlType.SELECT),
(" select 'insert ' ", None, SqlType.SELECT),
("with tmp as (select 1) select * from tmp ", None, SqlType.SELECT),
(
"insert into table_name(select, col2) value ('select', val2)",
"insert into table_name(col1, col2) value (val1, val2)",
None,
SqlType.NOT_SELECT,
),
("update table_name SET col1=val1 where col1=val2", SqlType.NOT_SELECT),
("update table_name SET col1='select' where col1=val2", SqlType.NOT_SELECT),
("delete from table_name where id < 10", SqlType.NOT_SELECT),
("delete from table_name where id < 10", SqlType.NOT_SELECT),
("alter table table_name add column col1 int", SqlType.NOT_SELECT),
(
"insert into table_name(select, col2) value ('select', val2)",
None,
SqlType.NOT_SELECT,
),
("update table_name SET col1=val1 where col1=val2", None, SqlType.NOT_SELECT),
(
"update table_name SET col1='select' where col1=val2",
None,
SqlType.NOT_SELECT,
),
("delete from table_name where id < 10", None, SqlType.NOT_SELECT),
("delete from table_name where id < 10", None, SqlType.NOT_SELECT),
("alter table table_name add column col1 int", None, SqlType.NOT_SELECT),
("create table table_name2 (col1 int)", None, SqlType.NOT_SELECT),
("create table table_name2 (col1 int)", SqlType.SELECT, SqlType.SELECT),
("select 1", SqlType.NOT_SELECT, SqlType.NOT_SELECT),
("create table table_name2 (col1 int)", SqlType.NOT_SELECT, SqlType.NOT_SELECT),
("select 1", SqlType.SELECT, SqlType.SELECT),
],
)
@patch(
@ -54,11 +68,13 @@ from pydolphinscheduler.tasks.sql import Sql, SqlType
"pydolphinscheduler.core.database.Database.get_database_info",
return_value=({"id": 1, "type": "mock_type"}),
)
def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
def test_get_sql_type(
mock_datasource, mock_code_version, sql, param_sql_type, sql_type
):
"""Test property sql_type could return correct type."""
name = "test_get_sql_type"
datasource_name = "test_datasource"
task = Sql(name, datasource_name, sql)
task = Sql(name, datasource_name, sql, sql_type=param_sql_type)
assert (
sql_type == task.sql_type
), f"Sql {sql} expect sql type is {sql_type} but got {task.sql_type}"

View File

@ -27,6 +27,8 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import org.apache.dolphinscheduler.common.Constants;
@ -399,6 +401,13 @@ public class ProcessServiceImpl implements ProcessService {
public int createCommand(Command command) {
int result = 0;
if (command != null) {
// add command timezone
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode());
Map<String, String> commandParams = JSONUtils.toMap(command.getCommandParam());
if (commandParams != null && schedule != null) {
commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
command.setCommandParam(JSONUtils.toJsonString(commandParams));
}
result = commandMapper.insert(command);
}
return result;
@ -771,11 +780,17 @@ public class ProcessServiceImpl implements ProcessService {
setGlobalParamIfCommanded(processDefinition, cmdParam);
// curing global params
Map<String, String> commandParamMap = JSONUtils.toMap(command.getCommandParam());
String timezoneId = null;
if (commandParamMap != null) {
timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE);
}
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
getCommandTypeIfComplement(processInstance, command),
processInstance.getScheduleTime()));
processInstance.getScheduleTime(), timezoneId));
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
@ -801,14 +816,22 @@ public class ProcessServiceImpl implements ProcessService {
}
startParamMap.putAll(fatherParamMap);
// set start param into global params
Map<String, String> globalMap = processDefinition.getGlobalParamMap();
List<Property> globalParamList = processDefinition.getGlobalParamList();
if (startParamMap.size() > 0
&& processDefinition.getGlobalParamMap() != null) {
for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
&& globalMap != null) {
for (Map.Entry<String, String> param : globalMap.entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
param.setValue(val);
}
}
for (Entry<String, String> startParam : startParamMap.entrySet()) {
if (!globalMap.containsKey(startParam.getKey())) {
globalMap.put(startParam.getKey(), startParam.getValue());
globalParamList.add(new Property(startParam.getKey(), IN, VARCHAR, startParam.getValue()));
}
}
}
}
@ -909,12 +932,15 @@ public class ProcessServiceImpl implements ProcessService {
setGlobalParamIfCommanded(processDefinition, cmdParam);
}
// time zone
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
// Recalculate global parameters after rerun.
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
commandTypeIfComplement,
processInstance.getScheduleTime()));
processInstance.getScheduleTime(), timezoneId));
processInstance.setProcessDefinition(processDefinition);
}
//reset command parameter
@ -1092,10 +1118,14 @@ public class ProcessServiceImpl implements ProcessService {
&& Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementDate.get(0));
}
// time zone
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), timezoneId));
}
/**
@ -1181,7 +1211,7 @@ public class ProcessServiceImpl implements ProcessService {
private String joinVarPool(String parentValPool, String subValPool) {
List<Property> parentValPools = Lists.newArrayList(JSONUtils.toList(parentValPool, Property.class));
parentValPools = parentValPools.stream().filter(valPool -> valPool.getDirect() == Direct.OUT).collect(Collectors.toList());
List<Property> subValPools = Lists.newArrayList(JSONUtils.toList(subValPool, Property.class));
Set<String> parentValPoolKeys = parentValPools.stream().map(Property::getProp).collect(toSet());

View File

@ -40,8 +40,6 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
@ -86,8 +84,7 @@ public class TaskPluginManager {
return taskChannel.parseParameters(parametersNode);
}
@EventListener
public void installPlugin(ApplicationReadyEvent readyEvent) {
public void installPlugin() {
final Set<String> names = new HashSet<>();
ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> {

View File

@ -58,6 +58,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
@ -155,6 +156,9 @@ public class ProcessServiceTest {
@Mock
private DqComparisonTypeMapper dqComparisonTypeMapper;
@Mock
private ScheduleMapper scheduleMapper;
@Test
public void testCreateSubCommand() {
ProcessInstance parentInstance = new ProcessInstance();

View File

@ -69,23 +69,13 @@
</fileSet>
<fileSet>
<directory>${basedir}/../script/env</directory>
<outputDirectory>bin</outputDirectory>
<outputDirectory>conf</outputDirectory>
<includes>
<include>dolphinscheduler_env.sh</include>
</includes>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<fileSet>
<directory>${basedir}/../script/env</directory>
<outputDirectory>dist-bin</outputDirectory>
<includes>
<include>dolphinscheduler_env.sh</include>
</includes>
<fileMode>0755</fileMode>
<directoryMode>0755</directoryMode>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-dao/src/main/resources</directory>
<includes>
@ -93,6 +83,13 @@
</includes>
<outputDirectory>conf</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-common/src/main/resources</directory>
<includes>
<include>common.properties</include>
</includes>
<outputDirectory>conf</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-ui-next/dist</directory>
<outputDirectory>./ui</outputDirectory>

View File

@ -19,7 +19,7 @@
BIN_DIR=$(dirname $0)
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
source "$BIN_DIR/dolphinscheduler_env.sh"
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms1g -Xmx1g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}

View File

@ -19,7 +19,7 @@
BIN_DIR=$(dirname $0)
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
source "$BIN_DIR/dolphinscheduler_env.sh"
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms1g -Xmx1g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}

View File

@ -40,7 +40,7 @@
</fileSet>
<fileSet>
<directory>${basedir}/../script/env</directory>
<outputDirectory>bin</outputDirectory>
<outputDirectory>conf</outputDirectory>
<includes>
<include>dolphinscheduler_env.sh</include>
</includes>

View File

@ -19,7 +19,7 @@
BIN_DIR=$(dirname $0)
DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/../..; pwd)}
source "$DOLPHINSCHEDULER_HOME/tools/bin/dolphinscheduler_env.sh"
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms1g -Xmx1g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}

View File

@ -50,7 +50,7 @@ import {
} from '@vicons/antd'
import { useRoute } from 'vue-router'
import { useUserStore } from '@/store/user/user'
import { timezoneList } from '@/utils/timezone'
import { timezoneList } from '@/common/timezone'
import type { UserInfoRes } from '@/service/modules/users/types'
export function useDataList() {

View File

@ -1037,7 +1037,7 @@ const security = {
user: {
user_manage: 'User Manage',
create_user: 'Create User',
update_user: 'Update User',
edit_user: 'Edit User',
delete_user: 'Delete User',
delete_confirm: 'Are you sure to delete?',
delete_confirm_tip:

View File

@ -1026,7 +1026,7 @@ const security = {
user: {
user_manage: '用户管理',
create_user: '创建用户',
update_user: '更新用户',
edit_user: '编辑用户',
delete_user: '删除用户',
delete_confirm: '确定删除吗?',
project: '项目',

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import type { ITaskState } from '@/utils/types'
import type { ITaskState } from '@/common/types'
interface CodeReq {
projectCode: number

View File

@ -29,7 +29,7 @@ interface AlertGroupIdReq {
interface UserReq {
email: string
tenantId: number
tenantId: number | null
userName: string
userPassword: string
phone?: string

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
import { timezoneList } from '@/utils/timezone'
import { timezoneList } from '@/common/timezone'
type Timezone = typeof timezoneList[number]

View File

@ -15,17 +15,17 @@
* limitations under the License.
*/
export const copy = (text: string): boolean => {
const range = document.createRange()
const node = document.createTextNode(text)
document.body.append(node)
range.selectNode(node)
window.getSelection()?.addRange(range)
const copy = (text: string): boolean => {
const inp = document.createElement('input')
document.body.appendChild(inp)
inp.value = text
inp.select()
let result = false
try {
result = document.execCommand('copy')
} catch (err) {}
window.getSelection()?.removeAllRanges()
document.body.removeChild(node)
inp.remove()
return result
}
export default copy

View File

@ -20,13 +20,17 @@ import regex from './regex'
import truncateText from './truncate-text'
import log from './log'
import downloadFile from './downloadFile'
import copy from './clipboard'
import removeUselessChildren from './tree-format'
const utils = {
mapping,
regex,
truncateText,
log,
downloadFile
downloadFile,
copy,
removeUselessChildren
}
export default utils

View File

@ -15,9 +15,9 @@
* limitations under the License.
*/
export function removeUselessChildren(
const removeUselessChildren = (
list: { children?: []; dirctory?: boolean; disabled?: boolean }[]
) {
) => {
if (!list.length) return
list.forEach((item) => {
if (item.dirctory && item.children?.length === 0) item.disabled = true
@ -29,3 +29,5 @@ export function removeUselessChildren(
removeUselessChildren(item.children)
})
}
export default removeUselessChildren

View File

@ -21,7 +21,7 @@
* @param {string} text
* Each Chinese character is equal to two chars
*/
export default function truncateText(text: string, n: number) {
const truncateText = (text: string, n: number) => {
const exp = /[\u4E00-\u9FA5]/
let res = ''
let len = text.length
@ -49,3 +49,5 @@ export default function truncateText(text: string, n: number) {
}
return res
}
export default truncateText

View File

@ -24,7 +24,7 @@ import TableAction from './components/table-action'
import _ from 'lodash'
import { format } from 'date-fns'
import { TableColumns } from 'naive-ui/es/data-table/src/interface'
import { parseTime } from '@/utils/common'
import { parseTime } from '@/common/common'
export function useTable(viewRuleEntry = (unusedRuleJson: string): void => {}) {
const { t } = useI18n()

View File

@ -24,12 +24,12 @@ import {
COLUMN_WIDTH_CONFIG,
calculateTableWidth,
DefaultTableWidth
} from '@/utils/column-width-config'
} from '@/common/column-width-config'
import type {
ResultItem,
ResultListRes
} from '@/service/modules/data-quality/types'
import { parseTime } from '@/utils/common'
import { parseTime } from '@/common/common'
export function useTable() {
const { t } = useI18n()

View File

@ -32,6 +32,7 @@ import { useColumns } from './use-columns'
import { useTable } from './use-table'
import styles from './index.module.scss'
import type { TableColumns } from './types'
import { DefaultTableWidth } from '@/common/column-width-config'
const list = defineComponent({
name: 'list',
@ -39,7 +40,10 @@ const list = defineComponent({
const { t } = useI18n()
const showDetailModal = ref(false)
const selectId = ref()
const columns = ref({ columns: [] as TableColumns, tableWidth: 0 })
const columns = ref({
columns: [] as TableColumns,
tableWidth: DefaultTableWidth
})
const { data, changePage, changePageSize, deleteRecord, updateList } =
useTable()

View File

@ -32,7 +32,7 @@ import {
COLUMN_WIDTH_CONFIG,
calculateTableWidth,
DefaultTableWidth
} from '@/utils/column-width-config'
} from '@/common/column-width-config'
import type { TableColumns } from './types'
export function useColumns(onCallback: Function) {
@ -108,6 +108,7 @@ export function useColumns(onCallback: Function) {
{
circle: true,
type: 'info',
size: 'small',
onClick: () => void onCallback(rowData.id, 'edit')
},
{
@ -134,6 +135,7 @@ export function useColumns(onCallback: Function) {
{
circle: true,
type: 'error',
size: 'small',
class: 'btn-delete'
},
{

View File

@ -20,7 +20,7 @@ import { reactive, ref } from 'vue'
import { useAsyncState } from '@vueuse/core'
import { queryAuditLogListPaging } from '@/service/modules/audit'
import { format } from 'date-fns'
import { parseTime } from '@/utils/common'
import { parseTime } from '@/common/common'
import type { AuditListRes } from '@/service/modules/audit/types'
export function useTable() {

View File

@ -20,7 +20,7 @@ import { useI18n } from 'vue-i18n'
import { useAsyncState } from '@vueuse/core'
import ButtonLink from '@/components/button-link'
import { queryProjectListPaging } from '@/service/modules/projects'
import { parseTime } from '@/utils/common'
import { parseTime } from '@/common/common'
import { deleteProject } from '@/service/modules/projects'
import { format } from 'date-fns'
import { useRouter } from 'vue-router'
@ -36,7 +36,7 @@ import {
COLUMN_WIDTH_CONFIG,
calculateTableWidth,
DefaultTableWidth
} from '@/utils/column-width-config'
} from '@/common/column-width-config'
import type { Router } from 'vue-router'
import type { ProjectRes } from '@/service/modules/projects/types'
import { DeleteOutlined, EditOutlined } from '@vicons/antd'

View File

@ -21,7 +21,7 @@ import { NIcon } from 'naive-ui'
import { useRelationCustomParams, useDependentTimeout } from '.'
import { useTaskNodeStore } from '@/store/project/task-node'
import { queryAllProjectList } from '@/service/modules/projects'
import { tasksState } from '@/utils/common'
import { tasksState } from '@/common/common'
import {
queryProcessDefinitionList,
getTasksByDefinitionList

View File

@ -19,7 +19,7 @@ import { ref, onMounted, watch } from 'vue'
import { useI18n } from 'vue-i18n'
import { queryResourceByProgramType } from '@/service/modules/resources'
import { useTaskNodeStore } from '@/store/project/task-node'
import { removeUselessChildren } from '@/utils/tree-format'
import utils from '@/utils'
import type { IJsonItem, ProgramType, IMainJar } from '../types'
export function useMainJar(model: { [field: string]: any }): IJsonItem {
@ -37,7 +37,7 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem {
type: 'FILE',
programType
})
removeUselessChildren(res)
utils.removeUselessChildren(res)
mainJarOptions.value = res || []
taskStore.updateMainJar(programType, res)
}

View File

@ -19,7 +19,7 @@ import { ref, onMounted } from 'vue'
import { useI18n } from 'vue-i18n'
import { queryResourceList } from '@/service/modules/resources'
import { useTaskNodeStore } from '@/store/project/task-node'
import { removeUselessChildren } from '@/utils/tree-format'
import utils from '@/utils'
import type { IJsonItem, IResource } from '../types'
export function useResources(): IJsonItem {
@ -38,7 +38,7 @@ export function useResources(): IJsonItem {
if (resourcesLoading.value) return
resourcesLoading.value = true
const res = await queryResourceList({ type: 'FILE' })
removeUselessChildren(res)
utils.removeUselessChildren(res)
resourcesOptions.value = res || []
resourcesLoading.value = false
taskStore.updateResource(res)

View File

@ -66,7 +66,8 @@ export function useTaskGroup(
span: 12,
name: t('project.node.task_group_name'),
props: {
loading
loading,
clearable: true
},
options
},

View File

@ -31,7 +31,7 @@ export type {
WorkflowInstance
} from '@/views/projects/workflow/components/dag/types'
export type { IResource, ProgramType, IMainJar } from '@/store/project/types'
export type { ITaskState } from '@/utils/types'
export type { ITaskState } from '@/common/types'
type SourceType = 'MYSQL' | 'HDFS' | 'HIVE'
type ModelType = 'import' | 'export'

View File

@ -35,7 +35,7 @@ import {
COLUMN_WIDTH_CONFIG,
calculateTableWidth,
DefaultTableWidth
} from '@/utils/column-width-config'
} from '@/common/column-width-config'
import type {
TaskDefinitionItem,
TaskDefinitionRes

View File

@ -34,7 +34,7 @@ import Card from '@/components/card'
import LogModal from '@/components/log-modal'
import { useAsyncState } from '@vueuse/core'
import { queryLog } from '@/service/modules/log'
import { stateType } from '@/utils/common'
import { stateType } from '@/common/common'
import styles from './index.module.scss'
const TaskInstance = defineComponent({

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ITaskState } from '@/utils/types'
import { ITaskState } from '@/common/types'
export type { Router } from 'vue-router'
export type { TaskInstancesRes } from '@/service/modules/task-instances/types'

View File

@ -32,12 +32,12 @@ import {
} from '@vicons/antd'
import { format } from 'date-fns'
import { useRoute, useRouter } from 'vue-router'
import { parseTime, tasksState } from '@/utils/common'
import { parseTime, tasksState } from '@/common/common'
import {
COLUMN_WIDTH_CONFIG,
calculateTableWidth,
DefaultTableWidth
} from '@/utils/column-width-config'
} from '@/common/column-width-config'
import type { Router, TaskInstancesRes, IRecord, ITaskState } from './types'
export function useTable() {
@ -155,7 +155,8 @@ export function useTable() {
{
title: t('project.task.host'),
key: 'host',
...COLUMN_WIDTH_CONFIG['name']
...COLUMN_WIDTH_CONFIG['name'],
render: (row: IRecord) => row.host || '-'
},
{
title: t('project.task.operation'),
@ -206,6 +207,7 @@ export function useTable() {
circle: true,
type: 'info',
size: 'small',
disabled: !row.host,
onClick: () => handleLog(row)
},
{

View File

@ -21,7 +21,7 @@ import { defineComponent, onMounted, PropType, inject, ref } from 'vue'
import { useI18n } from 'vue-i18n'
import { useRoute } from 'vue-router'
import styles from './menu.module.scss'
import { uuid } from '@/utils/common'
import { uuid } from '@/common/common'
import { IWorkflowTaskInstance } from './types'
const props = {

View File

@ -209,7 +209,7 @@ export default defineComponent({
<NSwitch v-model:value={formValue.value.timeoutFlag} />
</NFormItem>
{formValue.value.timeoutFlag && (
<NFormItem label=' ' path='timeout'>
<NFormItem showLabel={false} path='timeout'>
<NInputNumber
v-model:value={formValue.value.timeout}
show-button={false}
@ -255,14 +255,14 @@ export default defineComponent({
/>
</NFormItem>
{props.definition && !props.instance && (
<NFormItem path='timeoutFlag'>
<NFormItem path='timeoutFlag' showLabel={false}>
<NCheckbox v-model:checked={formValue.value.release}>
{t('project.dag.online_directly')}
</NCheckbox>
</NFormItem>
)}
{props.instance && (
<NFormItem path='sync'>
<NFormItem path='sync' showLabel={false}>
<NCheckbox v-model:checked={formValue.value.sync}>
{t('project.dag.update_directly')}
</NCheckbox>

View File

@ -20,7 +20,7 @@ import { defineComponent, onMounted, PropType, ref, computed } from 'vue'
import { useI18n } from 'vue-i18n'
import { listAlertGroupById } from '@/service/modules/alert-group'
import { queryAllWorkerGroups } from '@/service/modules/worker-groups'
import { runningType, warningTypeList } from '@/utils/common'
import { runningType, warningTypeList } from '@/common/common'
import { IStartupParam } from './types'
import styles from './startup.module.scss'

View File

@ -351,7 +351,7 @@ export default defineComponent({
onEdit={editTask}
onCopyTask={copyTask}
onRemoveTasks={removeTasks}
onViewLog={viewLog}
onViewLog={handleViewLog}
/>
{!!props.definition && (
<StartModal

View File

@ -16,7 +16,7 @@
*/
import { TaskType } from '@/views/projects/task/constants/task-type'
export type { ITaskState } from '@/utils/types'
export type { ITaskState } from '@/common/types'
export interface ProcessDefinition {
id: number

View File

@ -18,7 +18,7 @@
import { render, h, ref } from 'vue'
import { useRoute } from 'vue-router'
import { useI18n } from 'vue-i18n'
import { tasksState } from '@/utils/common'
import { tasksState } from '@/common/common'
import { NODE, NODE_STATUS_MARKUP } from './dag-config'
import { queryTaskListByProcessId } from '@/service/modules/process-instances'
import NodeStatus from '@/views/projects/workflow/components/dag/dag-node-status'
@ -79,9 +79,12 @@ export function useNodeStatus(options: Options) {
if (taskList.value) {
taskList.value.forEach((taskInstance: any) => {
setNodeStatus(taskInstance.taskCode, taskInstance.state, taskInstance)
nodeStore.updateDependentResult(
JSON.parse(taskInstance.dependentResult)
)
if (taskInstance.dependentResult) {
nodeStore.updateDependentResult(
JSON.parse(taskInstance.dependentResult)
)
}
})
}
})

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
import { copy } from '@/utils/clipboard'
import utils from '@/utils'
import { useMessage } from 'naive-ui'
import { useI18n } from 'vue-i18n'
@ -26,7 +26,7 @@ export function useTextCopy() {
const { t } = useI18n()
const message = useMessage()
const copyText = (text: string) => {
if (copy(text)) {
if (utils.copy(text)) {
message.success(t('project.dag.copy_success'))
}
}

View File

@ -186,8 +186,12 @@ export default defineComponent({
})
watch(
() => props.row,
() => getStartParamsList(props.row.code)
() => props.show,
() => {
if (props.show) {
getStartParamsList(props.row.code)
}
}
)
return {
@ -381,7 +385,7 @@ export default defineComponent({
) : (
<NSpace vertical>
{this.startParamsList.map((item, index) => (
<NSpace class={styles.startup} key={index}>
<NSpace class={styles.startup} key={Date.now() + index}>
<NInput
pair
separator=':'

View File

@ -47,7 +47,7 @@ import {
NPopover
} from 'naive-ui'
import { ArrowDownOutlined, ArrowUpOutlined } from '@vicons/antd'
import { timezoneList } from '@/utils/timezone'
import { timezoneList } from '@/common/timezone'
import Crontab from '@/components/crontab'
const props = {

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
import _ from 'lodash'
import _, { cloneDeep } from 'lodash'
import { reactive, SetupContext } from 'vue'
import { useI18n } from 'vue-i18n'
import { useRoute, useRouter } from 'vue-router'
@ -35,7 +35,7 @@ import {
updateSchedule,
previewSchedule
} from '@/service/modules/schedules'
import { parseTime } from '@/utils/common'
import { parseTime } from '@/common/common'
import { EnvironmentItem } from '@/service/modules/environment/types'
import { ITimingState } from './types'
@ -56,6 +56,10 @@ export function useModal(
schedulePreviewList: []
})
const cachedStartParams = {} as {
[key: string]: { prop: string; value: string }[]
}
const resetImportForm = () => {
state.importForm.name = ''
state.importForm.file = ''
@ -237,9 +241,14 @@ export function useModal(
}
const getStartParamsList = (code: number) => {
if (cachedStartParams[code]) {
variables.startParamsList = cloneDeep(cachedStartParams[code])
return
}
queryProcessDefinitionByCode(code, variables.projectCode).then(
(res: any) => {
variables.startParamsList = res.processDefinition.globalParamList
cachedStartParams[code] = cloneDeep(variables.startParamsList)
}
)
}

Some files were not shown because too many files have changed in this diff Show More