From d6dea4633c3f0e89f245b4ebae4c607b8cbde733 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 27 Dec 2023 20:20:21 +0800 Subject: [PATCH] Optimize server startup log (#15362) --- .../alert/rpc/AlertRpcServer.java | 5 +- .../src/main/resources/logback-spring.xml | 4 +- .../api/service/LoggerServiceTest.java | 2 +- .../common/constants/Constants.java | 114 +----------------- .../common/utils/NetUtils.java | 16 +-- .../dao/entity/TaskInstance.java | 4 +- .../extract/base/NettyRemotingServer.java | 25 ++-- .../base/config/NettyServerConfig.java | 13 +- ...onJdkDynamicRpcClientProxyFactoryTest.java | 3 +- .../builder/TaskExecutionContextBuilder.java | 4 +- .../server/master/config/MasterConfig.java | 40 +++--- .../server/master/rpc/MasterRpcServer.java | 3 +- .../runner/StateWheelExecuteThread.java | 8 +- .../src/main/resources/logback-spring.xml | 4 +- .../microbench/rpc/RpcBenchMarkTest.java | 3 +- .../src/main/resources/logback-spring.xml | 4 +- .../plugin/task/api/TaskConstants.java | 72 ----------- .../server/worker/config/WorkerConfig.java | 34 +++--- .../worker/message/MessageRetryRunner.java | 2 +- .../server/worker/rpc/WorkerRpcServer.java | 3 +- .../src/main/resources/logback-spring.xml | 4 +- 21 files changed, 104 insertions(+), 263 deletions(-) diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java index 4a8c621d30..3bd368573a 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.alert.rpc; import org.apache.dolphinscheduler.alert.config.AlertConfig; -import org.apache.dolphinscheduler.extract.base.NettyRemotingServer; +import org.apache.dolphinscheduler.extract.base.NettyRemotingServerFactory; import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; @@ -31,7 +31,8 @@ import org.springframework.stereotype.Service; public class AlertRpcServer extends SpringServerMethodInvokerDiscovery implements AutoCloseable { public AlertRpcServer(AlertConfig alertConfig) { - super(new NettyRemotingServer(new NettyServerConfig(alertConfig.getPort()))); + super(NettyRemotingServerFactory.buildNettyRemotingServer( + NettyServerConfig.builder().serverName("AlertRpcServer").listenPort(alertConfig.getPort()).build())); } public void start() { diff --git a/dolphinscheduler-api/src/main/resources/logback-spring.xml b/dolphinscheduler-api/src/main/resources/logback-spring.xml index f46d732b8d..1982d6de5b 100644 --- a/dolphinscheduler-api/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-api/src/main/resources/logback-spring.xml @@ -23,7 +23,7 @@ - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n UTF-8 @@ -40,7 +40,7 @@ - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n UTF-8 diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java index 1eb7a2575b..acfda80ccd 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java @@ -91,7 +91,7 @@ public class LoggerServiceTest { @BeforeEach public void setUp() { - nettyRemotingServer = new NettyRemotingServer(new NettyServerConfig(8080)); + nettyRemotingServer = new NettyRemotingServer(NettyServerConfig.builder().listenPort(8080).build()); nettyRemotingServer.start(); SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery = new SpringServerMethodInvokerDiscovery(nettyRemotingServer); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java index 94577d6bbf..7556636216 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java @@ -35,9 +35,6 @@ public final class Constants { */ public static final String COMMON_PROPERTIES_PATH = "/common.properties"; - public static final String REGISTRY_DOLPHINSCHEDULER_MASTERS = "/nodes/master"; - public static final String REGISTRY_DOLPHINSCHEDULER_WORKERS = "/nodes/worker"; - public static final String FORMAT_SS = "%s%s"; public static final String FORMAT_S_S = "%s/%s"; public static final String FORMAT_S_S_COLON = "%s:%s"; @@ -191,11 +188,6 @@ public final class Constants { */ public static final String DOUBLE_SLASH = "//"; - /** - * EQUAL SIGN - */ - public static final String EQUAL_SIGN = "="; - /** * AT SIGN */ @@ -235,11 +227,6 @@ public final class Constants { */ public static final int SOCKET_TIMEOUT = 60 * 1000; - /** - * registry session timeout - */ - public static final int REGISTRY_SESSION_TIMEOUT = 10 * 1000; - /** * http header */ @@ -300,22 +287,6 @@ public final class Constants { */ public static final int MAX_TASK_TIMEOUT = 24 * 3600; - /** - * worker host weight - */ - public static final int DEFAULT_WORKER_HOST_WEIGHT = 100; - - /** - * unit convertor for minute to second - */ - public static final int MINUTE_2_SECOND_TIME_UNIT = 60; - - /*** - * - * rpc port - */ - public static final String RPC_PORT = "rpc.port"; - /** * forbid running task */ @@ -356,21 +327,6 @@ public final class Constants { public static final Duration SERVER_CLOSE_WAIT_TIME = Duration.ofSeconds(3); - /** - * one second mils - */ - public static final long SECOND_TIME_MILLIS = 1_000L; - - /** - * master task instance cache-database refresh interval - */ - public static final long CACHE_REFRESH_TIME_MILLIS = 20 * 1_000L; - - /** - * heartbeat for zk info length - */ - public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 14; - /** * jar */ @@ -408,39 +364,10 @@ public final class Constants { */ public static final int VERSION_FIRST = 1; - /** - * ACCEPTED - */ - public static final String ACCEPTED = "ACCEPTED"; - - /** - * SUCCEEDED - */ - public static final String SUCCEEDED = "SUCCEEDED"; - /** - * ENDED - */ - public static final String ENDED = "ENDED"; - /** - * NEW - */ - public static final String NEW = "NEW"; - /** - * NEW_SAVING - */ - public static final String NEW_SAVING = "NEW_SAVING"; - /** - * SUBMITTED - */ - public static final String SUBMITTED = "SUBMITTED"; /** * FAILED */ public static final String FAILED = "FAILED"; - /** - * KILLED - */ - public static final String KILLED = "KILLED"; /** * RUNNING */ @@ -449,25 +376,11 @@ public final class Constants { * underline "_" */ public static final String UNDERLINE = "_"; - /** - * application regex - */ - public static final String APPLICATION_REGEX = "application_\\d+_\\d+"; + public static final String PID = SystemUtils.IS_OS_WINDOWS ? "handle" : "pid"; - public static final char SUBTRACT_CHAR = '-'; - public static final char ADD_CHAR = '+'; - public static final char MULTIPLY_CHAR = '*'; - public static final char DIVISION_CHAR = '/'; - public static final char LEFT_BRACE_CHAR = '('; - public static final char RIGHT_BRACE_CHAR = ')'; - public static final String ADD_STRING = "+"; public static final String STAR = "*"; - public static final String DIVISION_STRING = "/"; - public static final String LEFT_BRACE_STRING = "("; - public static final char P = 'P'; public static final char N = 'N'; - public static final String SUBTRACT_STRING = "-"; public static final String GLOBAL_PARAMS = "globalParams"; public static final String LOCAL_PARAMS = "localParams"; public static final String SUBPROCESS_INSTANCE_ID = "subProcessInstanceId"; @@ -482,9 +395,6 @@ public final class Constants { public static final String QUEUE_NAME = "queueName"; public static final int LOG_QUERY_SKIP_LINE_NUMBER = 0; public static final int LOG_QUERY_LIMIT = 4096; - public static final String BLOCKING_CONDITION = "blockingCondition"; - public static final String ALERT_WHEN_BLOCKING = "alertWhenBlocking"; - public static final String ALIAS = "alias"; public static final String CONTENT = "content"; public static final String DEPENDENT_SPLIT = ":||"; @@ -527,11 +437,6 @@ public final class Constants { public static final String HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE = "hadoop.security.authentication.startup.state"; - /** - * com.amazonaws.services.s3.enableV4 - */ - public static final String AWS_S3_V4 = "com.amazonaws.services.s3.enableV4"; - /** * loginUserFromKeytab user */ @@ -550,11 +455,6 @@ public final class Constants { public static final String WORKFLOW_INSTANCE_ID_MDC_KEY = "workflowInstanceId"; public static final String TASK_INSTANCE_ID_MDC_KEY = "taskInstanceId"; - /** - * task log info format - */ - public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s"; - /** * double brackets left */ @@ -647,10 +547,6 @@ public final class Constants { * authorize writable perm */ public static final int AUTHORIZE_WRITABLE_PERM = 7; - /** - * authorize readable perm - */ - public static final int AUTHORIZE_READABLE_PERM = 4; public static final String START_TIME = "start time"; public static final String END_TIME = "end time"; @@ -682,8 +578,6 @@ public final class Constants { */ public static final String DATA_QUALITY_ERROR_OUTPUT_PATH = "data-quality.error.output.path"; - public static final String CACHE_KEY_VALUE_ALL = "'all'"; - /** * use for k8s */ @@ -784,12 +678,6 @@ public final class Constants { */ public static final String SUPPORT_HIVE_ONE_SESSION = "support.hive.oneSession"; - public static final String PRINCIPAL = "principal"; - public static final String ORACLE_DB_CONNECT_TYPE = "connectType"; - public static final String KERBEROS_KRB5_CONF_PATH = "javaSecurityKrb5Conf"; - public static final String KERBEROS_KEY_TAB_USERNAME = "loginUserKeytabUsername"; - public static final String KERBEROS_KEY_TAB_PATH = "loginUserKeytabPath"; - public static final Integer QUERY_ALL_ON_SYSTEM = 0; public static final Integer QUERY_ALL_ON_PROJECT = 1; public static final Integer QUERY_ALL_ON_WORKFLOW = 2; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java index e1d98e169a..ebefe210b2 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java @@ -41,9 +41,6 @@ import lombok.extern.slf4j.Slf4j; import com.google.common.collect.Sets; -/** - * NetUtils - */ @Slf4j public class NetUtils { @@ -213,15 +210,17 @@ public class NetUtils { .collect(Collectors.toList()); // Use the specified network interface if set - if (StringUtils.isNotBlank(specifyNetworkInterfaceName())) { - String specifyNetworkInterfaceName = specifyNetworkInterfaceName(); + String specifiedNetworkInterfaceName = specifyNetworkInterfaceName(); + if (StringUtils.isNotBlank(specifiedNetworkInterfaceName)) { validNetworkInterfaces = validNetworkInterfaces.stream() - .filter(networkInterface -> specifyNetworkInterfaceName.equals(networkInterface.getDisplayName())) + .filter(networkInterface -> specifiedNetworkInterfaceName.equals(networkInterface.getDisplayName())) .collect(Collectors.toList()); if (CollectionUtils.isEmpty(validNetworkInterfaces)) { throw new IllegalArgumentException( - "The specified network interface: " + specifyNetworkInterfaceName + " is not found"); + "The specified network interface: " + specifiedNetworkInterfaceName + " is not found"); } + log.info("Use the specified network interface: {} -> {}", specifiedNetworkInterfaceName, + validNetworkInterfaces); } Set restrictNetworkInterfaceName = restrictNetworkInterfaceName(); @@ -307,9 +306,10 @@ public class NetUtils { Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); while (interfaces.hasMoreElements()) { NetworkInterface networkInterface = interfaces.nextElement(); - log.info("Found NetworkInterface: {}", networkInterface); + log.debug("Found NetworkInterface: {}", networkInterface); validNetworkInterfaces.add(networkInterface); } + log.info("Get all NetworkInterfaces: {}", validNetworkInterfaces); return validNetworkInterfaces; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index b292641008..4897449753 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.dao.entity; -import static org.apache.dolphinscheduler.common.constants.Constants.MINUTE_2_SECOND_TIME_UNIT; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; @@ -38,6 +37,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; import java.io.Serializable; import java.util.Date; import java.util.Map; +import java.util.concurrent.TimeUnit; import lombok.Data; @@ -407,7 +407,7 @@ public class TaskInstance implements Serializable { Date now = new Date(); long failedTimeInterval = DateUtils.differSec(now, getEndTime()); // task retry does not over time, return false - return getRetryInterval() * MINUTE_2_SECOND_TIME_UNIT < failedTimeInterval; + return TimeUnit.MINUTES.toSeconds(getRetryInterval()) < failedTimeInterval; } } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java index 50d82e2c91..7655b804fe 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java @@ -73,9 +73,11 @@ public class NettyRemotingServer { public NettyRemotingServer(final NettyServerConfig serverConfig) { this.serverConfig = serverConfig; ThreadFactory bossThreadFactory = - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerBossThread_%s").build(); + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(serverConfig.getServerName() + "BossThread_%s") + .build(); ThreadFactory workerThreadFactory = - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerWorkerThread_%s").build(); + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat(serverConfig.getServerName() + "WorkerThread_%s").build(); if (Epoll.isAvailable()) { this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory); this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory); @@ -108,16 +110,23 @@ public class NettyRemotingServer { try { future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); } catch (Exception e) { - log.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e); - throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort())); + log.error("{} bind fail {}, exit", serverConfig.getServerName(), e.getMessage(), e); + throw new RemoteException( + String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort())); } + if (future.isSuccess()) { - log.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); - } else if (future.cause() != null) { - throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), + log.info("{} bind success at port: {}", serverConfig.getServerName(), serverConfig.getListenPort()); + return; + } + + if (future.cause() != null) { + throw new RemoteException( + String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()), future.cause()); } else { - throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort())); + throw new RemoteException( + String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort())); } } } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java index f81804935b..d432d2cc75 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java @@ -17,11 +17,19 @@ package org.apache.dolphinscheduler.extract.base.config; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class NettyServerConfig { + private String serverName; + /** * init the server connectable queue */ @@ -55,9 +63,6 @@ public class NettyServerConfig { /** * listen port */ - private int listenPort = 12346; + private int listenPort; - public NettyServerConfig(int listenPort) { - this.listenPort = listenPort; - } } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java index 3cf9ff1c89..2c80773301 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java @@ -34,7 +34,8 @@ public class SingletonJdkDynamicRpcClientProxyFactoryTest { @BeforeEach public void setUp() { - nettyRemotingServer = new NettyRemotingServer(new NettyServerConfig(12345)); + nettyRemotingServer = + new NettyRemotingServer(NettyServerConfig.builder().serverName("ApiServer").listenPort(12345).build()); nettyRemotingServer.start(); new SpringServerMethodInvokerDiscovery(nettyRemotingServer) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java index c0a5abc369..832c1b336b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.builder; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.dolphinscheduler.common.constants.Constants.MINUTE_2_SECOND_TIME_UNIT; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.utils.DateUtils; @@ -34,6 +33,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import java.util.Map; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -84,7 +84,7 @@ public class TaskExecutionContextBuilder { if (taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.FAILED || taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.WARNFAILED) { taskExecutionContext.setTaskTimeout( - Math.min(taskDefinition.getTimeout() * MINUTE_2_SECOND_TIME_UNIT, Integer.MAX_VALUE)); + (int) Math.min(TimeUnit.MINUTES.toSeconds(taskDefinition.getTimeout()), Integer.MAX_VALUE)); } } taskExecutionContext.setTaskParams(taskDefinition.getTaskParams()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index d7f10f03fb..553789d384 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -163,23 +163,27 @@ public class MasterConfig implements Validator { } private void printConfig() { - log.info("Master config: listenPort -> {} ", listenPort); - log.info("Master config: fetchCommandNum -> {} ", fetchCommandNum); - log.info("Master config: preExecThreads -> {} ", preExecThreads); - log.info("Master config: execThreads -> {} ", execThreads); - log.info("Master config: dispatchTaskNumber -> {} ", dispatchTaskNumber); - log.info("Master config: hostSelector -> {} ", hostSelector); - log.info("Master config: heartbeatInterval -> {} ", heartbeatInterval); - log.info("Master config: taskCommitRetryTimes -> {} ", taskCommitRetryTimes); - log.info("Master config: taskCommitInterval -> {} ", taskCommitInterval); - log.info("Master config: stateWheelInterval -> {} ", stateWheelInterval); - log.info("Master config: maxCpuLoadAvg -> {} ", maxCpuLoadAvg); - log.info("Master config: reservedMemory -> {} ", reservedMemory); - log.info("Master config: failoverInterval -> {} ", failoverInterval); - log.info("Master config: killApplicationWhenTaskFailover -> {} ", killApplicationWhenTaskFailover); - log.info("Master config: registryDisconnectStrategy -> {} ", registryDisconnectStrategy); - log.info("Master config: masterAddress -> {} ", masterAddress); - log.info("Master config: masterRegistryPath -> {} ", masterRegistryPath); - log.info("Master config: workerGroupRefreshInterval -> {} ", workerGroupRefreshInterval); + String config = + "\n****************************Master Configuration**************************************" + + "\n listen-port -> " + listenPort + + "\n fetch-command-num -> " + fetchCommandNum + + "\n pre-exec-threads -> " + preExecThreads + + "\n exec-threads -> " + execThreads + + "\n dispatch-task-number -> " + dispatchTaskNumber + + "\n host-selector -> " + hostSelector + + "\n heartbeat-interval -> " + heartbeatInterval + + "\n task-commit-retry-times -> " + taskCommitRetryTimes + + "\n task-commit-interval -> " + taskCommitInterval + + "\n state-wheel-interval -> " + stateWheelInterval + + "\n max-cpu-load-avg -> " + maxCpuLoadAvg + + "\n reserved-memory -> " + reservedMemory + + "\n failover-interval -> " + failoverInterval + + "\n kill-application-when-task-failover -> " + killApplicationWhenTaskFailover + + "\n registry-disconnect-strategy -> " + registryDisconnectStrategy + + "\n master-address -> " + masterAddress + + "\n master-registry-path: " + masterRegistryPath + + "\n worker-group-refresh-interval: " + workerGroupRefreshInterval + + "\n****************************Master Configuration**************************************"; + log.info(config); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java index 91a7448b7c..0eaf885d11 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServer.java @@ -31,7 +31,8 @@ import org.springframework.stereotype.Component; public class MasterRpcServer extends SpringServerMethodInvokerDiscovery implements AutoCloseable { public MasterRpcServer(MasterConfig masterConfig) { - super(NettyRemotingServerFactory.buildNettyRemotingServer(new NettyServerConfig(masterConfig.getListenPort()))); + super(NettyRemotingServerFactory.buildNettyRemotingServer(NettyServerConfig.builder() + .serverName("MasterRpcServer").listenPort(masterConfig.getListenPort()).build())); } public void start() { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index d0e48b14c1..667eb4ecbb 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.runner; -import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; @@ -37,6 +36,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey; import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; @@ -147,8 +147,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread { continue; } long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), - (long) processInstance.getTimeout() - * Constants.MINUTE_2_SECOND_TIME_UNIT); + TimeUnit.MINUTES.toSeconds(processInstance.getTimeout())); if (timeRemain < 0) { log.info("Workflow instance {} timeout, adding timeout event", processInstance.getId()); addProcessTimeoutEvent(processInstance); @@ -247,8 +246,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread { TaskInstance taskInstance = taskInstanceOptional.get(); if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) { long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), - (long) taskInstance.getTaskDefine().getTimeout() - * Constants.MINUTE_2_SECOND_TIME_UNIT); + TimeUnit.MINUTES.toSeconds(taskInstance.getTaskDefine().getTimeout())); if (timeRemain < 0) { log.info("Task instance is timeout, adding task timeout event and remove the check"); addTaskTimeoutEvent(taskInstance); diff --git a/dolphinscheduler-master/src/main/resources/logback-spring.xml b/dolphinscheduler-master/src/main/resources/logback-spring.xml index efe5d78b73..efc5a513c6 100644 --- a/dolphinscheduler-master/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-master/src/main/resources/logback-spring.xml @@ -23,7 +23,7 @@ - [WI-%X{workflowInstanceId:-0}][TI-%X{taskInstanceId:-0}] - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n + [WI-%X{workflowInstanceId:-0}][TI-%X{taskInstanceId:-0}] - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n UTF-8 @@ -61,7 +61,7 @@ - [WI-%X{workflowInstanceId:-0}][TI-%X{taskInstanceId:-0}] - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n + [WI-%X{workflowInstanceId:-0}][TI-%X{taskInstanceId:-0}] - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n UTF-8 diff --git a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java index fd423bcda0..1a3e4ab1e2 100644 --- a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java +++ b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java @@ -52,7 +52,8 @@ public class RpcBenchMarkTest extends AbstractBaseBenchmark { @Setup public void before() { - nettyRemotingServer = new NettyRemotingServer(new NettyServerConfig(12345)); + nettyRemotingServer = new NettyRemotingServer( + NettyServerConfig.builder().serverName("NettyRemotingServer").listenPort(12345).build()); nettyRemotingServer.start(); SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery = new SpringServerMethodInvokerDiscovery(nettyRemotingServer); diff --git a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml index c58b7951f7..daf3c77f3c 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml @@ -23,7 +23,7 @@ - [WI-%X{workflowInstanceId:-0}][TI-%X{taskInstanceId:-0}] - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n + [WI-%X{workflowInstanceId:-0}][TI-%X{taskInstanceId:-0}] - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n UTF-8 @@ -38,7 +38,7 @@ - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n UTF-8 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java index 83f4b3a678..fe8d9a77bf 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.constants.DateConstants; import java.time.Duration; import java.util.Set; -import java.util.regex.Pattern; import com.google.common.collect.Sets; @@ -35,11 +34,6 @@ public class TaskConstants { public static final String FLINK_APPLICATION_REGEX = "JobID \\w+"; - /** - * string false - */ - public static final String STRING_FALSE = "false"; - /** * exit code kill */ @@ -146,11 +140,6 @@ public class TaskConstants { public static final String RWXR_XR_X = "rwxr-xr-x"; - /** - * date format of yyyyMMdd - */ - public static final String PARAMETER_FORMAT_DATE = "yyyyMMdd"; - /** * date format of yyyyMMddHHmmss */ @@ -291,22 +280,9 @@ public class TaskConstants { public static final char P = 'P'; public static final char N = 'N'; public static final String SUBTRACT_STRING = "-"; - public static final String GLOBAL_PARAMS = "globalParams"; - public static final String LOCAL_PARAMS = "localParams"; public static final String LOCAL_PARAMS_LIST = "localParamsList"; - public static final String SUBPROCESS_INSTANCE_ID = "subProcessInstanceId"; - public static final String PROCESS_INSTANCE_STATE = "processInstanceState"; - public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance"; - public static final String CONDITION_RESULT = "conditionResult"; - public static final String SWITCH_RESULT = "switchResult"; - public static final String DEPENDENCE = "dependence"; public static final String TASK_TYPE = "taskType"; - public static final String TASK_LIST = "taskList"; public static final String QUEUE = "queue"; - public static final String QUEUE_NAME = "queueName"; - public static final int LOG_QUERY_SKIP_LINE_NUMBER = 0; - public static final int LOG_QUERY_LIMIT = 4096; - /** * default display rows */ @@ -327,33 +303,6 @@ public class TaskConstants { */ public static final String D = "-D"; - /** - * jdbc url - */ - public static final String JDBC_MYSQL = "jdbc:mysql://"; - public static final String JDBC_POSTGRESQL = "jdbc:postgresql://"; - public static final String JDBC_HIVE_2 = "jdbc:hive2://"; - public static final String JDBC_CLICKHOUSE = "jdbc:clickhouse://"; - public static final String JDBC_DATABEND = "jdbc:databend://"; - public static final String JDBC_ORACLE_SID = "jdbc:oracle:thin:@"; - public static final String JDBC_ORACLE_SERVICE_NAME = "jdbc:oracle:thin:@//"; - public static final String JDBC_SQLSERVER = "jdbc:sqlserver://"; - public static final String JDBC_DB2 = "jdbc:db2://"; - public static final String JDBC_PRESTO = "jdbc:presto://"; - - /** - * driver - */ - public static final String ORG_POSTGRESQL_DRIVER = "org.postgresql.Driver"; - public static final String COM_MYSQL_CJ_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"; - public static final String ORG_APACHE_HIVE_JDBC_HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver"; - public static final String COM_CLICKHOUSE_JDBC_DRIVER = "com.clickhouse.jdbc.ClickHouseDriver"; - public static final String COM_DATABEND_JDBC_DRIVER = "com.databend.jdbc.DatabendDriver"; - public static final String COM_ORACLE_JDBC_DRIVER = "oracle.jdbc.driver.OracleDriver"; - public static final String COM_SQLSERVER_JDBC_DRIVER = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; - public static final String COM_DB2_JDBC_DRIVER = "com.ibm.db2.jcc.DB2Driver"; - public static final String COM_PRESTO_JDBC_DRIVER = "com.facebook.presto.jdbc.PrestoDriver"; - /** * datasource encryption salt */ @@ -361,11 +310,6 @@ public class TaskConstants { public static final String DATASOURCE_ENCRYPTION_ENABLE = "datasource.encryption.enable"; public static final String DATASOURCE_ENCRYPTION_SALT = "datasource.encryption.salt"; - /** - * resource storage type - */ - // public static final String RESOURCE_STORAGE_TYPE = "resource.storage.type"; - /** * kerberos */ @@ -407,11 +351,6 @@ public class TaskConstants { public static final String HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE = "hadoop.security.authentication.startup.state"; - /** - * Task Logger Thread's name - */ - public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo"; - /** * hdfs/s3 configuration * resource.storage.upload.base.path @@ -423,11 +362,6 @@ public class TaskConstants { */ public static final String DATA_QUALITY_JAR_NAME = "data-quality.jar.name"; - /** - * data.quality.error.output.path - */ - public static final String DATA_QUALITY_ERROR_OUTPUT_PATH = "data-quality.error.output.path"; - public static final String TASK_TYPE_CONDITIONS = "CONDITIONS"; public static final String TASK_TYPE_SWITCH = "SWITCH"; @@ -442,14 +376,10 @@ public class TaskConstants { public static final String TASK_TYPE_DATA_QUALITY = "DATA_QUALITY"; - public static final String DEPLOY_MODE_KUBERNETES = "Kubernetes"; - public static final Set TASK_TYPE_SET_K8S = Sets.newHashSet("K8S", "KUBEFLOW"); public static final String TASK_TYPE_BLOCKING = "BLOCKING"; - public static final String TASK_TYPE_STREAM = "STREAM"; - /** * azure config */ @@ -483,7 +413,6 @@ public class TaskConstants { * use for k8s task */ public static final String API_VERSION = "batch/v1"; - public static final String IMAGE_PULL_POLICY = "Always"; public static final String RESTART_POLICY = "Never"; public static final String MEMORY = "memory"; public static final String CPU = "cpu"; @@ -496,7 +425,6 @@ public class TaskConstants { public static final int LOG_LINES = 500; public static final String NAMESPACE_NAME = "name"; public static final String CLUSTER = "cluster"; - public static final Pattern COMMAND_SPLIT_REGEX = Pattern.compile("[^\\s\"'`]+|\"([^\"]+)\"|'([^']+)'|`([^`]+)`"); /** * spark / flink on k8s label name diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 4dd67613fe..f1b821901f 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -17,10 +17,9 @@ package org.apache.dolphinscheduler.server.worker.config; -import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; - import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.commons.lang3.StringUtils; @@ -81,22 +80,27 @@ public class WorkerConfig implements Validator { workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort())); } - workerConfig.setWorkerRegistryPath(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerConfig.getWorkerAddress()); + workerConfig.setWorkerRegistryPath( + RegistryNodeType.WORKER.getRegistryPath() + "/" + workerConfig.getWorkerAddress()); printConfig(); } private void printConfig() { - log.info("Worker config: listenPort -> {}", listenPort); - log.info("Worker config: execThreads -> {}", execThreads); - log.info("Worker config: heartbeatInterval -> {}", heartbeatInterval); - log.info("Worker config: hostWeight -> {}", hostWeight); - log.info("Worker config: tenantAutoCreate -> {}", tenantAutoCreate); - log.info("Worker config: tenantDistributedUser -> {}", tenantDistributedUser); - log.info("Worker config: maxCpuLoadAvg -> {}", maxCpuLoadAvg); - log.info("Worker config: reservedMemory -> {}", reservedMemory); - log.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy); - log.info("Worker config: workerAddress -> {}", workerAddress); - log.info("Worker config: workerRegistryPath: {}", workerRegistryPath); - log.info("Worker config: taskExecuteThreadsFullPolicy: {}", taskExecuteThreadsFullPolicy); + String config = + "\n****************************Worker Configuration**************************************" + + "\n listen-port -> " + listenPort + + "\n exec-threads -> " + execThreads + + "\n heartbeat-interval -> " + heartbeatInterval + + "\n host-weight -> " + hostWeight + + "\n tenant-auto-create -> " + tenantAutoCreate + + "\n tenant-distributed-user -> " + tenantDistributedUser + + "\n max-cpu-load-avg -> " + maxCpuLoadAvg + + "\n reserved-memory -> " + reservedMemory + + "\n registry-disconnect-strategy -> " + registryDisconnectStrategy + + "\n task-execute-threads-full-policy: " + taskExecuteThreadsFullPolicy + + "\n address -> " + workerAddress + + "\n registry-path: " + workerRegistryPath + + "\n****************************Worker Configuration**************************************"; + log.info(config); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java index 28a4d8b214..08163e4938 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java @@ -69,7 +69,7 @@ public class MessageRetryRunner extends BaseDaemonThread { log.info("Message retry runner staring"); messageSenders.forEach(messageSender -> { messageSenderMap.put(messageSender.getMessageType(), messageSender); - log.info("Injected message sender: {}", messageSender.getClass().getName()); + log.info("Injected message sender: {}", messageSender.getClass().getSimpleName()); }); super.start(); log.info("Message retry runner started"); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java index a66b064d98..7733fbba4f 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java @@ -33,7 +33,8 @@ import org.springframework.stereotype.Service; public class WorkerRpcServer extends SpringServerMethodInvokerDiscovery implements Closeable { public WorkerRpcServer(WorkerConfig workerConfig) { - super(NettyRemotingServerFactory.buildNettyRemotingServer(new NettyServerConfig(workerConfig.getListenPort()))); + super(NettyRemotingServerFactory.buildNettyRemotingServer(NettyServerConfig.builder() + .serverName("WorkerRpcServer").listenPort(workerConfig.getListenPort()).build())); } public void start() { diff --git a/dolphinscheduler-worker/src/main/resources/logback-spring.xml b/dolphinscheduler-worker/src/main/resources/logback-spring.xml index 46a69f2239..b272315f2f 100644 --- a/dolphinscheduler-worker/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-worker/src/main/resources/logback-spring.xml @@ -23,7 +23,7 @@ - [WI-%X{workflowInstanceId:-0}][TI-%X{taskInstanceId:-0}] - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n + [WI-%X{workflowInstanceId:-0}][TI-%X{taskInstanceId:-0}] - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n UTF-8 @@ -61,7 +61,7 @@ - [WI-%X{workflowInstanceId:-0}][TI-%X{taskInstanceId:-0}] - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n + [WI-%X{workflowInstanceId:-0}][TI-%X{taskInstanceId:-0}] - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n UTF-8