diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java new file mode 100644 index 000000000..c2f22c5d7 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.controller; + +import static org.apache.dolphinscheduler.api.enums.Status.CLOSE_TASK_GROUP_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_GROUP_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_LIST_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_QUEUE_LIST_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.START_TASK_GROUP_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_GROUP_ERROR; + +import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; +import org.apache.dolphinscheduler.api.exceptions.ApiException; +import org.apache.dolphinscheduler.api.service.TaskGroupQueueService; +import org.apache.dolphinscheduler.api.service.TaskGroupService; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.entity.User; + +import java.util.Map; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestAttribute; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import springfox.documentation.annotations.ApiIgnore; + + +/** + * task group controller + */ +@Api(tags = "task group") +@RestController +@RequestMapping("/task-group") +public class TaskGroupController extends BaseController { + + @Autowired + private TaskGroupService taskGroupService; + + /** + * query task group list + * + * @param loginUser login user + * @param name name + * @param description description + * @param groupSize group size + * @param name project id + * @return result and msg code + */ + @ApiOperation(value = "createTaskGroup", notes = "CREATE_TAKS_GROUP_NOTE") + @ApiImplicitParams({ + @ApiImplicitParam(name = "name", value = "NAME", dataType = "String"), + @ApiImplicitParam(name = "description", value = "DESCRIPTION", dataType = "String"), + @ApiImplicitParam(name = "groupSize", value = "GROUPSIZE", dataType = "Int"), + + }) + @PostMapping(value = "/create") + @ResponseStatus(HttpStatus.CREATED) + @ApiException(CREATE_TASK_GROUP_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result createTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("name") String name, + @RequestParam("description") String description, + @RequestParam("groupSize") Integer groupSize) { + Map result = taskGroupService.createTaskGroup(loginUser, name, description, groupSize); + return returnDataList(result); + } + + /** + * update task group list + * + * @param loginUser login user + * @param name name + * @param description description + * @param groupSize group size + * @param name project id + * @return result and msg code + */ + @ApiOperation(value = "updateTaskGroup", notes = "UPDATE_TAKS_GROUP_NOTE") + @ApiImplicitParams({ + @ApiImplicitParam(name = "id", value = "id", dataType = "Int"), + @ApiImplicitParam(name = "name", value = "NAME", dataType = "String"), + @ApiImplicitParam(name = "description", value = "DESCRIPTION", dataType = "String"), + @ApiImplicitParam(name = "groupSize", value = "GROUPSIZE", dataType = "Int"), + + }) + @PostMapping(value = "/update") + @ResponseStatus(HttpStatus.CREATED) + @ApiException(UPDATE_TASK_GROUP_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result updateTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("id") Integer id, + @RequestParam("name") String name, + @RequestParam("description") String description, + @RequestParam("groupSize") Integer groupSize) { + Map result = taskGroupService.updateTaskGroup(loginUser, id, name, description, groupSize); + return returnDataList(result); + } + + /** + * query task group list paging + * + * @param loginUser login user + * @param pageNo page number + * @param pageSize page size + * @return queue list + */ + @ApiOperation(value = "queryAllTaskGroup", notes = "QUERY_ALL_TASK_GROUP_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), + @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20") + }) + @GetMapping(value = "/query-list-all") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_TASK_GROUP_LIST_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result queryAllTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("pageNo") Integer pageNo, + @RequestParam("pageSize") Integer pageSize) { + Map result = taskGroupService.queryAllTaskGroup(loginUser, pageNo, pageSize); + return returnDataList(result); + } + + /** + * query task group list paging + * + * @param loginUser login user + * @param pageNo page number + * @param status status + * @param pageSize page size + * @return queue list + */ + @ApiOperation(value = "queryTaskGroupByStatus", notes = "QUERY_TASK_GROUP_LIST_BY_STSATUS_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), + @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20"), + @ApiImplicitParam(name = "status", value = "status", required = true, dataType = "Int") + }) + @GetMapping(value = "/query-list-by-status") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_TASK_GROUP_LIST_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result queryTaskGroupByStatus(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("pageNo") Integer pageNo, + @RequestParam(value = "status", required = false) Integer status, + @RequestParam("pageSize") Integer pageSize) { + Map result = taskGroupService.queryTaskGroupByStatus(loginUser, pageNo, pageSize, status); + return returnDataList(result); + } + + /** + * query task group list paging by project id + * + * @param loginUser login user + * @param pageNo page number + * @param name project id + * @param pageSize page size + * @return queue list + */ + @ApiOperation(value = "queryTaskGroupByName", notes = "QUERY_TASK_GROUP_LIST_BY_PROJECT_ID_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), + @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20"), + @ApiImplicitParam(name = "name", value = "PROJECT_ID", required = true, dataType = "String") + }) + @GetMapping(value = "/query-list-by-name") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_TASK_GROUP_LIST_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result queryTaskGroupByName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("pageNo") Integer pageNo, + @RequestParam(value = "name", required = false) String name, + @RequestParam("pageSize") Integer pageSize) { + Map result = taskGroupService.queryTaskGroupByName(loginUser, pageNo, pageSize, name); + return returnDataList(result); + } + + /** + * close a task group + * + * @param loginUser login user + * @param id id + * @return result + */ + @ApiOperation(value = "closeTaskGroup", notes = "CLOSE_TASK_GROUP_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "Int") + }) + @PostMapping(value = "/close-task-group") + @ResponseStatus(HttpStatus.CREATED) + @ApiException(CLOSE_TASK_GROUP_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result closeTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "id", required = false) Integer id) { + + Map result = taskGroupService.closeTaskGroup(loginUser, id); + return returnDataList(result); + } + + /** + * start a task group + * + * @param loginUser login user + * @param id id + * @return result + */ + @ApiOperation(value = "startTaskGroup", notes = "START_TASK_GROUP_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "Int") + }) + @PostMapping(value = "/start-task-group") + @ResponseStatus(HttpStatus.CREATED) + @ApiException(START_TASK_GROUP_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result startTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "id", required = false) Integer id) { + Map result = taskGroupService.startTaskGroup(loginUser, id); + return returnDataList(result); + } + + /** + * force start task without task group + * + * @param loginUser login user + * @param taskId task id + * @return result + */ + @ApiOperation(value = "wakeCompulsively", notes = "WAKE_TASK_COMPULSIVELY_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "taskId", value = "TASKID", required = true, dataType = "Int") + }) + @PostMapping(value = "/wake-task-compulsively") + @ResponseStatus(HttpStatus.CREATED) + @ApiException(START_TASK_GROUP_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result wakeCompulsively(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "taskId") Integer taskId) { + Map result = taskGroupService.wakeTaskcompulsively(loginUser, taskId); + return returnDataList(result); + } + + @Autowired + private TaskGroupQueueService taskGroupQueueService; + + /** + * query task group queue list paging + * + * @param loginUser login user + * @param pageNo page number + * @param pageSize page size + * @return queue list + */ + @ApiOperation(value = "queryTasksByGroupId", notes = "QUERY_ALL_TASKS_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "groupId", value = "GROUP_ID", required = true, dataType = "Int", example = "1"), + @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), + @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20") + }) + @GetMapping(value = "/query-list-by-group-id") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_TASK_GROUP_QUEUE_LIST_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result queryTasksByGroupId(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("groupId") Integer groupId, + @RequestParam("pageNo") Integer pageNo, + @RequestParam("pageSize") Integer pageSize) { + Map result = taskGroupQueueService.queryTasksByGroupId(loginUser, groupId, pageNo, pageSize); + return returnDataList(result); + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 47452a24a..624e9f9ef 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -340,10 +340,26 @@ public enum Status { QUERY_ENVIRONMENT_BY_CODE_ERROR(1200009, "not found environment [{0}] ", "查询环境编码[{0}]不存在"), QUERY_ENVIRONMENT_ERROR(1200010, "login user query environment error", "分页查询环境列表错误"), VERIFY_ENVIRONMENT_ERROR(1200011, "verify environment error", "验证环境信息错误"), - ENVIRONMENT_WORKER_GROUPS_IS_INVALID(1200012, "environment worker groups is invalid format", "环境关联的工作组参数解析错误"), - UPDATE_ENVIRONMENT_WORKER_GROUP_RELATION_ERROR(1200013,"You can't modify the worker group, because the worker group [{0}] and this environment [{1}] already be used in the task [{2}]", - "您不能修改工作组选项,因为该工作组 [{0}] 和 该环境 [{1}] 已经被用在任务 [{2}] 中"); + TASK_GROUP_NAME_EXSIT(130001,"this task group name is repeated in a project","该任务组名称在一个项目中已经使用"), + TASK_GROUP_SIZE_ERROR(130002,"task group size error","任务组大小应该为大于1的整数"), + TASK_GROUP_STATUS_ERROR(130003,"task group status error","任务组已经被关闭"), + TASK_GROUP_FULL(130004,"task group is full","任务组已经满了"), + TASK_GROUP_USED_SIZE_ERROR(130005,"the used size number of task group is dirty","任务组使用的容量发生了变化"), + TASK_GROUP_QUEUE_RELEASE_ERROR(130006,"relase task group queue failed","任务组资源释放时出现了错误"), + TASK_GROUP_QUEUE_AWAKE_ERROR(130007,"awake waiting task failed","任务组使唤醒等待任务时发生了错误"), + CREATE_TASK_GROUP_ERROR(130008,"create task group error","创建任务组错误"), + UPDATE_TASK_GROUP_ERROR(130009,"update task group list error","更新任务组错误"), + QUERY_TASK_GROUP_LIST_ERROR(130010,"query task group list error","查询任务组列表错误"), + CLOSE_TASK_GROUP_ERROR(130011,"close task group error","关闭任务组错误"), + START_TASK_GROUP_ERROR(130012,"start task group error","启动任务组错误"), + QUERY_TASK_GROUP_QUEUE_LIST_ERROR(130013,"query task group queue list error","查询任务组队列列表错误"), + TASK_GROUP_CACHE_START_FAILED(130014,"cache start failed","任务组相关的缓存启动失败"), + ENVIRONMENT_WORKER_GROUPS_IS_INVALID(130015, "environment worker groups is invalid format", "环境关联的工作组参数解析错误"), + UPDATE_ENVIRONMENT_WORKER_GROUP_RELATION_ERROR(130016,"You can't modify the worker group, because the worker group [{0}] and this environment [{1}] already be used in the task [{2}]", + "您不能修改工作组选项,因为该工作组 [{0}] 和 该环境 [{1}] 已经被用在任务 [{2}] 中"), + TASK_GROUP_QUEUE_ALREADY_START(130017, "task group queue already start", "节点已经获取任务组资源") + ; private final int code; private final String enMsg; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java new file mode 100644 index 000000000..08d3f5721 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service; + +import org.apache.dolphinscheduler.dao.entity.User; + +import java.util.Map; + +/** + * task group queue service + */ +public interface TaskGroupQueueService { + + /** + * query tasks in task group queue by group id + * @param loginUser login user + * @param groupId group id + * @param pageNo page no + * @param pageSize page size + + * @return tasks list + */ + Map queryTasksByGroupId(User loginUser, int groupId, int pageNo, + int pageSize); + + /** + * query tasks in task group queue by project id + * @param loginUser login user + * @param pageNo page no + * @param pageSize page size + * @param processId process id + * @return tasks list + */ + Map queryTasksByProcessId(User loginUser, int pageNo, + int pageSize, int processId); + + /** + * query all tasks in task group queue + * @param loginUser login user + * @param pageNo page no + * @param pageSize page size + * @return tasks list + */ + Map queryAllTasks(User loginUser, int pageNo, int pageSize); + + /** + * delete by task id + * @param taskId task id + * @return TaskGroupQueue entity + */ + boolean deleteByTaskId(int taskId); + + void forceStartTask(int taskId,int forceStart); +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java new file mode 100644 index 000000000..1a73b04ce --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service; + +import org.apache.dolphinscheduler.dao.entity.User; + +import java.util.Map; + +/** + * task group service + */ +public interface TaskGroupService { + + /** + * create a Task group + * + * @param loginUser login user + * @param name task group name + * @param description task group description + * @param groupSize task group total size + * @return the result code and msg + */ + Map createTaskGroup(User loginUser, String name, + String description, int groupSize); + + /** + * update the task group + * + * @param loginUser login user + * @param name task group name + * @param description task group description + * @param groupSize task group total size + * @return the result code and msg + */ + Map updateTaskGroup(User loginUser, int id, String name, + String description, int groupSize); + + /** + * get task group status + * + * @param id task group id + * @return the result code and msg + */ + boolean isTheTaskGroupAvailable(int id); + + /** + * query all task group by user id + * + * @param loginUser login user + * @param pageNo page no + * @param pageSize page size + * @return the result code and msg + */ + Map queryAllTaskGroup(User loginUser, int pageNo, int pageSize); + + /** + * query all task group by status + * + * @param loginUser login user + * @param pageNo page no + * @param pageSize page size + * @param status status + * @return the result code and msg + */ + Map queryTaskGroupByStatus(User loginUser, int pageNo, int pageSize, int status); + + /** + * query all task group by name + * + * @param loginUser login user + * @param pageNo page no + * @param pageSize page size + * @param name name + * @return the result code and msg + */ + Map queryTaskGroupByName(User loginUser, int pageNo, int pageSize, String name); + + /** + * query all task group by id + * + * @param loginUser login user + * @param id id + * @return the result code and msg + */ + Map queryTaskGroupById(User loginUser, int id); + + /** + * query + * + * @param loginUser login user + * @param pageNo page no + * @param pageSize page size + * @param userId user id + * @param name name + * @param status status + * @return the result code and msg + */ + Map doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, int status); + + /** + * close a task group + * + * @param loginUser login user + * @param id task group id + * @return the result code and msg + */ + Map closeTaskGroup(User loginUser, int id); + + /** + * start a task group + * + * @param loginUser login user + * @param id task group id + * @return the result code and msg + */ + Map startTaskGroup(User loginUser, int id); + + /** + * wake a task manually + * + * @param taskId task id + * @return result + */ + Map wakeTaskcompulsively(User loginUser, int taskId); +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 88530b8d8..51d78f2b8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -24,7 +24,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT; -import com.fasterxml.jackson.core.type.TypeReference; import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.ExecutorService; @@ -34,10 +33,12 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.DateUtils; @@ -47,6 +48,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; @@ -73,6 +75,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import com.fasterxml.jackson.core.type.TypeReference; + /** * executor service impl */ @@ -403,6 +407,27 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } + /** + * prepare to update process instance command type and status + * + * @param processInstance process instance + * @return update result + */ + private Map forceStartTaskInstance(ProcessInstance processInstance, int taskId) { + Map result = new HashMap<>(); + TaskGroupQueue taskGroupQueue = processService.loadTaskGroupQueue(taskId); + if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) { + putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START); + return result; + } + taskGroupQueue.setForceStart(Flag.YES.getCode()); + processService.updateTaskGroupQueue(taskGroupQueue); + processService.sendStartTask2Master(processInstance,taskId + ,org.apache.dolphinscheduler.remote.command.CommandType.TASK_FORCE_STATE_EVENT_REQUEST); + putMsg(result, Status.SUCCESS); + return result; + } + /** * insert command, used in the implementation of the page, re run, recovery (pause / failure) execution * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java new file mode 100644 index 000000000..3dfe3110f --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.TaskGroupQueueService; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; + +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +/** + * task group queue service + */ +@Service +public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGroupQueueService { + + @Autowired + TaskGroupQueueMapper taskGroupQueueMapper; + + @Autowired + private TaskInstanceMapper taskInstanceMapper; + + private static final Logger logger = LoggerFactory.getLogger(TaskGroupQueueServiceImpl.class); + + /** + * query tasks in task group queue by group id + * + * @param loginUser login user + * @param groupId group id + * @param pageNo page no + * @param pageSize page size + * @return tasks list + */ + @Override + public Map queryTasksByGroupId(User loginUser, int groupId, int pageNo, int pageSize) { + return this.doQuery(loginUser, pageNo, pageSize, groupId); + } + + /** + * query tasks in task group queue by project id + * + * @param loginUser login user + * @param pageNo page no + * @param pageSize page size + * @param processId process id + * @return tasks list + */ + @Override + public Map queryTasksByProcessId(User loginUser, int pageNo, int pageSize, int processId) { + return this.doQuery(loginUser, pageNo, pageSize, processId); + } + + /** + * query all tasks in task group queue + * + * @param loginUser login user + * @param pageNo page no + * @param pageSize page size + * @return tasks list + */ + @Override + public Map queryAllTasks(User loginUser, int pageNo, int pageSize) { + return this.doQuery(loginUser, pageNo, pageSize, 0); + } + + public Map doQuery(User loginUser, int pageNo, int pageSize, + int groupId) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + Page page = new Page<>(pageNo, pageSize); + IPage taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueuePaging(page, groupId); + + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + pageInfo.setTotal((int) taskGroupQueue.getTotal()); + pageInfo.setTotalList(taskGroupQueue.getRecords()); + + result.put(Constants.DATA_LIST, pageInfo); + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * delete by task id + * + * @param taskId task id + * @return TaskGroupQueue entity + */ + + @Override + public boolean deleteByTaskId(int taskId) { + return taskGroupQueueMapper.deleteByTaskId(taskId) == 1; + } + + @Override + public void forceStartTask(int taskId,int forceStart) { + taskGroupQueueMapper.updateForceStart(taskId,forceStart); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java new file mode 100644 index 000000000..7802850e5 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.TaskGroupQueueService; +import org.apache.dolphinscheduler.api.service.TaskGroupService; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.dao.entity.TaskGroup; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +/** + * task Group Service + */ +@Service +public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupService { + + @Autowired + private TaskGroupMapper taskGroupMapper; + + @Autowired + private TaskGroupQueueService taskGroupQueueService; + + @Autowired + private ProcessService processService; + + private static final Logger logger = LoggerFactory.getLogger(TaskGroupServiceImpl.class); + + /** + * create a Task group + * + * @param loginUser login user + * @param name task group name + * @param description task group description + * @param groupSize task group total size + * @return the result code and msg + */ + @Override + public Map createTaskGroup(User loginUser, String name, String description, int groupSize) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + if (name == null) { + putMsg(result, Status.NAME_NULL); + return result; + } + if (groupSize <= 0) { + putMsg(result, Status.TASK_GROUP_SIZE_ERROR); + return result; + + } + TaskGroup taskGroup1 = taskGroupMapper.queryByName(loginUser.getId(), name); + if (taskGroup1 != null) { + putMsg(result, Status.TASK_GROUP_NAME_EXSIT); + return result; + } + TaskGroup taskGroup = new TaskGroup(name, description, + groupSize, loginUser.getId(),Flag.YES.getCode()); + int insert = taskGroupMapper.insert(taskGroup); + logger.info("insert result:{}", insert); + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * update the task group + * + * @param loginUser login user + * @param name task group name + * @param description task group description + * @param groupSize task group total size + * @return the result code and msg + */ + @Override + public Map updateTaskGroup(User loginUser, int id, String name, String description, int groupSize) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + TaskGroup taskGroup = taskGroupMapper.selectById(id); + if (taskGroup.getStatus() != Flag.YES.getCode()) { + putMsg(result, Status.TASK_GROUP_STATUS_ERROR); + return result; + } + taskGroup.setGroupSize(groupSize); + taskGroup.setDescription(description); + if (StringUtils.isNotEmpty(name)) { + taskGroup.setName(name); + } + int i = taskGroupMapper.updateById(taskGroup); + logger.info("update result:{}", i); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * get task group status + * + * @param id task group id + * @return is the task group available + */ + @Override + public boolean isTheTaskGroupAvailable(int id) { + return taskGroupMapper.selectCountByIdStatus(id,Flag.YES.getCode()) == 1; + } + + /** + * query all task group by user id + * + * @param loginUser login user + * @param pageNo page no + * @param pageSize page size + * @return the result code and msg + */ + @Override + public Map queryAllTaskGroup(User loginUser, int pageNo, int pageSize) { + return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), null, 0); + } + + /** + * query all task group by status + * + * @param loginUser login user + * @param pageNo page no + * @param pageSize page size + * @param status status + * @return the result code and msg + */ + @Override + public Map queryTaskGroupByStatus(User loginUser, int pageNo, int pageSize, int status) { + return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), null, status); + } + + /** + * query all task group by name + * + * @param loginUser login user + * @param pageNo page no + * @param pageSize page size + * @param name name + * @return the result code and msg + */ + @Override + public Map queryTaskGroupByName(User loginUser, int pageNo, int pageSize, String name) { + return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), name, 0); + } + + /** + * query all task group by id + * + * @param loginUser login user + * @param id id + * @return the result code and msg + */ + @Override + public Map queryTaskGroupById(User loginUser, int id) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + TaskGroup taskGroup = taskGroupMapper.selectById(id); + result.put(Constants.DATA_LIST, taskGroup); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * query + * + * @param pageNo page no + * @param pageSize page size + * @param userId user id + * @param name name + * @param status status + * @return the result code and msg + */ + + @Override + public Map doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, int status) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + Page page = new Page<>(pageNo, pageSize); + IPage taskGroupPaging = taskGroupMapper.queryTaskGroupPaging(page, userId, name, status); + + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + int total = taskGroupPaging == null ? 0 : (int) taskGroupPaging.getTotal(); + List list = taskGroupPaging == null ? new ArrayList() : taskGroupPaging.getRecords(); + pageInfo.setTotal(total); + pageInfo.setTotalList(list); + + result.put(Constants.DATA_LIST, pageInfo); + logger.info("select result:{}", taskGroupPaging); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * close a task group + * + * @param loginUser login user + * @param id task group id + * @return the result code and msg + */ + + @Override + public Map closeTaskGroup(User loginUser, int id) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + TaskGroup taskGroup = taskGroupMapper.selectById(id); + taskGroup.setStatus(Flag.NO.getCode()); + taskGroupMapper.updateById(taskGroup); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * start a task group + * + * @param loginUser login user + * @param id task group id + * @return the result code and msg + */ + @Override + public Map startTaskGroup(User loginUser, int id) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + TaskGroup taskGroup = taskGroupMapper.selectById(id); + if (taskGroup.getStatus() == 1) { + putMsg(result, Status.TASK_GROUP_STATUS_ERROR); + return result; + } + taskGroup.setStatus(1); + taskGroup.setUpdateTime(new Date(System.currentTimeMillis())); + int update = taskGroupMapper.updateById(taskGroup); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * wake a task manually + * + * @param loginUser + * @param taskId task id + * @return result + */ + @Override + public Map wakeTaskcompulsively(User loginUser, int taskId) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + taskGroupQueueService.forceStartTask(taskId,Flag.YES.getCode()); + putMsg(result, Status.SUCCESS); + return result; + } +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupControllerTest.java new file mode 100644 index 000000000..06e7fc74b --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupControllerTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.controller; + +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +/** + * queue controller test + */ +public class TaskGroupControllerTest extends AbstractControllerTest { + + private static Logger logger = LoggerFactory.getLogger(TaskGroupControllerTest.class); + + private static final String QUEUE_CREATE_STRING = "queue1"; + + @Test + public void testQueryListAll() throws Exception { + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("pageNo", "2"); + paramsMap.add("pageSize", "2"); + MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-all") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + Assert.assertTrue(result != null && result.isSuccess()); + logger.info("query list queue return result:{}", mvcResult.getResponse().getContentAsString()); + } + + @Test + public void testQueryByName() throws Exception { + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("pageNo", "1"); + paramsMap.add("name", "TGQ"); + paramsMap.add("pageSize", "10"); + MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-by-name") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + Assert.assertTrue(result != null && result.isSuccess()); + logger.info("query list queue return result:{}", mvcResult.getResponse().getContentAsString()); + } + + @Test + public void testQueryByStatus() throws Exception { + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("pageNo", "1"); + paramsMap.add("status", "1"); + paramsMap.add("pageSize", "10"); + MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-by-status") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + Assert.assertTrue(result != null && result.isSuccess()); + logger.info("query list queue return result:{}", mvcResult.getResponse().getContentAsString()); + } + + @Test + public void testCreateTaskGroup() throws Exception { + + // success + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("name", "TGQ1"); + paramsMap.add("description", "this is a task group queue!"); + paramsMap.add("groupSize", "10"); + + MvcResult mvcResult = mockMvc.perform(post("/task-group/create") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isCreated()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andReturn(); + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + Assert.assertTrue(result != null && result.isSuccess()); + logger.info("create queue return result:{}", mvcResult.getResponse().getContentAsString()); + // failed + // name exists + paramsMap.clear(); + paramsMap.add("name", "TGQ1"); + paramsMap.add("description", "this is a task group queue!"); + paramsMap.add("groupSize", "10"); + + MvcResult mvcResult1 = mockMvc.perform(post("/task-group/create") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isCreated()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andReturn(); + Result result1 = JSONUtils.parseObject(mvcResult1.getResponse().getContentAsString(), Result.class); + Assert.assertTrue(result1 != null && result1.isFailed()); + logger.info("create queue return result:{}", mvcResult1.getResponse().getContentAsString()); + } + + @Test + public void testUpdateTaskGroup() throws Exception { + + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("id", "1"); + paramsMap.add("name", "TGQ11"); + paramsMap.add("description", "this is a task group queue!"); + paramsMap.add("groupSize", "10"); + + MvcResult mvcResult = mockMvc.perform(post("/task-group/update") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isCreated()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andReturn(); + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString()); + Assert.assertTrue(result != null && result.isSuccess()); + logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString()); + } + + @Test + public void testCloseAndStartTaskGroup() throws Exception { + + // close + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("id", "1"); + MvcResult mvcResult = mockMvc.perform(post("/task-group/close-task-group") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isCreated()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andReturn(); + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString()); + Assert.assertTrue(result != null && result.isSuccess()); + logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString()); + + // start + paramsMap.clear(); + paramsMap.add("id", "1"); + MvcResult mvcResult1 = mockMvc.perform(post("/task-group/start-task-group") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isCreated()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andReturn(); + Result result1 = JSONUtils.parseObject(mvcResult1.getResponse().getContentAsString(), Result.class); + logger.info("update queue return result:{}", mvcResult1.getResponse().getContentAsString()); + Assert.assertTrue(result1 != null && result1.isSuccess()); + logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString()); + } + + @Test + public void testWakeCompulsively() throws Exception { + + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("id", "1"); + paramsMap.add("taskId", "1"); + + MvcResult mvcResult = mockMvc.perform(post("/task-group/wake-task-compulsively") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isCreated()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andReturn(); + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString()); + Assert.assertTrue(result != null && (result.isSuccess() || result.isStatus(Status.TASK_GROUP_CACHE_START_FAILED))); + logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString()); + } +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupQueueControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupQueueControllerTest.java new file mode 100644 index 000000000..38cd1397d --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupQueueControllerTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.controller; + +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +/** + * queue controller test + */ +public class TaskGroupQueueControllerTest extends AbstractControllerTest { + + private static Logger logger = LoggerFactory.getLogger(TaskGroupQueueControllerTest.class); + + private static final String QUEUE_CREATE_STRING = "queue1"; + + @Test + public void queryTasksByGroupId() throws Exception { + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("groupId", "1"); + paramsMap.add("pageNo", "1"); + paramsMap.add("pageSize", "10"); + MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-by-group-id") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + Assert.assertTrue(result != null && result.isSuccess()); + logger.info("query list queue return result:{}", mvcResult.getResponse().getContentAsString()); + } +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueServiceTest.java new file mode 100644 index 000000000..cf5dd7cc3 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueServiceTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service; + +import org.apache.dolphinscheduler.api.service.impl.TaskGroupQueueServiceImpl; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; +import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; + +import org.apache.commons.collections.CollectionUtils; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +/** + * project service test + **/ +@RunWith(MockitoJUnitRunner.class) +public class TaskGroupQueueServiceTest { + + private static final Logger logger = LoggerFactory.getLogger(TaskGroupQueueServiceTest.class); + + @InjectMocks + private TaskGroupQueueServiceImpl taskGroupQueueService; + + @Mock + private TaskGroupQueueMapper taskGroupQueueMapper; + + private String userName = "test"; + + private String taskName = "taskGroupQueueServiceTest"; + + /** + * create admin user + */ + private User getLoginUser() { + User loginUser = new User(); + loginUser.setUserType(UserType.ADMIN_USER); + loginUser.setUserName(userName); + loginUser.setId(1); + return loginUser; + } + + private TaskGroupQueue getTaskGroupQueue() { + TaskGroupQueue taskGroupQueue = new TaskGroupQueue(); + taskGroupQueue.setTaskName(taskName); + taskGroupQueue.setId(1); + taskGroupQueue.setGroupId(1); + taskGroupQueue.setTaskId(1); + taskGroupQueue.setPriority(1); + taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); + Date date = new Date(System.currentTimeMillis()); + taskGroupQueue.setUpdateTime(date); + taskGroupQueue.setCreateTime(date); + return taskGroupQueue; + } + + @Test + public void testDoQuery() { + User user = getLoginUser(); + IPage page = new Page<>(1, 10); + page.setTotal(1L); + List records = new ArrayList<>(); + records.add(getTaskGroupQueue()); + page.setRecords(records); + Mockito.when(taskGroupQueueMapper.queryTaskGroupQueuePaging(Mockito.any(Page.class), Mockito.eq(10))).thenReturn(page); + Map result = taskGroupQueueService.doQuery(user, 1, 1, 10); + PageInfo pageInfo = (PageInfo) result.get(Constants.DATA_LIST); + Assert.assertTrue(CollectionUtils.isNotEmpty(pageInfo.getTotalList())); + } +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java new file mode 100644 index 000000000..4ba950bb2 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.impl.TaskGroupServiceImpl; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.dao.entity.TaskGroup; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +/** + * project service test + **/ +@RunWith(MockitoJUnitRunner.Silent.class) +public class TaskGroupServiceTest { + + private static final Logger logger = LoggerFactory.getLogger(TaskGroupServiceTest.class); + + @InjectMocks + private TaskGroupServiceImpl taskGroupService; + + @Mock + private TaskGroupQueueService taskGroupQueueService; + + @Mock + private ProcessService processService; + + @Mock + private TaskGroupMapper taskGroupMapper; + + @Mock + private TaskGroupQueueMapper taskGroupQueueMapper; + + @Mock + private UserMapper userMapper; + + private String taskGroupName = "TaskGroupServiceTest"; + + private String taskGroupDesc = "this is a task group"; + + private String userName = "taskGroupServiceTest"; + + /** + * create admin user + */ + private User getLoginUser() { + User loginUser = new User(); + loginUser.setUserType(UserType.ADMIN_USER); + loginUser.setUserName(userName); + loginUser.setId(1); + return loginUser; + } + + private TaskGroup getTaskGroup() { + TaskGroup taskGroup = new TaskGroup(taskGroupName, taskGroupDesc, + 100, 1,1); + return taskGroup; + } + + private List getList() { + List list = new ArrayList<>(); + list.add(getTaskGroup()); + return list; + } + + @Test + public void testCreate() { + User loginUser = getLoginUser(); + TaskGroup taskGroup = getTaskGroup(); + Mockito.when(taskGroupMapper.insert(taskGroup)).thenReturn(1); + Mockito.when(taskGroupMapper.queryByName(loginUser.getId(), taskGroupName)).thenReturn(null); + Map result = taskGroupService.createTaskGroup(loginUser, taskGroupName, taskGroupDesc, 100); + logger.info(result.toString()); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + + } + + @Test + public void testQueryById() { + User loginUser = getLoginUser(); + TaskGroup taskGroup = getTaskGroup(); + Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup); + Map result = taskGroupService.queryTaskGroupById(loginUser, 1); + Assert.assertNotNull(result.get(Constants.DATA_LIST)); + } + + @Test + public void testQueryProjectListPaging() { + + IPage page = new Page<>(1, 10); + page.setRecords(getList()); + User loginUser = getLoginUser(); + Mockito.when(taskGroupMapper.queryTaskGroupPaging(Mockito.any(Page.class), Mockito.eq(10), + Mockito.eq(null), Mockito.eq(0))).thenReturn(page); + + // query all + Map result = taskGroupService.queryAllTaskGroup(loginUser, 1, 10); + PageInfo pageInfo = (PageInfo) result.get(Constants.DATA_LIST); + Assert.assertNotNull(pageInfo.getTotalList()); + } + + @Test + public void testUpdate() { + + User loginUser = getLoginUser(); + TaskGroup taskGroup = getTaskGroup(); + taskGroup.setStatus(Flag.YES.getCode()); + // Task group status error + Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup); + Map result = taskGroupService.updateTaskGroup(loginUser, 1, "newName", "desc", 100); + logger.info(result.toString()); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + + taskGroup.setStatus(0); + } + + @Test + public void testCloseAndStart() { + + User loginUser = getLoginUser(); + TaskGroup taskGroup = getTaskGroup(); + Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup); + + //close failed + Map result1 = taskGroupService.closeTaskGroup(loginUser, 1); + Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS)); + } + + @Test + public void testWakeTaskFroceManually() { + + TreeMap tm = new TreeMap<>(); + tm.put(1, 1); + Map map1 = taskGroupService.wakeTaskcompulsively(getLoginUser(), 1); + Assert.assertEquals(Status.SUCCESS, map1.get(Constants.STATUS)); + + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java index bf93fbed8..fc795d4b0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java @@ -24,7 +24,9 @@ public enum StateEventType { PROCESS_STATE_CHANGE(0, "process statechange"), TASK_STATE_CHANGE(1, "task state change"), PROCESS_TIMEOUT(2, "process timeout"), - TASK_TIMEOUT(3, "task timeout"); + TASK_TIMEOUT(3, "task timeout"), + WAIT_TASK_GROUP(4, "wait task group"), + ; StateEventType(int code, String descp) { this.code = code; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskGroupQueueStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskGroupQueueStatus.java new file mode 100644 index 000000000..08ee74d4c --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskGroupQueueStatus.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +import java.util.HashMap; + +import com.baomidou.mybatisplus.annotation.EnumValue; + +/** + * running status for task group queue + */ +public enum TaskGroupQueueStatus { + + WAIT_QUEUE(-1, "wait queue"), + ACQUIRE_SUCCESS(1, "acquire success"), + RELEASE(2, "release"); + + @EnumValue + private final int code; + private final String descp; + private static HashMap STATUS_MAP = new HashMap<>(); + + static { + for (TaskGroupQueueStatus taskGroupQueueStatus : TaskGroupQueueStatus.values()) { + STATUS_MAP.put(taskGroupQueueStatus.code, taskGroupQueueStatus); + } + } + + TaskGroupQueueStatus(int code, String descp) { + this.code = code; + this.descp = descp; + } + + public static TaskGroupQueueStatus of(int status) { + if (STATUS_MAP.containsKey(status)) { + return STATUS_MAP.get(status); + } + throw new IllegalArgumentException("invalid status : " + status); + } + + public int getCode() { + return code; + } + + public String getDescp() { + return descp; + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java index 92a3fa8b8..6c6e8ff0e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java @@ -195,6 +195,11 @@ public class TaskDefinition { @TableField(exist = false) private String modifyBy; + /** + * task group id + */ + private int taskGroupId; + public TaskDefinition() { } @@ -203,6 +208,14 @@ public class TaskDefinition { this.version = version; } + public int getTaskGroupId() { + return taskGroupId; + } + + public void setTaskGroupId(int taskGroupId) { + this.taskGroupId = taskGroupId; + } + public String getName() { return name; } @@ -442,51 +455,51 @@ public class TaskDefinition { } TaskDefinition that = (TaskDefinition) o; return failRetryTimes == that.failRetryTimes - && failRetryInterval == that.failRetryInterval - && timeout == that.timeout - && delayTime == that.delayTime - && Objects.equals(name, that.name) - && Objects.equals(description, that.description) - && Objects.equals(taskType, that.taskType) - && Objects.equals(taskParams, that.taskParams) - && flag == that.flag - && taskPriority == that.taskPriority - && Objects.equals(workerGroup, that.workerGroup) - && timeoutFlag == that.timeoutFlag - && timeoutNotifyStrategy == that.timeoutNotifyStrategy - && Objects.equals(resourceIds, that.resourceIds) - && environmentCode == that.environmentCode; + && failRetryInterval == that.failRetryInterval + && timeout == that.timeout + && delayTime == that.delayTime + && Objects.equals(name, that.name) + && Objects.equals(description, that.description) + && Objects.equals(taskType, that.taskType) + && Objects.equals(taskParams, that.taskParams) + && flag == that.flag + && taskPriority == that.taskPriority + && Objects.equals(workerGroup, that.workerGroup) + && timeoutFlag == that.timeoutFlag + && timeoutNotifyStrategy == that.timeoutNotifyStrategy + && Objects.equals(resourceIds, that.resourceIds) + && environmentCode == that.environmentCode; } @Override public String toString() { return "TaskDefinition{" - + "id=" + id - + ", code=" + code - + ", name='" + name + '\'' - + ", version=" + version - + ", description='" + description + '\'' - + ", projectCode=" + projectCode - + ", userId=" + userId - + ", taskType=" + taskType - + ", taskParams='" + taskParams + '\'' - + ", taskParamList=" + taskParamList - + ", taskParamMap=" + taskParamMap - + ", flag=" + flag - + ", taskPriority=" + taskPriority - + ", userName='" + userName + '\'' - + ", projectName='" + projectName + '\'' - + ", workerGroup='" + workerGroup + '\'' - + ", failRetryTimes=" + failRetryTimes - + ", environmentCode='" + environmentCode + '\'' - + ", failRetryInterval=" + failRetryInterval - + ", timeoutFlag=" + timeoutFlag - + ", timeoutNotifyStrategy=" + timeoutNotifyStrategy - + ", timeout=" + timeout - + ", delayTime=" + delayTime - + ", resourceIds='" + resourceIds + '\'' - + ", createTime=" + createTime - + ", updateTime=" + updateTime - + '}'; + + "id=" + id + + ", code=" + code + + ", name='" + name + '\'' + + ", version=" + version + + ", description='" + description + '\'' + + ", projectCode=" + projectCode + + ", userId=" + userId + + ", taskType=" + taskType + + ", taskParams='" + taskParams + '\'' + + ", taskParamList=" + taskParamList + + ", taskParamMap=" + taskParamMap + + ", flag=" + flag + + ", taskPriority=" + taskPriority + + ", userName='" + userName + '\'' + + ", projectName='" + projectName + '\'' + + ", workerGroup='" + workerGroup + '\'' + + ", failRetryTimes=" + failRetryTimes + + ", environmentCode='" + environmentCode + '\'' + + ", failRetryInterval=" + failRetryInterval + + ", timeoutFlag=" + timeoutFlag + + ", timeoutNotifyStrategy=" + timeoutNotifyStrategy + + ", timeout=" + timeout + + ", delayTime=" + delayTime + + ", resourceIds='" + resourceIds + '\'' + + ", createTime=" + createTime + + ", updateTime=" + updateTime + + '}'; } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java new file mode 100644 index 000000000..b7692c3ab --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import java.io.Serializable; +import java.util.Date; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; + +/** + * Task Group + */ +@TableName("t_ds_task_group") +public class TaskGroup implements Serializable { + /** + * key + */ + @TableId(value = "id", type = IdType.AUTO) + private int id; + /** + * task_group name + */ + private String name; + + private String description; + /** + * 作业组大小 + */ + private int groupSize; + /** + * 已使用作业组大小 + */ + private int useSize; + /** + * creator id + */ + private int userId; + /** + * 0 not available, 1 available + */ + private int status; + /** + * create time + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date createTime; + /** + * update time + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date updateTime; + + public TaskGroup(String name, String description, int groupSize, int userId,int status) { + this.name = name; + this.description = description; + this.groupSize = groupSize; + this.userId = userId; + this.status = status; + init(); + + } + + public TaskGroup() { + init(); + } + + public void init() { + this.status = 1; + this.useSize = 0; + } + + @Override + public String toString() { + return "TaskGroup{" + + "id=" + id + + ", name='" + name + '\'' + + ", description='" + description + '\'' + + ", groupSize=" + groupSize + + ", useSize=" + useSize + + ", userId=" + userId + + ", status=" + status + + ", createTime=" + createTime + + ", updateTime=" + updateTime + + '}'; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public int getGroupSize() { + return groupSize; + } + + public void setGroupSize(int groupSize) { + this.groupSize = groupSize; + } + + public int getUseSize() { + return useSize; + } + + public void setUseSize(int useSize) { + this.useSize = useSize; + } + + public int getUserId() { + return userId; + } + + public void setUserId(int userId) { + this.userId = userId; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java new file mode 100644 index 000000000..c208dcb62 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; + +import java.io.Serializable; +import java.util.Date; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; + +/** + * Task Group Queue + */ +@TableName("t_ds_task_group_queue") +public class TaskGroupQueue implements Serializable { + /** + * key + */ + @TableId(value = "id", type = IdType.AUTO) + private int id; + /** + * taskIntanceid + */ + private int taskId; + /** + * TaskInstance name + */ + private String taskName; + /** + * taskGroup id + */ + private int groupId; + /** + * processInstace id + */ + private int processId; + /** + * the priority of task instance + */ + private int priority; + /** + * is force start + * 0 NO ,1 YES + */ + private int forceStart; + /** + * ready to get the queue by other task finish + * 0 NO ,1 YES + */ + private int inQueue; + /** + * -1: waiting 1: running 2: finished + */ + private TaskGroupQueueStatus status; + /** + * create time + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date createTime; + /** + * update time + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date updateTime; + + public TaskGroupQueue() { + + } + + public TaskGroupQueue(int taskId, String taskName, int groupId, int processId, int priority, TaskGroupQueueStatus status) { + this.taskId = taskId; + this.taskName = taskName; + this.groupId = groupId; + this.processId = processId; + this.priority = priority; + this.status = status; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public int getTaskId() { + return taskId; + } + + public void setTaskId(int taskId) { + this.taskId = taskId; + } + + public String getTaskName() { + return taskName; + } + + public void setTaskName(String taskName) { + this.taskName = taskName; + } + + public int getGroupId() { + return groupId; + } + + public void setGroupId(int groupId) { + this.groupId = groupId; + } + + public int getProcessId() { + return processId; + } + + public void setProcessId(int processId) { + this.processId = processId; + } + + public int getPriority() { + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } + + @Override + public String toString() { + return "TaskGroupQueue{" + + "id=" + id + + ", taskId=" + taskId + + ", taskName='" + taskName + '\'' + + ", groupId=" + groupId + + ", processId=" + processId + + ", priority=" + priority + + ", status=" + status + + ", createTime=" + createTime + + ", updateTime=" + updateTime + + '}'; + } + + public TaskGroupQueueStatus getStatus() { + return status; + } + + public void setStatus(TaskGroupQueueStatus status) { + this.status = status; + } + + public int getForceStart() { + return forceStart; + } + + public void setForceStart(int forceStart) { + this.forceStart = forceStart; + } + + public int getInQueue() { + return inQueue; + } + + public void setInQueue(int inQueue) { + this.inQueue = inQueue; + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 4076900f4..e5f4e60e8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.entity; import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT; -import com.fasterxml.jackson.core.type.TypeReference; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Flag; @@ -39,6 +38,7 @@ import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.core.type.TypeReference; /** * task instance @@ -268,6 +268,10 @@ public class TaskInstance implements Serializable { * dry run flag */ private int dryRun; + /** + * task group id + */ + private int taskGroupId; public void init(String host, Date startTime, String executePath) { this.host = host; @@ -283,6 +287,14 @@ public class TaskInstance implements Serializable { this.varPool = varPool; } + public int getTaskGroupId() { + return taskGroupId; + } + + public void setTaskGroupId(int taskGroupId) { + this.taskGroupId = taskGroupId; + } + public ProcessInstance getProcessInstance() { return processInstance; } @@ -457,7 +469,8 @@ public class TaskInstance implements Serializable { public DependentParameters getDependency() { if (this.dependency == null) { - Map taskParamsMap = JSONUtils.parseObject(this.getTaskParams(), new TypeReference>() {}); + Map taskParamsMap = JSONUtils.parseObject(this.getTaskParams(), new TypeReference>() { + }); this.dependency = JSONUtils.parseObject((String) taskParamsMap.get(Constants.DEPENDENCE), DependentParameters.class); } return this.dependency; @@ -469,15 +482,17 @@ public class TaskInstance implements Serializable { public SwitchParameters getSwitchDependency() { if (this.switchDependency == null) { - Map taskParamsMap = JSONUtils.parseObject(this.getTaskParams(), new TypeReference>() {}); + Map taskParamsMap = JSONUtils.parseObject(this.getTaskParams(), new TypeReference>() { + }); this.switchDependency = JSONUtils.parseObject((String) taskParamsMap.get(Constants.SWITCH_RESULT), SwitchParameters.class); } return this.switchDependency; } public void setSwitchDependency(SwitchParameters switchDependency) { - Map taskParamsMap = JSONUtils.parseObject(this.getTaskParams(), new TypeReference>() {}); - taskParamsMap.put(Constants.SWITCH_RESULT,JSONUtils.toJsonString(switchDependency)); + Map taskParamsMap = JSONUtils.parseObject(this.getTaskParams(), new TypeReference>() { + }); + taskParamsMap.put(Constants.SWITCH_RESULT, JSONUtils.toJsonString(switchDependency)); this.setTaskParams(JSONUtils.toJsonString(taskParamsMap)); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java new file mode 100644 index 000000000..26511d0b9 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.dao.entity.TaskGroup; + +import org.apache.ibatis.annotations.Param; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; + +/** + * the Dao interfaces of task group + * + * @author yinrui + * @since 2021-08-07 + */ +public interface TaskGroupMapper extends BaseMapper { + + /** + * compard and set to update table of task group + * + * @param id primary key + * @return affected rows + */ + int updateTaskGroupResource(@Param("id") int id, @Param("queueId") int queueId, + @Param("queueStatus") int queueStatus); + + /** + * update table of task group + * + * @param id primary key + * @return affected rows + */ + int releaseTaskGroupResource(@Param("id") int id, @Param("useSize") int useSize, + @Param("queueId") int queueId, @Param("queueStatus") int queueStatus); + + /** + * select task groups paging + * + * @param page page + * @param userId user id + * @param name name + * @param status status + * @return result page + */ + IPage queryTaskGroupPaging(IPage page, @Param("userId") int userId, + @Param("name") String name, @Param("status") int status); + + /** + * query by task group name + * + * @param userId user id + * @param name name + * @return task group + */ + TaskGroup queryByName(@Param("userId") int userId, @Param("name") String name); + + int selectAvailableCountById(@Param("groupId") int groupId); + + int selectCountByIdStatus(@Param("id") int id,@Param("status") int status); + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java new file mode 100644 index 000000000..3b2e7d03b --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; + +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; + +/** + * the Dao interfaces of task group queue + * + * @author yinrui + * @since 2021-08-07 + */ +public interface TaskGroupQueueMapper extends BaseMapper { + + /** + * select task group queues by some conditions + * + * @param page page + * @param groupId group id + * @return task group queue list + */ + IPage queryTaskGroupQueuePaging(IPage page, + @Param("groupId") int groupId + ); + + TaskGroupQueue queryByTaskId(@Param("taskId") int taskId); + + /** + * query by status + * + * @param status status + * @return result + */ + List queryByStatus(@Param("status") int status); + + /** + * delete by task id + * + * @param taskId task id + * @return affected rows + */ + int deleteByTaskId(@Param("taskId") int taskId); + + /** + * update status by task id + * + * @param taskId task id + * @param status status + * @return + */ + int updateStatusByTaskId(@Param("taskId") int taskId, @Param("status") int status); + + List queryHighPriorityTasks(@Param("groupId") int groupId, @Param("priority") int priority, @Param("status") int status); + + TaskGroupQueue queryTheHighestPriorityTasks(@Param("groupId") int groupId, @Param("status") int status, + @Param("forceStart") int forceStart, @Param("inQueue") int inQueue); + + void updateInQueue(@Param("inQueue") int inQueue, @Param("id") int id); + + void updateForceStart(@Param("taskId") int taskId, @Param("forceStart") int forceStart); + + int updateInQueueLimit1(@Param("oldValue") int oldValue, @Param("newValue") int newValue + , @Param("groupId") int id, @Param("status") int status); + + int updateInQueueCAS(@Param("oldValue") int oldValue, @Param("newValue") int newValue, @Param("id") int id); + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index 795004d0c..26bddd311 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -73,4 +73,6 @@ public interface TaskInstanceMapper extends BaseMapper { @Param("startTime") Date startTime, @Param("endTime") Date endTime ); + + List loadAllInfosNoRelease(@Param("processInstanceId") int processInstanceId,@Param("status") int status); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml index da6c0ea2b..c57d34682 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml @@ -1,4 +1,4 @@ - + - + id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, - resource_ids, operator, operate_time, create_time, update_time + resource_ids, operator, operate_time, create_time, update_time,task_group_id select @@ -76,14 +76,14 @@ insert into t_ds_task_definition (code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, - timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time) + timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time,task_group_id) values (#{taskDefinition.code},#{taskDefinition.name},#{taskDefinition.version},#{taskDefinition.description}, #{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams},#{taskDefinition.flag}, #{taskDefinition.taskPriority},#{taskDefinition.workerGroup},#{taskDefinition.environmentCode},#{taskDefinition.failRetryTimes}, #{taskDefinition.failRetryInterval},#{taskDefinition.timeoutFlag},#{taskDefinition.timeoutNotifyStrategy},#{taskDefinition.timeout}, - #{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime}) + #{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime}, #{taskDefinition.taskGroupId}) + select + + + from t_ds_task_group + + + and user_id = #{userId} + + + and status = #{status} + + + and name like concat('%', #{name}, '%') + + + + + + + update t_ds_task_group + set use_size = use_size+1 + where id = #{id} and use_size < group_size and + (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus} ) = 1 + + + + + update t_ds_task_group + set use_size = use_size-1 + where id = #{id} and use_size > 0 and + (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus} ) = 1 + + + + + + + + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml new file mode 100644 index 000000000..a26471f06 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml @@ -0,0 +1,120 @@ + + + + + + + + + + + + + + + + + + + + + id, task_id, task_name, group_id, process_id, priority, status , force_start , in_queue, create_time, update_time + + + + + + + + delete from t_ds_task_group_queue + where task_id = #{taskId} + + + + update t_ds_task_group_queue + + + status = #{status}, + + + where task_id = #{taskId} + + + + update t_ds_task_group_queue + set in_queue = #{inQueue} + where id = #{id} + + + + update t_ds_task_group_queue + set force_start = #{forceStart} + where task_id = #{taskId} + + + + update t_ds_task_group_queue + set in_queue = #{newValue} + where group_id = #{groupId} and in_queue = #{oldValue} and status = #{status} order by priority desc limit 1 + + + + update t_ds_task_group_queue + set in_queue = #{newValue} + where id = #{id} and in_queue = #{oldValue} + + + + + + + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index f41b58ae1..f05696dd1 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -1,4 +1,4 @@ - + - + id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time, start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link, flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id, - first_submit_time, delay_time, task_params, var_pool, dry_run + first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time, ${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link, ${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id, - ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run + ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.task_group_id update t_ds_task_instance @@ -162,4 +162,14 @@ order by instance.start_time desc + diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index c85e106bd..e9e6a5113 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -477,6 +477,7 @@ CREATE TABLE t_ds_task_definition timeout_notify_strategy tinyint(4) DEFAULT NULL, timeout int(11) DEFAULT '0', delay_time int(11) DEFAULT '0', + task_group_id int(11) DEFAULT NULL, resource_ids text, create_time datetime NOT NULL, update_time datetime DEFAULT NULL, @@ -510,6 +511,7 @@ CREATE TABLE t_ds_task_definition_log delay_time int(11) DEFAULT '0', resource_ids text, operator int(11) DEFAULT NULL, + task_group_id int(11) DEFAULT NULL, operate_time datetime DEFAULT NULL, create_time datetime NOT NULL, update_time datetime DEFAULT NULL, @@ -844,6 +846,7 @@ CREATE TABLE t_ds_task_instance executor_id int(11) DEFAULT NULL, first_submit_time datetime DEFAULT NULL, delay_time int(4) DEFAULT '0', + task_group_id int(11) DEFAULT NULL, var_pool longtext, dry_run int NULL DEFAULT 0, PRIMARY KEY (id), @@ -1040,3 +1043,35 @@ CREATE TABLE t_ds_environment_worker_group_relation PRIMARY KEY (id), UNIQUE KEY environment_worker_group_unique (environment_code,worker_group) ); +DROP TABLE IF EXISTS t_ds_task_group_queue; +CREATE TABLE t_ds_task_group_queue +( + id int(11) NOT NULL AUTO_INCREMENT , + task_id int(11) DEFAULT NULL , + task_name VARCHAR(100) DEFAULT NULL , + group_id int(11) DEFAULT NULL , + process_id int(11) DEFAULT NULL , + priority int(8) DEFAULT '0' , + status int(4) DEFAULT '-1' , + force_start int(4) DEFAULT '0' , + in_queue int(4) DEFAULT '0' , + create_time datetime DEFAULT NULL , + update_time datetime DEFAULT NULL , + PRIMARY KEY (id) +); + +DROP TABLE IF EXISTS t_ds_task_group; +CREATE TABLE t_ds_task_group +( + id int(11) NOT NULL AUTO_INCREMENT , + name varchar(100) DEFAULT NULL , + description varchar(200) DEFAULT NULL , + group_size int(11) NOT NULL , + use_size int(11) DEFAULT '0' , + user_id int(11) DEFAULT NULL , + project_id int(11) DEFAULT NULL , + status int(4) DEFAULT '1' , + create_time datetime DEFAULT NULL , + update_time datetime DEFAULT NULL , + PRIMARY KEY(id) +); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 1f23eb144..949959bdf 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -477,6 +477,7 @@ CREATE TABLE `t_ds_task_definition` ( `timeout` int(11) DEFAULT '0' COMMENT 'timeout length,unit: minute', `delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute', `resource_ids` text COMMENT 'resource id, separated by comma', + `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', PRIMARY KEY (`id`,`code`) @@ -508,6 +509,7 @@ CREATE TABLE `t_ds_task_definition_log` ( `delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute', `resource_ids` text DEFAULT NULL COMMENT 'resource id, separated by comma', `operator` int(11) DEFAULT NULL COMMENT 'operator user id', + `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id', `operate_time` datetime DEFAULT NULL COMMENT 'operate time', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', @@ -832,6 +834,7 @@ CREATE TABLE `t_ds_task_instance` ( `first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time', `delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time', `var_pool` longtext COMMENT 'var_pool', + `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id', `dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag: 0 normal, 1 dry run', PRIMARY KEY (`id`), KEY `process_instance_id` (`process_instance_id`) USING BTREE, @@ -1017,3 +1020,40 @@ CREATE TABLE `t_ds_environment_worker_group_relation` ( PRIMARY KEY (`id`), UNIQUE KEY `environment_worker_group_unique` (`environment_code`,`worker_group`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; + +-- ---------------------------- +-- Table structure for t_ds_task_group_queue +-- ---------------------------- +DROP TABLE IF EXISTS `t_ds_task_group_queue`; +CREATE TABLE `t_ds_task_group_queue` ( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT'key', + `task_id` int(11) DEFAULT NULL COMMENT 'taskintanceid', + `task_name` varchar(100) DEFAULT NULL COMMENT 'TaskInstance name', + `group_id` int(11) DEFAULT NULL COMMENT 'taskGroup id', + `process_id` int(11) DEFAULT NULL COMMENT 'processInstace id', + `priority` int(8) DEFAULT '0' COMMENT 'priority', + `status` tinyint(4) DEFAULT '-1' COMMENT '-1: waiting 1: running 2: finished', + `force_start` tinyint(4) DEFAULT '0' COMMENT 'is force start 0 NO ,1 YES', + `in_queue` tinyint(4) DEFAULT '0' COMMENT 'ready to get the queue by other task finish 0 NO ,1 YES', + `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, + `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY( `id` ) +)ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8; + +-- ---------------------------- +-- Table structure for t_ds_task_group +-- ---------------------------- +DROP TABLE IF EXISTS `t_ds_task_group`; +CREATE TABLE `t_ds_task_group` ( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT'key', + `name` varchar(100) DEFAULT NULL COMMENT 'task_group name', + `description` varchar(200) DEFAULT NULL, + `group_size` int (11) NOT NULL COMMENT'group size', + `use_size` int (11) DEFAULT '0' COMMENT 'used size', + `user_id` int(11) DEFAULT NULL COMMENT 'creator id', + `project_id` int(11) DEFAULT NULL COMMENT 'project id', + `status` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available', + `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, + `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY(`id`) +) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8; diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql index 8c39bb128..de7454082 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql @@ -385,6 +385,7 @@ CREATE TABLE t_ds_task_definition ( timeout_notify_strategy int DEFAULT NULL , timeout int DEFAULT '0' , delay_time int DEFAULT '0' , + task_group_id int DEFAULT NULL, resource_ids text , create_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL , @@ -416,6 +417,7 @@ CREATE TABLE t_ds_task_definition_log ( delay_time int DEFAULT '0' , resource_ids text , operator int DEFAULT NULL , + task_group_id int DEFAULT NULL, operate_time timestamp DEFAULT NULL , create_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL , @@ -713,6 +715,7 @@ CREATE TABLE t_ds_task_instance ( executor_id int DEFAULT NULL , first_submit_time timestamp DEFAULT NULL , delay_time int DEFAULT '0' , + task_group_id int DEFAULT NULL, var_pool text , dry_run int DEFAULT '0' , PRIMARY KEY (id), @@ -989,3 +992,34 @@ CREATE TABLE t_ds_environment_worker_group_relation ( PRIMARY KEY (id) , CONSTRAINT environment_worker_group_unique UNIQUE (environment_code,worker_group) ); + +DROP TABLE IF EXISTS t_ds_task_group_queue; +CREATE TABLE t_ds_task_group_queue ( + id serial NOT NULL, + task_id int DEFAULT NULL , + task_name VARCHAR(100) DEFAULT NULL , + group_id int DEFAULT NULL , + process_id int DEFAULT NULL , + priority int DEFAULT '0' , + status int DEFAULT '-1' , + force_start int DEFAULT '0' , + in_queue int DEFAULT '0' , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY (id) +); + +DROP TABLE IF EXISTS t_ds_task_group; +CREATE TABLE t_ds_task_group ( + id serial NOT NULL, + name varchar(100) DEFAULT NULL , + description varchar(200) DEFAULT NULL , + group_size int NOT NULL , + use_size int DEFAULT '0' , + user_id int DEFAULT NULL , + project_id int DEFAULT NULL , + status int DEFAULT '1' , + create_time timestamp DEFAULT NULL , + update_time timestamp DEFAULT NULL , + PRIMARY KEY(id) +); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java new file mode 100644 index 000000000..2ee8b2b7e --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.dao.BaseDaoTest; +import org.apache.dolphinscheduler.dao.entity.TaskGroup; + +import java.util.Date; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +public class TaskGroupMapperTest extends BaseDaoTest { + + private static final Logger logger = LoggerFactory.getLogger(TaskGroupMapperTest.class); + + @Autowired + TaskGroupMapper taskGroupMapper; + + /** + * test insert + */ + public TaskGroup insertOne() { + TaskGroup taskGroup = new TaskGroup(); + taskGroup.setName("task group"); + taskGroup.setUserId(1); + taskGroup.setStatus(1); + taskGroup.setGroupSize(10); + taskGroup.setDescription("this is a task group"); + Date date = new Date(System.currentTimeMillis()); + taskGroup.setUpdateTime(date); + taskGroup.setUpdateTime(date); + + taskGroupMapper.insert(taskGroup); + return taskGroup; + } + + /** + * test update + */ + @Test + public void testUpdate() { + TaskGroup taskGroup = insertOne(); + taskGroup.setGroupSize(100); + taskGroup.setUpdateTime(new Date(System.currentTimeMillis())); + int i = taskGroupMapper.updateById(taskGroup); + Assert.assertEquals(i, 1); + } + + /** + * test CheckName + */ + @Test + public void testCheckName() { + TaskGroup taskGroup = insertOne(); + TaskGroup result = taskGroupMapper.queryByName(taskGroup.getUserId(), taskGroup.getName()); + Assert.assertNotNull(result); + } + + /** + * test queryTaskGroupPaging + */ + @Test + public void testQueryTaskGroupPaging() { + TaskGroup taskGroup = insertOne(); + Page page = new Page(1, 3); + IPage taskGroupIPage = taskGroupMapper.queryTaskGroupPaging( + page, + taskGroup.getUserId(), + taskGroup.getName(), taskGroup.getStatus()); + + Assert.assertEquals(taskGroupIPage.getTotal(), 1); + } +} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapperTest.java new file mode 100644 index 000000000..af203d262 --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapperTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; +import org.apache.dolphinscheduler.dao.BaseDaoTest; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; + +import java.util.Date; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; + +public class TaskGroupQueueMapperTest extends BaseDaoTest { + + @Autowired + TaskGroupQueueMapper taskGroupQueueMapper; + + int userId = 1; + + public TaskGroupQueue insertOne() { + TaskGroupQueue taskGroupQueue = new TaskGroupQueue(); + taskGroupQueue.setTaskName("task1"); + taskGroupQueue.setGroupId(10); + taskGroupQueue.setProcessId(11); + taskGroupQueue.setPriority(10); + taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); + Date date = new Date(System.currentTimeMillis()); + taskGroupQueue.setUpdateTime(date); + taskGroupQueue.setUpdateTime(date); + + taskGroupQueueMapper.insert(taskGroupQueue); + return taskGroupQueue; + } + + /** + * test update + */ + @Test + public void testUpdate() { + TaskGroupQueue taskGroupQueue = insertOne(); + taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); + taskGroupQueue.setUpdateTime(new Date(System.currentTimeMillis())); + int i = taskGroupQueueMapper.updateById(taskGroupQueue); + Assert.assertEquals(i, 1); + } + + /** + * test delete + */ + @Test + public void testDelete() { + TaskGroupQueue taskGroupQueue = insertOne(); + int i = taskGroupQueueMapper.deleteByTaskId(taskGroupQueue.getId()); + Assert.assertEquals(i, 0); + } + + /** + * test select + */ + @Test + public void testSelect() { + TaskGroupQueue taskGroupQueue = insertOne(); + TaskGroupQueue result = taskGroupQueueMapper.selectById(taskGroupQueue.getId()); + Assert.assertEquals(result.getTaskName(), "task1"); + + List taskGroupQueues = taskGroupQueueMapper.queryByStatus(taskGroupQueue.getStatus().getCode()); + Assert.assertEquals(taskGroupQueues.size(), 1); + + } + + @Test + public void testUpdateStatusByTaskId() { + TaskGroupQueue taskGroupQueue = insertOne(); + int i = taskGroupQueueMapper.updateStatusByTaskId(taskGroupQueue.getTaskId(), 7); + Assert.assertEquals(i, 1); + } + + @Test + public void testDeleteByTaskId() { + TaskGroupQueue taskGroupQueue = insertOne(); + int i = taskGroupQueueMapper.deleteByTaskId(taskGroupQueue.getTaskId()); + Assert.assertEquals(i, 1); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index cf2cfe51b..a5022427f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -137,9 +137,16 @@ public enum CommandType { * state event request */ STATE_EVENT_REQUEST, - /** * cache expire */ - CACHE_EXPIRE; + CACHE_EXPIRE, + /** + * task state event request + */ + TASK_FORCE_STATE_EVENT_REQUEST, + /** + * task state event request + */ + TASK_WAKEUP_EVENT_REQUEST; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskEventChangeCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskEventChangeCommand.java new file mode 100644 index 000000000..1f1ea2c95 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskEventChangeCommand.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.io.Serializable; + +/** + * db task final result response command + */ +public class TaskEventChangeCommand implements Serializable { + + private String key; + + private int processInstanceId; + + private int taskInstanceId; + + public TaskEventChangeCommand() { + super(); + } + + public TaskEventChangeCommand( + int processInstanceId, + int taskInstanceId + ) { + this.key = String.format("%d-%d", + processInstanceId, + taskInstanceId); + + this.processInstanceId = processInstanceId; + this.taskInstanceId = taskInstanceId; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + /** + * package response command + * + * @return command + */ + public Command convert2Command(CommandType commandType) { + Command command = new Command(); + command.setType(commandType); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } + + @Override + public String toString() { + return "TaskEventChangeCommand{" + + "key=" + key + + '}'; + } + + public int getProcessInstanceId() { + return processInstanceId; + } + + public void setProcessInstanceId(int processInstanceId) { + this.processInstanceId = processInstanceId; + } + + public int getTaskInstanceId() { + return taskInstanceId; + } + + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index c1e22f8d0..704b1b0da 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.CacheProcessor; import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; @@ -95,6 +96,8 @@ public class MasterServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, new StateEventProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, new TaskEventProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, new TaskEventProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, new CacheProcessor()); this.nettyRemotingServer.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java new file mode 100644 index 000000000..2200e8315 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor; + +import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import io.netty.channel.Channel; + +/** + * handle state event received from master/api + */ +public class TaskEventProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(TaskEventProcessor.class); + + private StateEventResponseService stateEventResponseService; + + public TaskEventProcessor() { + stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class); + } + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.TASK_FORCE_STATE_EVENT_REQUEST == command.getType() + || CommandType.TASK_WAKEUP_EVENT_REQUEST == command.getType() + , String.format("invalid command type: %s", command.getType())); + + TaskEventChangeCommand taskEventChangeCommand = JSONUtils.parseObject(command.getBody(), TaskEventChangeCommand.class); + StateEvent stateEvent = new StateEvent(); + stateEvent.setKey(taskEventChangeCommand.getKey()); + stateEvent.setProcessInstanceId(taskEventChangeCommand.getProcessInstanceId()); + stateEvent.setTaskInstanceId(taskEventChangeCommand.getTaskInstanceId()); + stateEvent.setType(StateEventType.WAIT_TASK_GROUP); + logger.info("received command : {}", stateEvent); + stateEventResponseService.addEvent2WorkflowExecute(stateEvent); + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java index 9a7fc5917..cb0b93c14 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java @@ -149,6 +149,10 @@ public class StateEventResponseService { } } + public void addEvent2WorkflowExecute(StateEvent stateEvent) { + WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); + workflowExecuteThread.addStateEvent(stateEvent); + } public BlockingQueue getEventQueue() { return eventQueue; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 152aaeaa2..059c371ae 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.graph.DAG; @@ -53,6 +54,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; @@ -316,6 +318,9 @@ public class WorkflowExecuteThread implements Runnable { case TASK_TIMEOUT: result = taskTimeout(stateEvent); break; + case WAIT_TASK_GROUP: + result = checkForceStartAndWakeUp(stateEvent); + break; default: break; } @@ -326,6 +331,29 @@ public class WorkflowExecuteThread implements Runnable { return result; } + private boolean checkForceStartAndWakeUp(StateEvent stateEvent) { + TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId()); + if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) { + ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); + TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); + ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId()); + taskProcessor.dispatch(taskInstance, processInstance); + this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); + return true; + } + if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { + boolean acquireTaskGroup = processService.acquireTaskGroupAgain(taskGroupQueue); + if (acquireTaskGroup) { + ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); + TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); + ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId()); + taskProcessor.dispatch(taskInstance, processInstance); + return true; + } + } + return false; + } + private boolean taskTimeout(StateEvent stateEvent) { if (!checkTaskInstanceByStateEvent(stateEvent)) { return true; @@ -362,8 +390,25 @@ public class WorkflowExecuteThread implements Runnable { return true; } - if (task.getState().typeIsFinished()) { + if (task.getState().typeIsFinished() && !completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) { taskFinished(task); + if (task.getTaskGroupId() > 0) { + //release task group + TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(task); + if (nextTaskInstance != null) { + if (nextTaskInstance.getProcessInstanceId() == task.getProcessInstanceId()) { + StateEvent nextEvent = new StateEvent(); + nextEvent.setProcessInstanceId(this.processInstance.getId()); + nextEvent.setTaskInstanceId(nextTaskInstance.getId()); + nextEvent.setType(StateEventType.WAIT_TASK_GROUP); + this.stateEvents.add(nextEvent); + } else { + ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); + this.processService.sendStartTask2Master(processInstance,nextTaskInstance.getId(), + org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); + } + } + } } else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) { ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); iTaskProcessor.run(); @@ -615,6 +660,11 @@ public class WorkflowExecuteThread implements Runnable { ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser); } + List taskInstances = processService.findValidTaskListByProcessId(processInstance.getId()); + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); + processAlertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser); + //release task group + processService.releaseAllTaskGroup(processInstance.getId()); } public void checkSerialProcess(ProcessDefinition processDefinition) { @@ -740,7 +790,10 @@ public class WorkflowExecuteThread implements Runnable { && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { notifyProcessHostUpdate(taskInstance); } - + TaskDefinition taskDefinition = processService.findTaskDefinition( + taskInstance.getTaskCode(), + taskInstance.getTaskDefinitionVersion()); + taskInstance.setTaskGroupId(taskDefinition.getTaskGroupId()); // package task instance before submit processService.packageTaskInstance(taskInstance, processInstance); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 3836a71c9..07ae8124b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -157,6 +157,11 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return null; } + @Override + public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) { + + } + /** * get TaskExecutionContext * diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index aed682866..60c7de7a7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -39,8 +39,6 @@ import org.apache.commons.lang.StringUtils; import java.util.Date; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; /** @@ -57,7 +55,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor { @Autowired NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); - @Override public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) { this.processInstance = processInstance; @@ -67,6 +64,18 @@ public class CommonTaskProcessor extends BaseTaskProcessor { return false; } setTaskExecutionLogger(); + int taskGroupId = task.getTaskGroupId(); + if (taskGroupId > 0) { + boolean acquireTaskGroup = processService.acquireTaskGroup(task.getId(), + task.getName(), + taskGroupId, + task.getProcessInstanceId(), + task.getTaskInstancePriority().getCode()); + if (!acquireTaskGroup) { + logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName()); + return true; + } + } dispatchTask(taskInstance, processInstance); return true; } @@ -76,6 +85,11 @@ public class CommonTaskProcessor extends BaseTaskProcessor { return this.taskInstance.getState(); } + @Override + public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) { + this.dispatchTask(taskInstance,processInstance); + } + @Override public void run() { } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java index b68dc221a..882533726 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java @@ -36,4 +36,6 @@ public interface ITaskProcessor { ExecutionStatus taskState(); + void dispatch(TaskInstance taskInstance, ProcessInstance processInstance); + } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index ed5a1d1f8..fe9bddc01 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -22,7 +22,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; -import static org.mockito.Mockito.verify; import static org.powermock.api.mockito.PowerMockito.mock; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -113,7 +112,6 @@ public class WorkflowExecuteThreadTest { PowerMockito.doNothing().when(workflowExecuteThread, "endProcess"); } - @Test public void testParseStartNodeName() throws ParseException { try { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 994262ade..914315406 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; @@ -74,6 +75,8 @@ import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; +import org.apache.dolphinscheduler.dao.entity.TaskGroup; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UdfFunc; @@ -94,10 +97,13 @@ import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; +import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -212,6 +218,12 @@ public class ProcessService { @Autowired private EnvironmentMapper environmentMapper; + @Autowired + private TaskGroupQueueMapper taskGroupQueueMapper; + + @Autowired + private TaskGroupMapper taskGroupMapper; + /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * @@ -2504,6 +2516,180 @@ public class ProcessService { return processTaskMap; } + /** + * the first time (when submit the task ) get the resource of the task group + * @param taskId task id + * @param taskName + * @param groupId + * @param processId + * @param priority + * @return + */ + public boolean acquireTaskGroup(int taskId, + String taskName, int groupId, + int processId, int priority) { + TaskGroup taskGroup = taskGroupMapper.selectById(groupId); + if (taskGroup == null) { + return true; + } + // if task group is not applicable + if (taskGroup.getStatus() == Flag.NO.getCode()) { + return true; + } + TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId); + if (taskGroupQueue == null) { + taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, TaskGroupQueueStatus.WAIT_QUEUE); + } else { + if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) { + return true; + } + taskGroupQueue.setInQueue(Flag.NO.getCode()); + taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE); + this.taskGroupQueueMapper.updateById(taskGroupQueue); + } + //check priority + List highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode()); + if (CollectionUtils.isNotEmpty(highPriorityTasks)) { + this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); + return false; + } + //try to get taskGroup + int count = taskGroupMapper.selectAvailableCountById(groupId); + if (count == 1 && robTaskGroupResouce(taskGroupQueue)) { + return true; + } + this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); + return false; + } + + /** + * try to get the task group resource(when other task release the resource) + * @param taskGroupQueue + * @return + */ + public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) { + TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId()); + int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),taskGroupQueue.getId(), + TaskGroupQueueStatus.WAIT_QUEUE.getCode()); + if (affectedCount > 0) { + taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); + this.taskGroupQueueMapper.updateById(taskGroupQueue); + this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); + return true; + } + return false; + } + + public boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue) { + return robTaskGroupResouce(taskGroupQueue); + } + + public void releaseAllTaskGroup(int processInstanceId) { + List taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); + for (TaskInstance info : taskInstances) { + releaseTaskGroup(info); + } + } + + /** + * release the TGQ resource when the corresponding task is finished. + * + * @return the result code and msg + */ + public TaskInstance releaseTaskGroup(TaskInstance taskInstance) { + + TaskGroup taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); + if (taskGroup == null) { + return null; + } + TaskGroupQueue thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId()); + if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) { + return null; + } + try { + while (taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize() + , thisTaskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) { + thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId()); + if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) { + return null; + } + taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); + } + } catch (Exception e) { + logger.error("release the task group error",e); + } + logger.info("updateTask:{}",taskInstance.getName()); + changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE); + TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(), + TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode()); + if (taskGroupQueue == null) { + return null; + } + while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), Flag.YES.getCode(), taskGroupQueue.getId()) != 1) { + taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(), + TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode()); + if (taskGroupQueue == null) { + return null; + } + } + return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId()); + } + + /** + * release the TGQ resource when the corresponding task is finished. + * + * @param taskId task id + * @return the result code and msg + */ + + public void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status) { + TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskId); + taskGroupQueue.setStatus(status); + taskGroupQueue.setUpdateTime(new Date(System.currentTimeMillis())); + taskGroupQueueMapper.updateById(taskGroupQueue); + } + + /** + * insert into task group queue + * + * @param taskId task id + * @param taskName task name + * @param groupId group id + * @param processId process id + * @param priority priority + * @return result and msg code + */ + public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId, + String taskName, Integer groupId, + Integer processId, Integer priority, TaskGroupQueueStatus status) { + TaskGroupQueue taskGroupQueue = new TaskGroupQueue(taskId, taskName, groupId, processId, priority, status); + taskGroupQueueMapper.insert(taskGroupQueue); + return taskGroupQueue; + } + + public int updateTaskGroupQueueStatus(Integer taskId, int status) { + return taskGroupQueueMapper.updateStatusByTaskId(taskId, status); + } + + public int updateTaskGroupQueue(TaskGroupQueue taskGroupQueue) { + return taskGroupQueueMapper.updateById(taskGroupQueue); + } + + public TaskGroupQueue loadTaskGroupQueue(int taskId) { + return this.taskGroupQueueMapper.queryByTaskId(taskId); + } + + public void sendStartTask2Master(ProcessInstance processInstance,int taskId, + org.apache.dolphinscheduler.remote.command.CommandType taskType) { + String host = processInstance.getHost(); + String address = host.split(":")[0]; + int port = Integer.parseInt(host.split(":")[1]); + TaskEventChangeCommand taskEventChangeCommand = new TaskEventChangeCommand( + processInstance.getId(), taskId + ); + stateEventCallbackService.sendResult(address, port, taskEventChangeCommand.convert2Command(taskType)); + } + public ProcessInstance loadNextProcess4Serial(long code, int state) { return this.processInstanceMapper.loadNextProcess4Serial(code, state); } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 346f120e5..2117b8678 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; @@ -47,6 +48,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.CommandMapper; @@ -59,6 +61,8 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.service.cache.processor.impl.UserCacheProcessorImpl; import org.apache.dolphinscheduler.service.exceptions.ServiceException; @@ -125,6 +129,10 @@ public class ProcessServiceTest { private ProcessDefinitionLogMapper processDefineLogMapper; @Mock private ResourceMapper resourceMapper; + @Mock + private TaskGroupMapper taskGroupMapper; + @Mock + private TaskGroupQueueMapper taskGroupQueueMapper; private HashMap processDefinitionCacheMaps = new HashMap<>(); @@ -761,4 +769,41 @@ public class ProcessServiceTest { } + @Test + public void testCreateTaskGroupQueue() { + Mockito.when(taskGroupQueueMapper.insert(Mockito.any(TaskGroupQueue.class))).thenReturn(1); + TaskGroupQueue taskGroupQueue = processService.insertIntoTaskGroupQueue(1, "task name", 1, 1, 1, TaskGroupQueueStatus.WAIT_QUEUE); + Assert.assertNotNull(taskGroupQueue); + } + + @Test + public void testDoRelease() { + + TaskGroupQueue taskGroupQueue = getTaskGroupQueue(); + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setTaskGroupId(taskGroupQueue.getGroupId()); + + Mockito.when(taskGroupQueueMapper.queryByTaskId(1)).thenReturn(taskGroupQueue); + Mockito.when(taskGroupQueueMapper.updateById(taskGroupQueue)).thenReturn(1); + + processService.releaseTaskGroup(taskInstance); + + } + + private TaskGroupQueue getTaskGroupQueue() { + TaskGroupQueue taskGroupQueue = new TaskGroupQueue(); + taskGroupQueue.setTaskName("task name"); + taskGroupQueue.setId(1); + taskGroupQueue.setGroupId(1); + taskGroupQueue.setTaskId(1); + taskGroupQueue.setPriority(1); + taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); + Date date = new Date(System.currentTimeMillis()); + taskGroupQueue.setUpdateTime(date); + taskGroupQueue.setCreateTime(date); + return taskGroupQueue; + } + }