diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py new file mode 100644 index 000000000..c9ca80cde --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +An example workflow for task datax. + +This example will create a workflow named `task_datax`. +`task_datax` is a real workflow that defines and runs the task task_datax. +You can create data sources `first_mysql` and `second_mysql` through UI. +It creates a task to synchronize datax from the source database to the target database. +""" + + +from pydolphinscheduler.core.process_definition import ProcessDefinition +from pydolphinscheduler.tasks.datax import CustomDataX, DataX + +# datax json template +JSON_TEMPLATE = "" + +with ProcessDefinition( + name="task_datax", + tenant="tenant_exists", +) as pd: + # This task synchronizes the data in `t_ds_project` + # of `first_mysql` database to `target_project` of `second_mysql` database.
+ task1 = DataX( + name="task_datax", + datasource_name="first_mysql", + datatarget_name="second_mysql", + sql="select id, name, code, description from source_table", + target_table="target_table", + ) + + # you can custom json_template of datax to sync data. + task2 = CustomDataX(name="task_custom_datax", json=JSON_TEMPLATE) + pd.run() diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index c2d2e7f25..a57a11f98 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -73,6 +73,7 @@ class TaskType(str): SQL = "SQL" SUB_PROCESS = "SUB_PROCESS" PROCEDURE = "PROCEDURE" + DATAX = "DATAX" DEPENDENT = "DEPENDENT" diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py new file mode 100644 index 000000000..f87ffefe0 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Module database.""" + +from typing import Dict + +from pydolphinscheduler.java_gateway import launch_gateway + + +class Database(dict): + """database object, get information about database. + + You provider database_name contain connection information, it decisions which + database type and database instance would run task. + """ + + def __init__(self, database_name: str, type_key, database_key, *args, **kwargs): + super().__init__(*args, **kwargs) + self._database = {} + self.database_name = database_name + self[type_key] = self.database_type + self[database_key] = self.database_id + + @property + def database_type(self) -> str: + """Get database type from java gateway, a wrapper for :func:`get_database_info`.""" + return self.get_database_info(self.database_name).get("type") + + @property + def database_id(self) -> str: + """Get database id from java gateway, a wrapper for :func:`get_database_info`.""" + return self.get_database_info(self.database_name).get("id") + + def get_database_info(self, name) -> Dict: + """Get database info from java gateway, contains database id, type, name.""" + if self._database: + return self._database + else: + gateway = launch_gateway() + self._database = gateway.entry_point.getDatasourceInfo(name) + return self._database diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py deleted file mode 100644 index 2fa584549..000000000 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py +++ /dev/null @@ -1,80 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -"""Task database base task.""" - -from typing import Dict - -from pydolphinscheduler.core.task import Task -from pydolphinscheduler.java_gateway import launch_gateway - - -class Database(Task): - """Base task to handle database, declare behavior for the base handler of database. - - It a parent class for all database task of dolphinscheduler. And it should run sql like - job in multiply sql lik engine, such as: - - ClickHouse - - DB2 - - HIVE - - MySQL - - Oracle - - Postgresql - - Presto - - SQLServer - You provider datasource_name contain connection information, it decisions which - database type and database instance would run this sql. 
- """ - - def __init__( - self, task_type: str, name: str, datasource_name: str, *args, **kwargs - ): - super().__init__(name, task_type, *args, **kwargs) - self.datasource_name = datasource_name - self._datasource = {} - - def get_datasource_type(self) -> str: - """Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`.""" - return self.get_datasource_info(self.datasource_name).get("type") - - def get_datasource_id(self) -> str: - """Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`.""" - return self.get_datasource_info(self.datasource_name).get("id") - - def get_datasource_info(self, name) -> Dict: - """Get datasource info from java gateway, contains datasource id, type, name.""" - if self._datasource: - return self._datasource - else: - gateway = launch_gateway() - self._datasource = gateway.entry_point.getDatasourceInfo(name) - return self._datasource - - @property - def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: - """Override Task.task_params for sql task. - - Sql task have some specials attribute for task_params, and is odd if we - directly set as python property, so we Override Task.task_params here. - """ - params = super().task_params - custom_params = { - "type": self.get_datasource_type(), - "datasource": self.get_datasource_id(), - } - params.update(custom_params) - return params diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py new file mode 100644 index 000000000..f881a67de --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Task datax.""" + +from typing import Dict, List, Optional + +from pydolphinscheduler.constants import TaskType +from pydolphinscheduler.core.database import Database +from pydolphinscheduler.core.task import Task + + +class CustomDataX(Task): + """Task CustomDatax object, declare behavior for custom DataX task to dolphinscheduler. + + You provide a JSON template for DataX, and it synchronizes data according to the template you provided. + """ + + CUSTOM_CONFIG = 1 + + _task_custom_attr = {"custom_config", "json", "xms", "xmx"} + + def __init__( + self, + name: str, + json: str, + xms: Optional[int] = 1, + xmx: Optional[int] = 1, + *args, + **kwargs + ): + super().__init__(name, TaskType.DATAX, *args, **kwargs) + self.custom_config = self.CUSTOM_CONFIG + self.json = json + self.xms = xms + self.xmx = xmx + + +class DataX(Task): + """Task DataX object, declare behavior for DataX task to dolphinscheduler. + + It should run a database DataX job across multiple SQL-like engines, such as: + - MySQL + - Oracle + - Postgresql + - SQLServer + You provide datasource_name and datatarget_name, which contain connection information; they decide which + database type and database instance will synchronize the data.
+ """ + + CUSTOM_CONFIG = 0 + + _task_custom_attr = { + "custom_config", + "sql", + "target_table", + "job_speed_byte", + "job_speed_record", + "pre_statements", + "post_statements", + "xms", + "xmx", + } + + def __init__( + self, + name: str, + datasource_name: str, + datatarget_name: str, + sql: str, + target_table: str, + job_speed_byte: Optional[int] = 0, + job_speed_record: Optional[int] = 1000, + pre_statements: Optional[List[str]] = None, + post_statements: Optional[List[str]] = None, + xms: Optional[int] = 1, + xmx: Optional[int] = 1, + *args, + **kwargs + ): + super().__init__(name, TaskType.DATAX, *args, **kwargs) + self.sql = sql + self.custom_config = self.CUSTOM_CONFIG + self.datasource_name = datasource_name + self.datatarget_name = datatarget_name + self.target_table = target_table + self.job_speed_byte = job_speed_byte + self.job_speed_record = job_speed_record + self.pre_statements = pre_statements or [] + self.post_statements = post_statements or [] + self.xms = xms + self.xmx = xmx + + @property + def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: + """Override Task.task_params for datax task. + + datax task have some specials attribute for task_params, and is odd if we + directly set as python property, so we Override Task.task_params here. 
+ """ + params = super().task_params + datasource = Database(self.datasource_name, "dsType", "dataSource") + params.update(datasource) + + datatarget = Database(self.datatarget_name, "dtType", "dataTarget") + params.update(datatarget) + return params diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py index f4d38f4ab..6383e075a 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py @@ -17,11 +17,14 @@ """Task procedure.""" +from typing import Dict + from pydolphinscheduler.constants import TaskType -from pydolphinscheduler.tasks.database import Database +from pydolphinscheduler.core.database import Database +from pydolphinscheduler.core.task import Task -class Procedure(Database): +class Procedure(Task): """Task Procedure object, declare behavior for Procedure task to dolphinscheduler. It should run database procedure job in multiply sql lik engine, such as: @@ -40,5 +43,18 @@ class Procedure(Database): _task_custom_attr = {"method"} def __init__(self, name: str, datasource_name: str, method: str, *args, **kwargs): - super().__init__(TaskType.PROCEDURE, name, datasource_name, *args, **kwargs) + super().__init__(name, TaskType.PROCEDURE, *args, **kwargs) + self.datasource_name = datasource_name self.method = method + + @property + def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: + """Override Task.task_params for produce task. + + produce task have some specials attribute for task_params, and is odd if we + directly set as python property, so we Override Task.task_params here. 
+ """ + params = super().task_params + datasource = Database(self.datasource_name, "type", "datasource") + params.update(datasource) + return params diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py index 31980b00b..b5be3e45a 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py @@ -18,10 +18,11 @@ """Task sql.""" import re -from typing import Optional +from typing import Dict, Optional from pydolphinscheduler.constants import TaskType -from pydolphinscheduler.tasks.database import Database +from pydolphinscheduler.core.database import Database +from pydolphinscheduler.core.task import Task class SqlType: @@ -31,7 +32,7 @@ class SqlType: NOT_SELECT = 1 -class Sql(Database): +class Sql(Task): """Task SQL object, declare behavior for SQL task to dolphinscheduler. It should run sql job in multiply sql lik engine, such as: @@ -66,8 +67,9 @@ class Sql(Database): *args, **kwargs ): - super().__init__(TaskType.SQL, name, datasource_name, *args, **kwargs) + super().__init__(name, TaskType.SQL, *args, **kwargs) self.sql = sql + self.datasource_name = datasource_name self.pre_statements = pre_statements or [] self.post_statements = post_statements or [] self.display_rows = display_rows @@ -83,3 +85,15 @@ class Sql(Database): return SqlType.NOT_SELECT else: return SqlType.SELECT + + @property + def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: + """Override Task.task_params for sql task. + + sql task have some specials attribute for task_params, and is odd if we + directly set as python property, so we Override Task.task_params here. 
+ """ + params = super().task_params + datasource = Database(self.datasource_name, "type", "datasource") + params.update(datasource) + return params diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_database.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_database.py new file mode 100644 index 000000000..1286a4a7f --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_database.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +"""Test Database.""" + + +from unittest.mock import patch + +import pytest + +from pydolphinscheduler.core.database import Database + +TEST_DATABASE_DATASOURCE_NAME = "test_datasource" +TEST_DATABASE_TYPE_KEY = "type" +TEST_DATABASE_KEY = "datasource" + + +@pytest.mark.parametrize( + "expect", + [ + { + TEST_DATABASE_TYPE_KEY: "mock_type", + TEST_DATABASE_KEY: 1, + } + ], +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +@patch( + "pydolphinscheduler.core.database.Database.get_database_info", + return_value=({"id": 1, "type": "mock_type"}), +) +def test_get_datasource_detail(mock_datasource, mock_code_version, expect): + """Test :func:`get_database_type` and :func:`get_database_id` can return expect value.""" + database_info = Database( + TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_TYPE_KEY, TEST_DATABASE_KEY + ) + assert expect == database_info diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py deleted file mode 100644 index e14bc5e33..000000000 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py +++ /dev/null @@ -1,122 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. - -"""Test Task Database.""" - - -from unittest.mock import patch - -import pytest - -from pydolphinscheduler.tasks.database import Database - -TEST_DATABASE_TASK_TYPE = "SQL" -TEST_DATABASE_SQL = "select 1" -TEST_DATABASE_DATASOURCE_NAME = "test_datasource" - - -@patch( - "pydolphinscheduler.core.task.Task.gen_code_and_version", - return_value=(123, 1), -) -@patch( - "pydolphinscheduler.tasks.database.Database.get_datasource_info", - return_value=({"id": 1, "type": "mock_type"}), -) -def test_get_datasource_detail(mock_datasource, mock_code_version): - """Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value.""" - name = "test_get_database_detail" - task = Database( - TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_SQL - ) - assert 1 == task.get_datasource_id() - assert "mock_type" == task.get_datasource_type() - - -@pytest.mark.parametrize( - "attr, expect", - [ - ( - { - "task_type": TEST_DATABASE_TASK_TYPE, - "name": "test-task-params", - "datasource_name": TEST_DATABASE_DATASOURCE_NAME, - }, - { - "type": "MYSQL", - "datasource": 1, - "localParams": [], - "resourceList": [], - "dependence": {}, - "waitStartTimeout": {}, - "conditionResult": {"successNode": [""], "failedNode": [""]}, - }, - ) - ], -) -@patch( - "pydolphinscheduler.core.task.Task.gen_code_and_version", - return_value=(123, 1), -) -@patch( - "pydolphinscheduler.tasks.database.Database.get_datasource_info", - return_value=({"id": 1, "type": "MYSQL"}), -) -def test_property_task_params(mock_datasource, mock_code_version, attr, expect): - """Test task database task property.""" - task = Database(**attr) - assert expect == task.task_params - - -@patch( - "pydolphinscheduler.core.task.Task.gen_code_and_version", - return_value=(123, 1), -) -@patch( - "pydolphinscheduler.tasks.database.Database.get_datasource_info", - return_value=({"id": 
1, "type": "MYSQL"}), -) -def test_database_get_define(mock_datasource, mock_code_version): - """Test task database function get_define.""" - name = "test_database_get_define" - expect = { - "code": 123, - "name": name, - "version": 1, - "description": None, - "delayTime": 0, - "taskType": TEST_DATABASE_TASK_TYPE, - "taskParams": { - "type": "MYSQL", - "datasource": 1, - "localParams": [], - "resourceList": [], - "dependence": {}, - "conditionResult": {"successNode": [""], "failedNode": [""]}, - "waitStartTimeout": {}, - }, - "flag": "YES", - "taskPriority": "MEDIUM", - "workerGroup": "default", - "failRetryTimes": 0, - "failRetryInterval": 1, - "timeoutFlag": "CLOSE", - "timeoutNotifyStrategy": None, - "timeout": 0, - } - task = Database(TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME) - assert task.get_define() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py new file mode 100644 index 000000000..7fa4569ad --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py @@ -0,0 +1,124 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +"""Test Task DataX.""" + +from unittest.mock import patch + +import pytest + +from pydolphinscheduler.tasks.datax import CustomDataX, DataX + + +@patch( + "pydolphinscheduler.core.database.Database.get_database_info", + return_value=({"id": 1, "type": "MYSQL"}), +) +def test_datax_get_define(mock_datasource): + """Test task datax function get_define.""" + code = 123 + version = 1 + name = "test_datax_get_define" + command = "select name from test_source_table_name" + datasource_name = "test_datasource" + datatarget_name = "test_datatarget" + target_table = "test_target_table_name" + expect = { + "code": code, + "name": name, + "version": 1, + "description": None, + "delayTime": 0, + "taskType": "DATAX", + "taskParams": { + "customConfig": 0, + "dsType": "MYSQL", + "dataSource": 1, + "dtType": "MYSQL", + "dataTarget": 1, + "sql": command, + "targetTable": target_table, + "jobSpeedByte": 0, + "jobSpeedRecord": 1000, + "xms": 1, + "xmx": 1, + "preStatements": [], + "postStatements": [], + "localParams": [], + "resourceList": [], + "dependence": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + "timeoutNotifyStrategy": None, + "timeout": 0, + } + with patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(code, version), + ): + task = DataX(name, datasource_name, datatarget_name, command, target_table) + assert task.get_define() == expect + + +@pytest.mark.parametrize("json_template", ["json_template"]) +def test_custom_datax_get_define(json_template): + """Test task custom datax function get_define.""" + code = 123 + version = 1 + name = "test_custom_datax_get_define" + expect = { + "code": code, + "name": name, + "version": 1, + "description": None, + "delayTime": 0, + "taskType": "DATAX", + "taskParams": { + "customConfig": 1, + "json": 
json_template, + "xms": 1, + "xmx": 1, + "localParams": [], + "resourceList": [], + "dependence": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + "timeoutNotifyStrategy": None, + "timeout": 0, + } + with patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(code, version), + ): + task = CustomDataX(name, json_template) + print(task.get_define()) + print(expect) + assert task.get_define() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py index f5d09a3ba..178259395 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py @@ -29,22 +29,6 @@ TEST_PROCEDURE_SQL = ( TEST_PROCEDURE_DATASOURCE_NAME = "test_datasource" -@patch( - "pydolphinscheduler.core.task.Task.gen_code_and_version", - return_value=(123, 1), -) -@patch( - "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info", - return_value=({"id": 1, "type": "mock_type"}), -) -def test_get_datasource_detail(mock_datasource, mock_code_version): - """Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value.""" - name = "test_get_datasource_detail" - task = Procedure(name, TEST_PROCEDURE_DATASOURCE_NAME, TEST_PROCEDURE_SQL) - assert 1 == task.get_datasource_id() - assert "mock_type" == task.get_datasource_type() - - @pytest.mark.parametrize( "attr, expect", [ @@ -72,7 +56,7 @@ def test_get_datasource_detail(mock_datasource, mock_code_version): return_value=(123, 1), ) @patch( - "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info", + "pydolphinscheduler.core.database.Database.get_database_info", return_value=({"id": 
1, "type": "MYSQL"}), ) def test_property_task_params(mock_datasource, mock_code_version, attr, expect): @@ -86,7 +70,7 @@ def test_property_task_params(mock_datasource, mock_code_version, attr, expect): return_value=(123, 1), ) @patch( - "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info", + "pydolphinscheduler.core.database.Database.get_database_info", return_value=({"id": 1, "type": "MYSQL"}), ) def test_sql_get_define(mock_datasource, mock_code_version): diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py index 6058cce8e..3f8209c23 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py @@ -25,24 +25,6 @@ import pytest from pydolphinscheduler.tasks.sql import Sql, SqlType -@patch( - "pydolphinscheduler.core.task.Task.gen_code_and_version", - return_value=(123, 1), -) -@patch( - "pydolphinscheduler.tasks.sql.Sql.get_datasource_info", - return_value=({"id": 1, "type": "mock_type"}), -) -def test_get_datasource_detail(mock_datasource, mock_code_version): - """Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value.""" - name = "test_get_sql_type" - datasource_name = "test_datasource" - sql = "select 1" - task = Sql(name, datasource_name, sql) - assert 1 == task.get_datasource_id() - assert "mock_type" == task.get_datasource_type() - - @pytest.mark.parametrize( "sql, sql_type", [ @@ -69,7 +51,7 @@ def test_get_datasource_detail(mock_datasource, mock_code_version): return_value=(123, 1), ) @patch( - "pydolphinscheduler.tasks.sql.Sql.get_datasource_info", + "pydolphinscheduler.core.database.Database.get_database_info", return_value=({"id": 1, "type": "mock_type"}), ) def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type): @@ -109,7 +91,7 @@ def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type): 
return_value=(123, 1), ) @patch( - "pydolphinscheduler.tasks.sql.Sql.get_datasource_info", + "pydolphinscheduler.core.database.Database.get_database_info", return_value=({"id": 1, "type": "MYSQL"}), ) def test_property_task_params(mock_datasource, mock_code_version, attr, expect): @@ -119,7 +101,7 @@ def test_property_task_params(mock_datasource, mock_code_version, attr, expect): @patch( - "pydolphinscheduler.tasks.sql.Sql.get_datasource_info", + "pydolphinscheduler.core.database.Database.get_database_info", return_value=({"id": 1, "type": "MYSQL"}), ) def test_sql_get_define(mock_datasource):