[Improvement][TaskLog] Unified task log (#7831)
* task logger * update * log format * remove master config task-logger * udpate log level Co-authored-by: caishunfeng <534328519@qq.com>dailidong-patch-1
parent
5172862a0a
commit
e4cdd82b7c
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.dolphinscheduler.common.utils;
|
||||
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.spi.task.TaskConstants;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileInputStream;
|
||||
|
|
@ -48,38 +49,18 @@ public class LoggerUtils {
|
|||
*/
|
||||
private static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX);
|
||||
|
||||
/**
|
||||
* Task Logger's prefix
|
||||
*/
|
||||
public static final String TASK_LOGGER_INFO_PREFIX = "TASK";
|
||||
|
||||
/**
|
||||
* Task Logger Thread's name
|
||||
*/
|
||||
public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo";
|
||||
|
||||
/**
|
||||
* Task Logger Thread's name
|
||||
*/
|
||||
public static final String TASK_APPID_LOG_FORMAT = "[taskAppId=";
|
||||
|
||||
/**
|
||||
* build job id
|
||||
*
|
||||
* @param affix Task Logger's prefix
|
||||
* @param processInstId process instance id
|
||||
* @param taskId task id
|
||||
* @return task id format
|
||||
*/
|
||||
public static String buildTaskId(String affix,
|
||||
Date firstSubmitTime,
|
||||
public static String buildTaskId(Date firstSubmitTime,
|
||||
Long processDefineCode,
|
||||
int processDefineVersion,
|
||||
int processInstId,
|
||||
int taskId) {
|
||||
// - [taskAppId=TASK-20211107-798_1-4084-15210]
|
||||
// like TaskAppId=TASK-20211107-798_1-4084-15210
|
||||
String firstSubmitTimeStr = DateUtils.format(firstSubmitTime, Constants.YYYYMMDD);
|
||||
return String.format(" - %s%s-%s-%s_%s-%s-%s]", TASK_APPID_LOG_FORMAT, affix, firstSubmitTimeStr, processDefineCode, processDefineVersion, processInstId, taskId);
|
||||
return String.format("%s=%s-%s-%s_%s-%s-%s", TaskConstants.TASK_APPID_LOG_FORMAT, TaskConstants.TASK_LOGGER_INFO_PREFIX, firstSubmitTimeStr, processDefineCode, processDefineVersion, processInstId, taskId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import ch.qos.logback.classic.spi.IThrowableProxy;
|
|||
import ch.qos.logback.classic.spi.LoggerContextVO;
|
||||
import ch.qos.logback.core.spi.FilterReply;
|
||||
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
|
||||
import org.apache.dolphinscheduler.spi.task.TaskConstants;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Marker;
|
||||
|
|
@ -39,7 +41,7 @@ public class TaskLogFilterTest {
|
|||
FilterReply filterReply = taskLogFilter.decide(new ILoggingEvent() {
|
||||
@Override
|
||||
public String getThreadName() {
|
||||
return LoggerUtils.TASK_LOGGER_THREAD_NAME;
|
||||
return TaskConstants.TASK_APPID_LOG_FORMAT;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -64,7 +66,7 @@ public class TaskLogFilterTest {
|
|||
|
||||
@Override
|
||||
public String getLoggerName() {
|
||||
return null;
|
||||
return TaskConstants.TASK_LOG_LOGGER_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -39,7 +39,6 @@ public class MasterConfig {
|
|||
private int stateWheelInterval;
|
||||
private double maxCpuLoadAvg;
|
||||
private double reservedMemory;
|
||||
private boolean taskLogger;
|
||||
private int failoverInterval;
|
||||
private boolean killYarnJobWhenTaskFailover;
|
||||
|
||||
|
|
@ -139,14 +138,6 @@ public class MasterConfig {
|
|||
this.reservedMemory = reservedMemory;
|
||||
}
|
||||
|
||||
public boolean isTaskLogger() {
|
||||
return taskLogger;
|
||||
}
|
||||
|
||||
public void setTaskLogger(boolean taskLogger) {
|
||||
this.taskLogger = taskLogger;
|
||||
}
|
||||
|
||||
public int getFailoverInterval() {
|
||||
return failoverInterval;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
|||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
|
||||
import org.apache.dolphinscheduler.spi.enums.ResourceType;
|
||||
import org.apache.dolphinscheduler.spi.task.TaskConstants;
|
||||
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
|
||||
import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
|
||||
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
|
||||
|
|
@ -69,7 +70,7 @@ import com.google.common.base.Strings;
|
|||
|
||||
public abstract class BaseTaskProcessor implements ITaskProcessor {
|
||||
|
||||
protected Logger logger = LoggerFactory.getLogger(getClass());
|
||||
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
|
||||
|
||||
protected boolean killed = false;
|
||||
|
||||
|
|
@ -85,12 +86,12 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
|
|||
|
||||
protected int commitInterval;
|
||||
|
||||
protected boolean isTaskLogger;
|
||||
|
||||
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
|
||||
|
||||
protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
|
||||
|
||||
protected String threadLoggerInfoName;
|
||||
|
||||
@Override
|
||||
public void init(TaskInstance taskInstance, ProcessInstance processInstance) {
|
||||
if (processService == null) {
|
||||
|
|
@ -103,7 +104,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
|
|||
this.processInstance = processInstance;
|
||||
this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
|
||||
this.commitInterval = masterConfig.getTaskCommitInterval();
|
||||
this.isTaskLogger = masterConfig.isTaskLogger();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -133,13 +133,15 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
|
|||
|
||||
/**
|
||||
* dispatch task
|
||||
* @return
|
||||
*/
|
||||
protected abstract boolean dispatchTask();
|
||||
|
||||
@Override
|
||||
public boolean action(TaskAction taskAction) {
|
||||
|
||||
String threadName = Thread.currentThread().getName();
|
||||
if (StringUtils.isNotEmpty(threadLoggerInfoName)) {
|
||||
Thread.currentThread().setName(threadLoggerInfoName);
|
||||
}
|
||||
switch (taskAction) {
|
||||
case STOP:
|
||||
return stop();
|
||||
|
|
@ -155,8 +157,9 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
|
|||
return dispatch();
|
||||
default:
|
||||
logger.error("unknown task action: {}", taskAction);
|
||||
|
||||
}
|
||||
// reset thread name
|
||||
Thread.currentThread().setName(threadName);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
@ -205,6 +208,18 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
|
|||
return this.taskInstance.getState();
|
||||
}
|
||||
|
||||
/**
|
||||
* set master task running logger.
|
||||
*/
|
||||
public void setTaskExecutionLogger() {
|
||||
threadLoggerInfoName = LoggerUtils.buildTaskId(taskInstance.getFirstSubmitTime(),
|
||||
processInstance.getProcessDefinitionCode(),
|
||||
processInstance.getProcessDefinitionVersion(),
|
||||
taskInstance.getProcessInstanceId(),
|
||||
taskInstance.getId());
|
||||
Thread.currentThread().setName(threadLoggerInfoName);
|
||||
}
|
||||
|
||||
/**
|
||||
* get TaskExecutionContext
|
||||
*
|
||||
|
|
@ -267,21 +282,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
|
|||
.create();
|
||||
}
|
||||
|
||||
/**
|
||||
* set master task running logger.
|
||||
*/
|
||||
public void setTaskExecutionLogger(boolean isTaskLogger) {
|
||||
if (!isTaskLogger) {
|
||||
return;
|
||||
}
|
||||
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
|
||||
taskInstance.getFirstSubmitTime(),
|
||||
processInstance.getProcessDefinitionCode(),
|
||||
processInstance.getProcessDefinitionVersion(),
|
||||
taskInstance.getProcessInstanceId(),
|
||||
taskInstance.getId()));
|
||||
}
|
||||
|
||||
/**
|
||||
* set procedure task relation
|
||||
*
|
||||
|
|
|
|||
|
|
@ -48,13 +48,13 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
|
|||
private NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
|
||||
|
||||
@Override
|
||||
public boolean submitTask() {
|
||||
protected boolean submitTask() {
|
||||
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
|
||||
|
||||
if (this.taskInstance == null) {
|
||||
return false;
|
||||
}
|
||||
setTaskExecutionLogger(isTaskLogger);
|
||||
this.setTaskExecutionLogger();
|
||||
int taskGroupId = taskInstance.getTaskGroupId();
|
||||
if (taskGroupId > 0) {
|
||||
boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
|
||||
|
|
|
|||
|
|
@ -62,14 +62,10 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
|
|||
@Override
|
||||
public boolean submitTask() {
|
||||
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
|
||||
|
||||
if (this.taskInstance == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
setTaskExecutionLogger(isTaskLogger);
|
||||
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
|
||||
Thread.currentThread().setName(threadLoggerInfoName);
|
||||
this.setTaskExecutionLogger();
|
||||
initTaskParameters();
|
||||
logger.info("dependent task start");
|
||||
return true;
|
||||
|
|
|
|||
|
|
@ -75,13 +75,12 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
|
|||
if (this.taskInstance == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
this.setTaskExecutionLogger();
|
||||
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
|
||||
processInstance.getProcessDefinitionCode(),
|
||||
processInstance.getProcessDefinitionVersion(),
|
||||
taskInstance.getProcessInstanceId(),
|
||||
taskInstance.getId()));
|
||||
setTaskExecutionLogger(isTaskLogger);
|
||||
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
|
||||
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
|
||||
taskInstance.setStartTime(new Date());
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
|
|||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
||||
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
|
||||
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
|
||||
import org.apache.dolphinscheduler.server.utils.LogUtils;
|
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||
|
||||
import java.util.Date;
|
||||
|
|
@ -53,7 +54,13 @@ public class SubTaskProcessor extends BaseTaskProcessor {
|
|||
if (this.taskInstance == null) {
|
||||
return false;
|
||||
}
|
||||
setTaskExecutionLogger(isTaskLogger);
|
||||
this.setTaskExecutionLogger();
|
||||
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
|
||||
processInstance.getProcessDefinitionCode(),
|
||||
processInstance.getProcessDefinitionVersion(),
|
||||
taskInstance.getProcessInstanceId(),
|
||||
taskInstance.getId()));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -62,11 +62,11 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
|
|||
if (this.taskInstance == null) {
|
||||
return false;
|
||||
}
|
||||
this.setTaskExecutionLogger();
|
||||
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
|
||||
processInstance.getProcessDefinitionVersion(),
|
||||
taskInstance.getProcessInstanceId(),
|
||||
taskInstance.getId()));
|
||||
setTaskExecutionLogger(isTaskLogger);
|
||||
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
|
||||
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
|
||||
taskInstance.setStartTime(new Date());
|
||||
|
|
|
|||
|
|
@ -104,8 +104,6 @@ master:
|
|||
max-cpu-load-avg: -1
|
||||
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
|
||||
reserved-memory: 0.3
|
||||
# use task logger, default true; if true, it will create log for every task; if false, the task log will append to master log file
|
||||
task-logger: true
|
||||
# failover interval, the unit is minute
|
||||
failover-interval: 10
|
||||
# kill yarn jon when failover taskInstance, default true
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@
|
|||
<file>${log.base}/${taskAppId}.log</file>
|
||||
<encoder>
|
||||
<pattern>
|
||||
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n
|
||||
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %messsage%n
|
||||
</pattern>
|
||||
<charset>UTF-8</charset>
|
||||
</encoder>
|
||||
|
|
|
|||
|
|
@ -16,16 +16,22 @@
|
|||
*/
|
||||
package org.apache.dolphinscheduler.server.log;
|
||||
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.spi.task.TaskConstants;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import ch.qos.logback.classic.spi.ILoggingEvent;
|
||||
import ch.qos.logback.core.sift.AbstractDiscriminator;
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
|
||||
|
||||
/**
|
||||
* Task Log Discriminator
|
||||
*/
|
||||
public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(TaskLogDiscriminator.class);
|
||||
|
||||
/**
|
||||
* key
|
||||
*/
|
||||
|
|
@ -42,15 +48,20 @@ public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
|
|||
*/
|
||||
@Override
|
||||
public String getDiscriminatingValue(ILoggingEvent event) {
|
||||
String loggerName = event.getLoggerName()
|
||||
.split(Constants.EQUAL_SIGN)[1];
|
||||
String prefix = LoggerUtils.TASK_LOGGER_INFO_PREFIX + "-";
|
||||
if (loggerName.startsWith(prefix)) {
|
||||
return loggerName.substring(prefix.length(),
|
||||
loggerName.length() - 1).replaceFirst("-","/");
|
||||
} else {
|
||||
return "unknown_task";
|
||||
String key = "unknown_task";
|
||||
if (event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME)) {
|
||||
String threadName = event.getThreadName();
|
||||
if (threadName.endsWith(TaskConstants.GET_OUTPUT_LOG_SERVICE)) {
|
||||
threadName = threadName.substring(0, threadName.length() - TaskConstants.GET_OUTPUT_LOG_SERVICE.length());
|
||||
}
|
||||
String part1 = threadName.split(Constants.EQUAL_SIGN)[1];
|
||||
String prefix = TaskConstants.TASK_LOGGER_INFO_PREFIX + "-";
|
||||
if (part1.startsWith(prefix)) {
|
||||
key = part1.substring(prefix.length()).replaceFirst("-", "/");
|
||||
}
|
||||
}
|
||||
logger.debug("task log discriminator end, key is:{}, thread name:{}, loggerName:{}", key, event.getThreadName(), event.getLoggerName());
|
||||
return key;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -14,9 +14,13 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.log;
|
||||
|
||||
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
|
||||
import org.apache.dolphinscheduler.spi.task.TaskConstants;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import ch.qos.logback.classic.Level;
|
||||
import ch.qos.logback.classic.spi.ILoggingEvent;
|
||||
|
|
@ -28,6 +32,8 @@ import ch.qos.logback.core.spi.FilterReply;
|
|||
*/
|
||||
public class TaskLogFilter extends Filter<ILoggingEvent> {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(TaskLogFilter.class);
|
||||
|
||||
/**
|
||||
* level
|
||||
*/
|
||||
|
|
@ -39,16 +45,19 @@ public class TaskLogFilter extends Filter<ILoggingEvent> {
|
|||
|
||||
/**
|
||||
* Accept or reject based on thread name
|
||||
*
|
||||
* @param event event
|
||||
* @return FilterReply
|
||||
*/
|
||||
@Override
|
||||
public FilterReply decide(ILoggingEvent event) {
|
||||
if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME)
|
||||
|| event.getLoggerName().startsWith(" - " + LoggerUtils.TASK_APPID_LOG_FORMAT)
|
||||
FilterReply filterReply = FilterReply.DENY;
|
||||
if ((event.getThreadName().startsWith(TaskConstants.TASK_APPID_LOG_FORMAT)
|
||||
&& event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME))
|
||||
|| event.getLevel().isGreaterOrEqual(level)) {
|
||||
return FilterReply.ACCEPT;
|
||||
filterReply = FilterReply.ACCEPT;
|
||||
}
|
||||
return FilterReply.DENY;
|
||||
logger.debug("task log filter, thread name:{}, loggerName:{}, filterReply:{}, level:{}", event.getThreadName(), event.getLoggerName(), filterReply.name(), level);
|
||||
return filterReply;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.dolphinscheduler.server.log;
|
||||
|
||||
import org.apache.dolphinscheduler.spi.task.TaskConstants;
|
||||
|
||||
import ch.qos.logback.classic.Level;
|
||||
import ch.qos.logback.classic.spi.ILoggingEvent;
|
||||
import ch.qos.logback.classic.spi.IThrowableProxy;
|
||||
|
|
@ -48,7 +50,7 @@ public class TaskLogDiscriminatorTest {
|
|||
String result = taskLogDiscriminator.getDiscriminatingValue(new ILoggingEvent() {
|
||||
@Override
|
||||
public String getThreadName() {
|
||||
return null;
|
||||
return "taskAppId=TASK-20220105-101-1-1001";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -73,7 +75,7 @@ public class TaskLogDiscriminatorTest {
|
|||
|
||||
@Override
|
||||
public String getLoggerName() {
|
||||
return "[taskAppId=TASK-1-1-1";
|
||||
return TaskConstants.TASK_LOG_LOGGER_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -121,7 +123,7 @@ public class TaskLogDiscriminatorTest {
|
|||
|
||||
}
|
||||
});
|
||||
Assert.assertEquals("1/1-", result);
|
||||
Assert.assertEquals("20220105/101-1-1001", result);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -126,7 +126,27 @@ public class TaskConstants {
|
|||
/**
|
||||
* task log info format
|
||||
*/
|
||||
public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s";
|
||||
public static final String TASK_LOG_LOGGER_NAME = "TaskLogLogger";
|
||||
|
||||
/**
|
||||
* task log logger name format
|
||||
*/
|
||||
public static final String TASK_LOG_LOGGER_NAME_FORMAT = TASK_LOG_LOGGER_NAME + "-%s";
|
||||
|
||||
/**
|
||||
* Task Logger's prefix
|
||||
*/
|
||||
public static final String TASK_LOGGER_INFO_PREFIX = "TASK";
|
||||
|
||||
/**
|
||||
* Task Logger Thread's name
|
||||
*/
|
||||
public static final String TASK_APPID_LOG_FORMAT = "taskAppId";
|
||||
|
||||
/**
|
||||
* get output log service
|
||||
*/
|
||||
public static final String GET_OUTPUT_LOG_SERVICE = "-getOutputLogService";
|
||||
|
||||
/**
|
||||
* date format of yyyyMMdd
|
||||
|
|
@ -320,11 +340,6 @@ public class TaskConstants {
|
|||
*/
|
||||
public static final String HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE = "hadoop.security.authentication.startup.state";
|
||||
|
||||
/**
|
||||
* Task Logger Thread's name
|
||||
*/
|
||||
public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo";
|
||||
|
||||
/**
|
||||
* hdfs/s3 configuration
|
||||
* resource.upload.path
|
||||
|
|
|
|||
|
|
@ -108,8 +108,6 @@ master:
|
|||
max-cpu-load-avg: -1
|
||||
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
|
||||
reserved-memory: 0.3
|
||||
# use task logger, default true; if true, it will create log for every task; if false, the task log will append to master log file
|
||||
task-logger: true
|
||||
# failover interval, the unit is minute
|
||||
failover-interval: 10
|
||||
# kill yarn jon when failover taskInstance, default true
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@
|
|||
<file>${log.base}/${taskAppId}.log</file>
|
||||
<encoder>
|
||||
<pattern>
|
||||
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n
|
||||
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %messsage%n
|
||||
</pattern>
|
||||
<charset>UTF-8</charset>
|
||||
</encoder>
|
||||
|
|
|
|||
|
|
@ -26,6 +26,8 @@ import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
|
|||
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
|
||||
import org.apache.dolphinscheduler.spi.utils.StringUtils;
|
||||
|
||||
import org.apache.hadoop.hive.common.LogUtils;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
|
|
@ -307,8 +309,8 @@ public abstract class AbstractCommandExecutor {
|
|||
* @param process process
|
||||
*/
|
||||
private void parseProcessOutput(Process process) {
|
||||
String threadLoggerInfoName = String.format(TaskConstants.TASK_LOGGER_THREAD_NAME + "-%s", taskRequest.getTaskAppId());
|
||||
ExecutorService getOutputLogService = newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService");
|
||||
String threadLoggerInfoName = taskRequest.getTaskLogName();
|
||||
ExecutorService getOutputLogService = newDaemonSingleThreadExecutor(threadLoggerInfoName);
|
||||
getOutputLogService.submit(() -> {
|
||||
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
|
||||
String line;
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.plugin.task.api;
|
|||
|
||||
import org.apache.dolphinscheduler.spi.task.AbstractTask;
|
||||
import org.apache.dolphinscheduler.spi.task.Property;
|
||||
import org.apache.dolphinscheduler.spi.task.TaskConstants;
|
||||
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
|
||||
|
||||
import java.util.Map;
|
||||
|
|
@ -36,7 +37,7 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
|
|||
|
||||
public static final Marker FINALIZE_SESSION_MARKER = MarkerFactory.getMarker("FINALIZE_SESSION");
|
||||
|
||||
protected Logger logger;
|
||||
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
|
||||
|
||||
public String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
|
||||
/**
|
||||
|
|
@ -46,7 +47,6 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
|
|||
*/
|
||||
protected AbstractTaskExecutor(TaskRequest taskRequest) {
|
||||
super(taskRequest);
|
||||
logger = LoggerFactory.getLogger(taskRequest.getTaskLogName());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -145,10 +145,6 @@ public class DataxTask extends AbstractTaskExecutor {
|
|||
@Override
|
||||
public void handle() throws Exception {
|
||||
try {
|
||||
// set the name of the current thread
|
||||
String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId());
|
||||
Thread.currentThread().setName(threadLoggerInfoName);
|
||||
|
||||
// replace placeholder,and combine local and global parameters
|
||||
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
|
||||
if (MapUtils.isEmpty(paramsMap)) {
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.dolphinscheduler.plugin.task.http;
|
||||
|
||||
import static org.apache.dolphinscheduler.plugin.task.http.HttpTaskConstants.APPLICATION_JSON;
|
||||
import static org.apache.dolphinscheduler.spi.task.TaskConstants.TASK_LOG_INFO_FORMAT;
|
||||
|
||||
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
|
||||
import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
|
||||
|
|
@ -91,10 +90,6 @@ public class HttpTask extends AbstractTaskExecutor {
|
|||
|
||||
@Override
|
||||
public void handle() throws Exception {
|
||||
|
||||
String threadLoggerInfoName = String.format(TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
|
||||
Thread.currentThread().setName(threadLoggerInfoName);
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
String formatTimeStamp = DateUtils.formatTimeStamp(startTime);
|
||||
String statusCode = null;
|
||||
|
|
|
|||
|
|
@ -43,7 +43,6 @@ import java.util.Map;
|
|||
|
||||
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE;
|
||||
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_SUCCESS;
|
||||
import static org.apache.dolphinscheduler.spi.task.TaskConstants.TASK_LOG_INFO_FORMAT;
|
||||
|
||||
/**
|
||||
* procedure task
|
||||
|
|
@ -82,10 +81,6 @@ public class ProcedureTask extends AbstractTaskExecutor {
|
|||
|
||||
@Override
|
||||
public void handle() throws Exception {
|
||||
// set the name of the current thread
|
||||
String threadLoggerInfoName = String.format(TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
|
||||
Thread.currentThread().setName(threadLoggerInfoName);
|
||||
|
||||
logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}",
|
||||
procedureParameters.getType(),
|
||||
procedureParameters.getDatasource(),
|
||||
|
|
|
|||
|
|
@ -114,10 +114,6 @@ public class SqlTask extends AbstractTaskExecutor {
|
|||
|
||||
@Override
|
||||
public void handle() throws Exception {
|
||||
// set the name of the current thread
|
||||
String threadLoggerInfoName = String.format(TaskConstants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
|
||||
Thread.currentThread().setName(threadLoggerInfoName);
|
||||
|
||||
logger.info("Full sql parameters: {}", sqlParameters);
|
||||
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}",
|
||||
sqlParameters.getType(),
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
|
|||
import org.apache.dolphinscheduler.spi.task.AbstractTask;
|
||||
import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
|
||||
import org.apache.dolphinscheduler.spi.task.TaskChannel;
|
||||
import org.apache.dolphinscheduler.spi.task.TaskConstants;
|
||||
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
|
||||
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
|
||||
|
||||
|
|
@ -171,14 +172,16 @@ public class TaskExecuteThread implements Runnable, Delayed {
|
|||
throw new RuntimeException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
|
||||
}
|
||||
TaskRequest taskRequest = JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext), TaskRequest.class);
|
||||
String taskLogName = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
|
||||
taskExecutionContext.getFirstSubmitTime(),
|
||||
String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
|
||||
taskExecutionContext.getProcessDefineCode(),
|
||||
taskExecutionContext.getProcessDefineVersion(),
|
||||
taskExecutionContext.getProcessInstanceId(),
|
||||
taskExecutionContext.getTaskInstanceId());
|
||||
taskRequest.setTaskLogName(taskLogName);
|
||||
|
||||
// set the name of the current thread
|
||||
Thread.currentThread().setName(taskLogName);
|
||||
|
||||
task = taskChannel.createTask(taskRequest);
|
||||
|
||||
// task init
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@
|
|||
<file>${log.base}/${taskAppId}.log</file>
|
||||
<encoder>
|
||||
<pattern>
|
||||
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n
|
||||
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %messsage%n
|
||||
</pattern>
|
||||
<charset>UTF-8</charset>
|
||||
</encoder>
|
||||
|
|
|
|||
|
|
@ -107,13 +107,6 @@ public class TaskExecuteProcessorTest {
|
|||
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
|
||||
.thenReturn(workerConfig);
|
||||
|
||||
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
|
||||
taskExecutionContext.getFirstSubmitTime(),
|
||||
taskExecutionContext.getProcessDefineCode(),
|
||||
taskExecutionContext.getProcessDefineVersion(),
|
||||
taskExecutionContext.getProcessInstanceId(),
|
||||
taskExecutionContext.getTaskInstanceId()));
|
||||
|
||||
workerManager = PowerMockito.mock(WorkerManagerThread.class);
|
||||
PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService))).thenReturn(Boolean.TRUE);
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue