diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java index 22f675268..2351cca40 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java @@ -44,4 +44,15 @@ public enum UdfType { public String getDescp() { return descp; } + + public static UdfType of(int type){ + for(UdfType ut : values()){ + if(ut.getCode() == type){ + return ut; + } + } + throw new IllegalArgumentException("invalid type : " + type); + } + + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index 34d96aa4d..e917dda5f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -18,11 +18,13 @@ package org.apache.dolphinscheduler.server.builder; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import java.util.List; + /** * TaskExecutionContext builder */ @@ -82,6 +84,30 @@ public class TaskExecutionContextBuilder { return this; } + + /** + * build SQLTask related info + * + * 
@param sqlTaskExecutionContext sqlTaskExecutionContext + * @return TaskExecutionContextBuilder + */ + public TaskExecutionContextBuilder buildSQLTaskRelatedInfo(SQLTaskExecutionContext sqlTaskExecutionContext){ + taskExecutionContext.setSqlTaskExecutionContext(sqlTaskExecutionContext); + return this; + } + + + /** + * build DataxTask related info + * @param dataxTaskExecutionContext dataxTaskExecutionContext + * @return TaskExecutionContextBuilder + */ + public TaskExecutionContextBuilder buildDataxTaskRelatedInfo(DataxTaskExecutionContext dataxTaskExecutionContext){ + taskExecutionContext.setDataxTaskExecutionContext(dataxTaskExecutionContext); + return this; + } + + /** * create * diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java index b1ec20dd5..97afb4f6d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java @@ -32,6 +32,11 @@ public class SQLTaskExecutionContext implements Serializable { * warningGroupId */ private int warningGroupId; + + /** + * connectionParams + */ + private String connectionParams; /** * udf function list */ @@ -54,10 +59,19 @@ public class SQLTaskExecutionContext implements Serializable { this.udfFuncList = udfFuncList; } + public String getConnectionParams() { + return connectionParams; + } + + public void setConnectionParams(String connectionParams) { + this.connectionParams = connectionParams; + } + @Override public String toString() { return "SQLTaskExecutionContext{" + "warningGroupId=" + warningGroupId + + ", connectionParams='" + connectionParams + '\'' + ", udfFuncList=" + udfFuncList + '}'; } diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java index e3957afc2..ce185a53c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java @@ -17,13 +17,23 @@ package org.apache.dolphinscheduler.server.master.consumer; +import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.enums.UdfType; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.EnumUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskPriority; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; @@ -38,6 +48,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import 
javax.annotation.PostConstruct; +import java.util.List; /** * TaskUpdateQueue consumer @@ -136,10 +147,45 @@ public class TaskUpdateQueueConsumer extends Thread{ taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); taskInstance.setExecutePath(getExecLocalPath(taskInstance)); + SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); + DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); + + TaskType taskType = TaskType.valueOf(taskInstance.getTaskType()); + if (taskType == TaskType.SQL){ + TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); + SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class); + int datasourceId = sqlParameters.getDatasource(); + DataSource datasource = processService.findDataSourceById(datasourceId); + sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams()); + + // whether udf type + boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) + && StringUtils.isNotEmpty(sqlParameters.getUdfs()); + + if (udfTypeFlag){ + String[] udfFunIds = sqlParameters.getUdfs().split(","); + int[] udfFunIdsArray = new int[udfFunIds.length]; + for(int i = 0 ; i < udfFunIds.length;i++){ + udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]); + } + + List udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray); + sqlTaskExecutionContext.setUdfFuncList(udfFuncList); + } + + } + + if (taskType == TaskType.DATAX){ + + } + + return TaskExecutionContextBuilder.get() .buildTaskInstanceRelatedInfo(taskInstance) .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance()) .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine()) + .buildSQLTaskRelatedInfo(sqlTaskExecutionContext) + .buildDataxTaskRelatedInfo(dataxTaskExecutionContext) .create(); } @@ -171,7 +217,4 @@ public class TaskUpdateQueueConsumer extends Thread{ } return false; } - - - } diff 
--git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index b0fd6322c..bfbce4fba 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -152,7 +152,7 @@ public class MasterBaseTaskExecThread implements Callable { /** - * dispatcht task + * TODO dispatcht task * @param taskInstance taskInstance * @return whether submit task success */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java index 5e2e535cd..63efb24a3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.utils; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.UdfFunc; @@ -48,6 +49,11 @@ public class UDFUtils { * @return create function list */ public static List createFuncs(List udfFuncs, String tenantCode,Logger logger){ + + if (CollectionUtils.isEmpty(udfFuncs)){ + logger.info("can't find udf function resource"); + return null; + } // get hive udf jar path String hiveUdfJarPath = HadoopUtils.getHdfsUdfDir(tenantCode); logger.info("hive udf jar path : {}" , hiveUdfJarPath); diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index e1872f744..2fadaf156 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -97,7 +97,7 @@ public class WorkerServer { this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); this.nettyRemotingServer.start(); - // + // worker registry this.workerRegistry.registry(); /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index 86aed54ac..3ea032f81 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -59,7 +59,7 @@ public abstract class AbstractTask { /** * SHELL process pid */ - protected Integer processId; + protected int processId; /** * other resource manager appId , for example : YARN etc @@ -139,11 +139,11 @@ public abstract class AbstractTask { this.appIds = appIds; } - public Integer getProcessId() { + public int getProcessId() { return processId; } - public void setProcessId(Integer processId) { + public void setProcessId(int processId) { this.processId = processId; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index 9fa2abec8..ff8f2e9ed 100644 --- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -95,7 +95,7 @@ public class ShellTask extends AbstractTask { setAppIds(commandExecuteResult.getAppIds()); setProcessId(commandExecuteResult.getProcessId()); } catch (Exception e) { - logger.error("shell task failure", e); + logger.error("shell task error", e); setExitStatusCode(Constants.EXIT_CODE_FAILURE); throw e; } @@ -125,8 +125,6 @@ public class ShellTask extends AbstractTask { } String script = shellParameters.getRawScript().replaceAll("\\r\\n", "\n"); - - /** * combining local and global parameters */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 9a45c7d09..afff82596 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -105,16 +105,14 @@ public class SqlTask extends AbstractTask { sqlParameters.getUdfs(), sqlParameters.getShowType(), sqlParameters.getConnParams()); - - Connection con = null; - List createFuncs = null; try { + SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext(); // load class DataSourceFactory.loadClass(DbType.valueOf(sqlParameters.getType())); // get datasource baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf(sqlParameters.getType()), - sqlParameters.getConnParams()); + sqlTaskExecutionContext.getConnectionParams()); // ready to execute SQL and parameter entity Map SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql()); @@ -129,32 +127,18 @@ public class SqlTask extends AbstractTask { 
.map(this::getSqlAndSqlParamsMap) .collect(Collectors.toList()); - // determine if it is UDF - boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) - && StringUtils.isNotEmpty(sqlParameters.getUdfs()); - if(udfTypeFlag){ - String[] ids = sqlParameters.getUdfs().split(","); - int[] idsArray = new int[ids.length]; - for(int i=0;i createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(), + taskExecutionContext.getTenantCode(), + logger); // execute sql task - con = executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); + executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); + + setExitStatusCode(Constants.EXIT_CODE_SUCCESS); } catch (Exception e) { - logger.error(e.getMessage(), e); + setExitStatusCode(Constants.EXIT_CODE_FAILURE); + logger.error("sql task error", e); throw e; - } finally { - if (con != null) { - try { - con.close(); - } catch (SQLException e) { - logger.error(e.getMessage(),e); - } - } } } @@ -193,11 +177,11 @@ public class SqlTask extends AbstractTask { setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap); // replace the ${} of the SQL statement with the Placeholder - String formatSql = sql.replaceAll(rgex,"?"); + String formatSql = sql.replaceAll(rgex, "?"); sqlBuilder.append(formatSql); // print repalce sql - printReplacedSql(sql,formatSql,rgex,sqlParamsMap); + printReplacedSql(sql, formatSql, rgex, sqlParamsMap); return new SqlBinds(sqlBuilder.toString(), sqlParamsMap); } @@ -214,107 +198,197 @@ public class SqlTask extends AbstractTask { * @param createFuncs create functions * @return Connection */ - public Connection executeFuncAndSql(SqlBinds mainSqlBinds, + public void executeFuncAndSql(SqlBinds mainSqlBinds, List preStatementsBinds, List postStatementsBinds, List createFuncs){ Connection connection = null; + PreparedStatement stmt = null; + ResultSet resultSet = null; try { // if upload resource is HDFS and kerberos 
startup CommonUtils.loadKerberosConf(); - // if hive , load connection params if exists - if (HIVE == DbType.valueOf(sqlParameters.getType())) { - Properties paramProp = new Properties(); - paramProp.setProperty(USER, baseDataSource.getUser()); - paramProp.setProperty(PASSWORD, baseDataSource.getPassword()); - Map connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(), - SEMICOLON, - HIVE_CONF); - paramProp.putAll(connParamMap); - connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), - paramProp); - }else{ - connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), - baseDataSource.getUser(), - baseDataSource.getPassword()); - } + // create connection + connection = createConnection(); // create temp function if (CollectionUtils.isNotEmpty(createFuncs)) { - try (Statement funcStmt = connection.createStatement()) { - for (String createFunc : createFuncs) { - logger.info("hive create function sql: {}", createFunc); - funcStmt.execute(createFunc); - } - } + createTempFunction(connection,createFuncs); } - for (SqlBinds sqlBind: preStatementsBinds) { - try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) { - int result = stmt.executeUpdate(); - logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql()); - } + // pre sql + preSql(connection,preStatementsBinds); + + + stmt = prepareStatementAndBind(connection, mainSqlBinds); + // decide whether to executeQuery or executeUpdate based on sqlType + if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { + // query statements need to be convert to JsonArray and inserted into Alert to send + resultSet = stmt.executeQuery(); + resultProcess(resultSet); + + } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { + // non query statement + stmt.executeUpdate(); } - try (PreparedStatement stmt = prepareStatementAndBind(connection, mainSqlBinds); - ResultSet resultSet = stmt.executeQuery()) { - // decide whether to 
executeQuery or executeUpdate based on sqlType - if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { - // query statements need to be convert to JsonArray and inserted into Alert to send - JSONArray resultJSONArray = new JSONArray(); - ResultSetMetaData md = resultSet.getMetaData(); - int num = md.getColumnCount(); + postSql(connection,postStatementsBinds); - while (resultSet.next()) { - JSONObject mapOfColValues = new JSONObject(true); - for (int i = 1; i <= num; i++) { - mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i)); - } - resultJSONArray.add(mapOfColValues); - } - logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); - - // if there is a result set - if ( !resultJSONArray.isEmpty() ) { - if (StringUtils.isNotEmpty(sqlParameters.getTitle())) { - sendAttachment(sqlParameters.getTitle(), - JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); - }else{ - sendAttachment(taskExecutionContext.getTaskName() + " query resultsets ", - JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); - } - } - - exitStatusCode = 0; - - } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { - // non query statement - stmt.executeUpdate(); - exitStatusCode = 0; - } - } - - for (SqlBinds sqlBind: postStatementsBinds) { - try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) { - int result = stmt.executeUpdate(); - logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql()); - } - } } catch (Exception e) { - logger.error(e.getMessage(),e); - throw new RuntimeException(e.getMessage()); + logger.error("execute sql error",e); + throw new RuntimeException("execute sql error", e); } finally { - try { - connection.close(); - } catch (Exception e) { - logger.error(e.getMessage(), e); + close(resultSet,stmt,connection); + } + } + + /** + * result process + * + * @param resultSet resultSet + * @throws 
Exception + */ + private void resultProcess(ResultSet resultSet) throws Exception{ + JSONArray resultJSONArray = new JSONArray(); + ResultSetMetaData md = resultSet.getMetaData(); + int num = md.getColumnCount(); + + while (resultSet.next()) { + JSONObject mapOfColValues = new JSONObject(true); + for (int i = 1; i <= num; i++) { + mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i)); + } + resultJSONArray.add(mapOfColValues); + } + logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); + + // if there is a result set + if (!resultJSONArray.isEmpty() ) { + if (StringUtils.isNotEmpty(sqlParameters.getTitle())) { + sendAttachment(sqlParameters.getTitle(), + JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); + }else{ + sendAttachment(taskExecutionContext.getTaskName() + " query resultsets ", + JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); } } + } + + /** + * pre sql + * + * @param connection connection + * @param preStatementsBinds preStatementsBinds + */ + private void preSql(Connection connection, + List preStatementsBinds) throws Exception{ + for (SqlBinds sqlBind: preStatementsBinds) { + try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){ + int result = pstmt.executeUpdate(); + logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql()); + + } + } + } + + /** + * post psql + * + * @param connection connection + * @param postStatementsBinds postStatementsBinds + * @throws Exception + */ + private void postSql(Connection connection, + List postStatementsBinds) throws Exception{ + for (SqlBinds sqlBind: postStatementsBinds) { + try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){ + int result = pstmt.executeUpdate(); + logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql()); + } + } + } + /** + * create temp function + * + * @param 
connection connection + * @param createFuncs createFuncs + * @throws Exception + */ + private void createTempFunction(Connection connection, + List createFuncs) throws Exception{ + try (Statement funcStmt = connection.createStatement()) { + for (String createFunc : createFuncs) { + logger.info("hive create function sql: {}", createFunc); + funcStmt.execute(createFunc); + } + } + } + /** + * create connection + * + * @return connection + * @throws Exception + */ + private Connection createConnection() throws Exception{ + // if hive , load connection params if exists + Connection connection = null; + if (HIVE == DbType.valueOf(sqlParameters.getType())) { + Properties paramProp = new Properties(); + paramProp.setProperty(USER, baseDataSource.getUser()); + paramProp.setProperty(PASSWORD, baseDataSource.getPassword()); + Map connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(), + SEMICOLON, + HIVE_CONF); + paramProp.putAll(connParamMap); + + connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), + paramProp); + }else{ + connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), + baseDataSource.getUser(), + baseDataSource.getPassword()); + + } return connection; } + /** + * close jdbc resource + * + * @param resultSet resultSet + * @param pstmt pstmt + * @param connection connection + */ + private void close(ResultSet resultSet, + PreparedStatement pstmt, + Connection connection){ + if (resultSet != null){ + try { + resultSet.close(); + } catch (SQLException e) { + logger.error("close result set error : {}", e.getMessage(), e); + } + } + + if (pstmt != null){ + try { + pstmt.close(); + } catch (SQLException e) { + logger.error("close prepared statement error : {}", e.getMessage(), e); + } + } + + if (connection != null){ + try { + connection.close(); + } catch (SQLException e) { + logger.error("close connection error : {}", e.getMessage(), e); + } + } + } + /** + * preparedStatement bind + * @param connection + * @param sqlBinds + * @return + * @throws Exception + */ @@ -326,20 +400,19 @@ public class SqlTask extends AbstractTask { // is the timeout set boolean timeoutFlag = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED || 
TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED; - try (PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql())) { - if(timeoutFlag){ - stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout()); - } - Map params = sqlBinds.getParamsMap(); - if(params != null) { - for (Map.Entry entry : params.entrySet()) { - Property prop = entry.getValue(); - ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue()); - } - } - logger.info("prepare statement replace sql : {} ", stmt); - return stmt; + PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql()); + if(timeoutFlag){ + stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout()); } + Map params = sqlBinds.getParamsMap(); + if(params != null) { + for (Map.Entry entry : params.entrySet()) { + Property prop = entry.getValue(); + ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue()); + } + } + logger.info("prepare statement replace sql : {} ", stmt); + return stmt; } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 81c523c2e..5c4d0baa6 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1513,7 +1513,6 @@ public class ProcessService { * @return udf function list */ public List queryUdfFunListByids(int[] ids){ - return udfFuncMapper.queryUdfByIdStr(ids, null); }