[DS-5229][feat] Implement server port custom config (#6947)
* [DS-5229][fix] server port custom config This closes #5229 * [DS-5229][feat] Implement server port custom config This closes #5229 * [Bug] [readme] Error link to Docker and k8s in readme apache#6802 (#6862) * [Bug] [readme] Error link to Docker and k8s in readme #6802 1、modify the error link * [Bug] [readme] Error link to Docker and k8s in readme #6802 1、modify the error link in readme_zh_cn.md * [DS-6829][WorkerServer] skip create log dir and print log in dryRun model (#6852) Co-authored-by: caishunfeng <534328519@qq.com> * [DS-5229][feat] Implement server port custom config Modify review suggestion This closes #5229 Co-authored-by: GaoTianDuo <gaotianduo_yewu@cmss.chinamobile.com> Co-authored-by: wind <caishunfeng@users.noreply.github.com> Co-authored-by: caishunfeng <534328519@qq.com>refactor-ui
parent
100a9ba3fc
commit
0dcff1425a
|
|
@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.alert;
|
|||
import static org.apache.dolphinscheduler.common.Constants.ALERT_RPC_PORT;
|
||||
|
||||
import org.apache.dolphinscheduler.common.thread.Stopper;
|
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
|
||||
import org.apache.dolphinscheduler.dao.AlertDao;
|
||||
import org.apache.dolphinscheduler.dao.PluginDao;
|
||||
import org.apache.dolphinscheduler.dao.entity.Alert;
|
||||
|
|
@ -100,7 +101,7 @@ public class AlertServer implements Closeable {
|
|||
|
||||
private void startServer() {
|
||||
NettyServerConfig serverConfig = new NettyServerConfig();
|
||||
serverConfig.setListenPort(ALERT_RPC_PORT);
|
||||
serverConfig.setListenPort(PropertyUtils.getInt(ALERT_RPC_PORT, 50052));
|
||||
|
||||
server = new NettyRemotingServer(serverConfig);
|
||||
server.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor);
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException;
|
|||
import org.apache.dolphinscheduler.api.service.LoggerService;
|
||||
import org.apache.dolphinscheduler.api.utils.Result;
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
||||
import org.apache.dolphinscheduler.remote.utils.Host;
|
||||
import org.apache.dolphinscheduler.service.log.LogClientService;
|
||||
|
|
@ -94,7 +95,7 @@ public class LoggerServiceImpl implements LoggerService {
|
|||
Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
|
||||
|
||||
logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(),
|
||||
Constants.RPC_PORT);
|
||||
PropertyUtils.getInt(Constants.RPC_PORT, 50051));
|
||||
|
||||
StringBuilder log = new StringBuilder();
|
||||
if (skipLineNum == 0) {
|
||||
|
|
@ -106,7 +107,7 @@ public class LoggerServiceImpl implements LoggerService {
|
|||
}
|
||||
|
||||
log.append(logClient
|
||||
.rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(), skipLineNum, limit));
|
||||
.rollViewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath(), skipLineNum, limit));
|
||||
|
||||
result.setData(log.toString());
|
||||
return result;
|
||||
|
|
@ -131,7 +132,7 @@ public class LoggerServiceImpl implements LoggerService {
|
|||
host,
|
||||
Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8);
|
||||
return Bytes.concat(head,
|
||||
logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath()));
|
||||
logClient.getLogBytes(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath()));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -39,9 +39,9 @@ public final class Constants {
|
|||
public static final String COMMON_PROPERTIES_PATH = "/common.properties";
|
||||
|
||||
/**
|
||||
* alter properties
|
||||
* alert properties
|
||||
*/
|
||||
public static final int ALERT_RPC_PORT = 50052;
|
||||
public static final String ALERT_RPC_PORT = "alert.rpc.port";
|
||||
|
||||
/**
|
||||
* registry properties
|
||||
|
|
@ -291,7 +291,7 @@ public final class Constants {
|
|||
*
|
||||
* rpc port
|
||||
*/
|
||||
public static final int RPC_PORT = 50051;
|
||||
public static final String RPC_PORT = "rpc.port";
|
||||
|
||||
/**
|
||||
* forbid running task
|
||||
|
|
|
|||
|
|
@ -89,3 +89,7 @@ sudo.enable=true
|
|||
|
||||
# development state
|
||||
development.state=false
|
||||
|
||||
# rpc port
|
||||
rpc.port=50051
|
||||
alert.rpc.port=50052
|
||||
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.dolphinscheduler.server.log;
|
||||
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
|
||||
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
|
||||
|
|
@ -49,7 +50,7 @@ public class LoggerServer {
|
|||
|
||||
public LoggerServer() {
|
||||
this.serverConfig = new NettyServerConfig();
|
||||
this.serverConfig.setListenPort(Constants.RPC_PORT);
|
||||
this.serverConfig.setListenPort(PropertyUtils.getInt(Constants.RPC_PORT, 50051));
|
||||
this.server = new NettyRemotingServer(serverConfig);
|
||||
this.requestProcessor = new LoggerRequestProcessor();
|
||||
this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor());
|
||||
|
|
@ -72,7 +73,7 @@ public class LoggerServer {
|
|||
*/
|
||||
public void start() {
|
||||
this.server.start();
|
||||
logger.info("logger server started, listening on port : {}", Constants.RPC_PORT);
|
||||
logger.info("logger server started, listening on port : {}", PropertyUtils.getInt(Constants.RPC_PORT, 50051));
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(LoggerServer.this::stop));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -188,7 +188,7 @@ public class ProcessUtils {
|
|||
String log;
|
||||
try (LogClientService logClient = new LogClientService()) {
|
||||
log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(),
|
||||
Constants.RPC_PORT,
|
||||
PropertyUtils.getInt(Constants.RPC_PORT, 50051),
|
||||
taskExecutionContext.getLogPath());
|
||||
}
|
||||
if (!StringUtils.isEmpty(log)) {
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.Constants;
|
|||
import org.apache.dolphinscheduler.common.IStoppable;
|
||||
import org.apache.dolphinscheduler.common.enums.NodeType;
|
||||
import org.apache.dolphinscheduler.common.thread.Stopper;
|
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
|
||||
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
|
||||
|
|
@ -124,7 +125,7 @@ public class WorkerServer implements IStoppable {
|
|||
@PostConstruct
|
||||
public void run() {
|
||||
// alert-server client registry
|
||||
alertClientService = new AlertClientService(workerConfig.getAlertListenHost(), Constants.ALERT_RPC_PORT);
|
||||
alertClientService = new AlertClientService(workerConfig.getAlertListenHost(), PropertyUtils.getInt(Constants.ALERT_RPC_PORT, 50052));
|
||||
|
||||
// init remoting server
|
||||
NettyServerConfig serverConfig = new NettyServerConfig();
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
|||
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.OSUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
|
||||
|
|
@ -179,7 +180,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
|
|||
private Pair<Boolean, List<String>> killYarnJob(String host, String logPath, String executePath, String tenantCode) {
|
||||
try (LogClientService logClient = new LogClientService();) {
|
||||
logger.info("view log host : {},logPath : {}", host, logPath);
|
||||
String log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
|
||||
String log = logClient.viewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), logPath);
|
||||
List<String> appIds = Collections.emptyList();
|
||||
if (!StringUtils.isEmpty(log)) {
|
||||
appIds = LoggerUtils.getAppIds(log, logger);
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.log;
|
|||
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.common.utils.FileUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
|
||||
import org.apache.dolphinscheduler.service.log.LogClientService;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
|
@ -51,7 +52,7 @@ public class LoggerServerTest {
|
|||
org.apache.commons.io.FileUtils.writeStringToFile(new File("/tmp/demo.txt"), expectedTmpDemoString, Charset.defaultCharset());
|
||||
|
||||
String resultTmpDemoString = this.logClientService.rollViewLog(
|
||||
"localhost", Constants.RPC_PORT,"/tmp/demo.txt", 0, 1000);
|
||||
"localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051), "/tmp/demo.txt", 0, 1000);
|
||||
|
||||
Assert.assertEquals(expectedTmpDemoString, resultTmpDemoString.replaceAll("[\r|\n|\t]", StringUtils.EMPTY));
|
||||
|
||||
|
|
@ -63,11 +64,11 @@ public class LoggerServerTest {
|
|||
String expectedTmpRemoveString = "testRemoveTaskLog";
|
||||
org.apache.commons.io.FileUtils.writeStringToFile(new File("/tmp/remove.txt"), expectedTmpRemoveString, Charset.defaultCharset());
|
||||
|
||||
Boolean b = this.logClientService.removeTaskLog("localhost", Constants.RPC_PORT,"/tmp/remove.txt");
|
||||
Boolean b = this.logClientService.removeTaskLog("localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051),"/tmp/remove.txt");
|
||||
|
||||
Assert.assertTrue(b);
|
||||
|
||||
String result = this.logClientService.viewLog("localhost", Constants.RPC_PORT,"/tmp/demo.txt");
|
||||
String result = this.logClientService.viewLog("localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051),"/tmp/demo.txt");
|
||||
|
||||
Assert.assertEquals(StringUtils.EMPTY, result);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -55,6 +55,7 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateEx
|
|||
import org.apache.dolphinscheduler.common.utils.DateUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
|
||||
import org.apache.dolphinscheduler.dao.entity.Command;
|
||||
import org.apache.dolphinscheduler.dao.entity.DagData;
|
||||
|
|
@ -507,7 +508,7 @@ public class ProcessService {
|
|||
if (StringUtils.isEmpty(taskInstance.getHost())) {
|
||||
continue;
|
||||
}
|
||||
int port = Constants.RPC_PORT;
|
||||
int port = PropertyUtils.getInt(Constants.RPC_PORT, 50051);
|
||||
String ip = "";
|
||||
try {
|
||||
ip = Host.of(taskInstance.getHost()).getIp();
|
||||
|
|
|
|||
Loading…
Reference in New Issue