diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java index b3dfa62807..a45b25a92f 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.alert; import static org.apache.dolphinscheduler.common.Constants.ALERT_RPC_PORT; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.PluginDao; import org.apache.dolphinscheduler.dao.entity.Alert; @@ -100,7 +101,7 @@ public class AlertServer implements Closeable { private void startServer() { NettyServerConfig serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(ALERT_RPC_PORT); + serverConfig.setListenPort(PropertyUtils.getInt(ALERT_RPC_PORT, 50052)); server = new NettyRemotingServer(serverConfig); server.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java index 9d36464b76..0fd4befd77 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.LoggerService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.log.LogClientService; @@ -94,7 +95,7 @@ public class LoggerServiceImpl implements LoggerService { Result result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(), - Constants.RPC_PORT); + PropertyUtils.getInt(Constants.RPC_PORT, 50051)); StringBuilder log = new StringBuilder(); if (skipLineNum == 0) { @@ -106,7 +107,7 @@ public class LoggerServiceImpl implements LoggerService { } log.append(logClient - .rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(), skipLineNum, limit)); + .rollViewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath(), skipLineNum, limit)); result.setData(log.toString()); return result; @@ -131,7 +132,7 @@ public class LoggerServiceImpl implements LoggerService { host, Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8); return Bytes.concat(head, - logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath())); + logClient.getLogBytes(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath())); } /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 6c7c106794..afa4ae9307 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -39,9 +39,9 @@ public final class Constants { public static final String COMMON_PROPERTIES_PATH = "/common.properties"; /** - * alter properties + * alert properties */ - public static final int ALERT_RPC_PORT = 50052; + public static final String ALERT_RPC_PORT = "alert.rpc.port"; /** * registry properties @@ -291,7 +291,7 @@ public final class Constants { * * rpc port */ - public static final int RPC_PORT = 50051; + public static final String RPC_PORT = "rpc.port"; /** * forbid running task diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index 4005a0afaf..9ae11e3c93 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -89,3 +89,7 @@ sudo.enable=true # development state development.state=false + +# rpc port +rpc.port=50051 +alert.rpc.port=50052 \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java index 58e83ac3c4..fc8124d5ac 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.log; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; @@ -49,7 +50,7 @@ public class LoggerServer { public LoggerServer() { this.serverConfig = new NettyServerConfig(); - this.serverConfig.setListenPort(Constants.RPC_PORT); + this.serverConfig.setListenPort(PropertyUtils.getInt(Constants.RPC_PORT, 50051)); this.server = new NettyRemotingServer(serverConfig); this.requestProcessor = new LoggerRequestProcessor(); this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor()); @@ -72,7 +73,7 @@ public class LoggerServer { */ public void start() { this.server.start(); - logger.info("logger server started, listening on port : {}", Constants.RPC_PORT); + logger.info("logger server started, listening on port : {}", PropertyUtils.getInt(Constants.RPC_PORT, 50051)); Runtime.getRuntime().addShutdownHook(new Thread(LoggerServer.this::stop)); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index 063b153a17..334b7ca944 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -188,7 +188,7 @@ public class ProcessUtils { String log; try (LogClientService logClient = new LogClientService()) { log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(), - Constants.RPC_PORT, + PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskExecutionContext.getLogPath()); } if (!StringUtils.isEmpty(log)) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index fd675b082b..765724f072 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; @@ -124,7 +125,7 @@ public class WorkerServer implements IStoppable { @PostConstruct public void run() { // alert-server client registry - alertClientService = new AlertClientService(workerConfig.getAlertListenHost(), Constants.ALERT_RPC_PORT); + alertClientService = new AlertClientService(workerConfig.getAlertListenHost(), PropertyUtils.getInt(Constants.ALERT_RPC_PORT, 50052)); // init remoting server NettyServerConfig serverConfig = new NettyServerConfig(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index c0ecd67385..7a4bf89e60 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; @@ -179,7 +180,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { private Pair> killYarnJob(String host, String logPath, String executePath, String tenantCode) { try (LogClientService logClient = new LogClientService();) { logger.info("view log host : {},logPath : {}", host, logPath); - String log = logClient.viewLog(host, Constants.RPC_PORT, logPath); + String log = logClient.viewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), logPath); List appIds = Collections.emptyList(); if (!StringUtils.isEmpty(log)) { appIds = LoggerUtils.getAppIds(log, logger); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerServerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerServerTest.java index 74bdceae8b..b82e8f22c0 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerServerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerServerTest.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.log; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.commons.lang.StringUtils; @@ -51,7 +52,7 @@ public class LoggerServerTest { org.apache.commons.io.FileUtils.writeStringToFile(new File("/tmp/demo.txt"), expectedTmpDemoString, Charset.defaultCharset()); String resultTmpDemoString = this.logClientService.rollViewLog( - "localhost", Constants.RPC_PORT,"/tmp/demo.txt", 0, 1000); + "localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051), "/tmp/demo.txt", 0, 1000); Assert.assertEquals(expectedTmpDemoString, resultTmpDemoString.replaceAll("[\r|\n|\t]", StringUtils.EMPTY)); @@ -63,11 +64,11 @@ public class LoggerServerTest { String expectedTmpRemoveString = "testRemoveTaskLog"; org.apache.commons.io.FileUtils.writeStringToFile(new File("/tmp/remove.txt"), expectedTmpRemoveString, Charset.defaultCharset()); - Boolean b = this.logClientService.removeTaskLog("localhost", Constants.RPC_PORT,"/tmp/remove.txt"); + Boolean b = this.logClientService.removeTaskLog("localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051),"/tmp/remove.txt"); Assert.assertTrue(b); - String result = this.logClientService.viewLog("localhost", Constants.RPC_PORT,"/tmp/demo.txt"); + String result = this.logClientService.viewLog("localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051),"/tmp/demo.txt"); Assert.assertEquals(StringUtils.EMPTY, result); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index e7093417a2..ee3cca4249 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -55,6 +55,7 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateEx import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.DagData; @@ -507,7 +508,7 @@ public class ProcessService { if (StringUtils.isEmpty(taskInstance.getHost())) { continue; } - int port = Constants.RPC_PORT; + int port = PropertyUtils.getInt(Constants.RPC_PORT, 50051); String ip = ""; try { ip = Host.of(taskInstance.getHost()).getIp();