diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java index 847f87d82a..5d32fda30c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.api.service.LoggerService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -192,19 +191,6 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService return getLogBytes(task); } - /** - * get host - * - * @param address address - * @return old version return true ,otherwise return false - */ - private String getHost(String address) { - if (Boolean.TRUE.equals(Host.isOldVersion(address))) { - return address; - } - return Host.of(address).getIp(); - } - /** * query log * @@ -214,11 +200,10 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService * @return log string data */ private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) { + Host host = Host.of(taskInstance.getHost()); - String host = getHost(taskInstance.getHost()); - - logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(), - PropertyUtils.getInt(Constants.RPC_PORT, 50051)); + logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), taskInstance.getLogPath(), + host.getPort()); StringBuilder log = new StringBuilder(); if (skipLineNum == 0) { @@ -230,7 +215,7 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService } log.append(logClient - .rollViewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath(), skipLineNum, limit)); + .rollViewLog(host.getIp(), host.getPort(), taskInstance.getLogPath(), skipLineNum, limit)); return log.toString(); } @@ -242,12 +227,12 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService * @return log byte array */ private byte[] getLogBytes(TaskInstance taskInstance) { - String host = getHost(taskInstance.getHost()); + Host host = Host.of(taskInstance.getHost()); byte[] head = String.format(LOG_HEAD_FORMAT, taskInstance.getLogPath(), host, Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8); return Bytes.concat(head, - logClient.getLogBytes(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath())); + logClient.getLogBytes(host.getIp(), host.getPort(), taskInstance.getLogPath())); } } diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index 9ae11e3c93..b91937ae37 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -91,5 +91,4 @@ sudo.enable=true development.state=false # rpc port -rpc.port=50051 alert.rpc.port=50052 \ No newline at end of file diff --git a/dolphinscheduler-log-server/pom.xml b/dolphinscheduler-log-server/pom.xml index de58a62753..18fb7121f4 100644 --- a/dolphinscheduler-log-server/pom.xml +++ b/dolphinscheduler-log-server/pom.xml @@ -49,41 +49,4 @@ - - - - maven-assembly-plugin - - - dolphinscheduler-logger-server - package - - single - - - logger-server - - src/main/assembly/dolphinscheduler-log-server.xml - - false - - - - - - - - - - docker - - - - org.codehaus.mojo - exec-maven-plugin - - - - - diff --git a/dolphinscheduler-log-server/src/main/assembly/dolphinscheduler-log-server.xml b/dolphinscheduler-log-server/src/main/assembly/dolphinscheduler-log-server.xml deleted file mode 100644 index 35ac6fffd3..0000000000 --- a/dolphinscheduler-log-server/src/main/assembly/dolphinscheduler-log-server.xml +++ /dev/null @@ -1,56 +0,0 @@ - - - - dolphinscheduler-logger-server - - dir - - false - logger-server - - - ${basedir}/src/main/bin - bin - 0755 - 0755 - - - ${basedir}/../script/env - bin - - dolphinscheduler_env.sh - - 0755 - 0755 - - - ${basedir}/../../dolphinscheduler-common/src/main/resources - - **/*.properties - - conf - - - - - libs - - - diff --git a/dolphinscheduler-log-server/src/main/bin/start.sh b/dolphinscheduler-log-server/src/main/bin/start.sh deleted file mode 100644 index 2d6f8fc717..0000000000 --- a/dolphinscheduler-log-server/src/main/bin/start.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -BIN_DIR=$(dirname $0) -DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)} - -source "$BIN_DIR/dolphinscheduler_env.sh" - -JAVA_OPTS=${JAVA_OPTS:-"-server -Xms1g -Xmx1g -Xmn512m -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"} - -if [[ "$DOCKER" == "true" ]]; then - JAVA_OPTS="${JAVA_OPTS} -XX:-UseContainerSupport" -fi - -java $JAVA_OPTS \ - -cp "$DOLPHINSCHEDULER_HOME/conf":"$DOLPHINSCHEDULER_HOME/libs/*" \ - org.apache.dolphinscheduler.server.log.LoggerServer diff --git a/dolphinscheduler-log-server/src/main/docker/Dockerfile b/dolphinscheduler-log-server/src/main/docker/Dockerfile deleted file mode 100644 index de7e05f767..0000000000 --- a/dolphinscheduler-log-server/src/main/docker/Dockerfile +++ /dev/null @@ -1,34 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -FROM openjdk:8-jre-slim-buster - -ENV DOCKER true -ENV TZ Asia/Shanghai -ENV DOLPHINSCHEDULER_HOME /opt/dolphinscheduler - -RUN apt update ; \ - apt install -y curl ; \ - rm -rf /var/lib/apt/lists/* - -WORKDIR $DOLPHINSCHEDULER_HOME - -ADD ./target/logger-server $DOLPHINSCHEDULER_HOME - -EXPOSE 50051 - -CMD [ "/bin/bash", "./bin/start.sh" ] diff --git a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java deleted file mode 100644 index fc8124d5ac..0000000000 --- a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.log; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; -import org.apache.dolphinscheduler.remote.NettyRemotingServer; -import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.config.NettyServerConfig; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * logger server - */ -public class LoggerServer { - - private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class); - - /** - * netty server - */ - private final NettyRemotingServer server; - - /** - * netty server config - */ - private final NettyServerConfig serverConfig; - - /** - * loggger request processor - */ - private final LoggerRequestProcessor requestProcessor; - - public LoggerServer() { - this.serverConfig = new NettyServerConfig(); - this.serverConfig.setListenPort(PropertyUtils.getInt(Constants.RPC_PORT, 50051)); - this.server = new NettyRemotingServer(serverConfig); - this.requestProcessor = new LoggerRequestProcessor(); - this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor()); - this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor()); - this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor()); - this.server.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor()); - } - - /** - * main launches the server from the command line. - * @param args arguments - */ - public static void main(String[] args) { - final LoggerServer server = new LoggerServer(); - server.start(); - } - - /** - * server start - */ - public void start() { - this.server.start(); - logger.info("logger server started, listening on port : {}", PropertyUtils.getInt(Constants.RPC_PORT, 50051)); - Runtime.getRuntime().addShutdownHook(new Thread(LoggerServer.this::stop)); - } - - /** - * stop - */ - public void stop() { - this.server.close(); - logger.info("logger server shut down"); - } - -} diff --git a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerServerTest.java b/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerServerTest.java deleted file mode 100644 index b82e8f22c0..0000000000 --- a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerServerTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.log; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.utils.FileUtils; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; -import org.apache.dolphinscheduler.service.log.LogClientService; - -import org.apache.commons.lang.StringUtils; - -import java.io.File; -import java.io.IOException; -import java.nio.charset.Charset; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class LoggerServerTest { - - private LoggerServer loggerServer; - - private LogClientService logClientService; - - @Before - public void startServerAndClient() { - this.loggerServer = new LoggerServer(); - this.loggerServer.start(); - this.logClientService = new LogClientService(); - } - - @Test - public void testRollViewLog() throws IOException { - String expectedTmpDemoString = "testRolloViewLog"; - org.apache.commons.io.FileUtils.writeStringToFile(new File("/tmp/demo.txt"), expectedTmpDemoString, Charset.defaultCharset()); - - String resultTmpDemoString = this.logClientService.rollViewLog( - "localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051), "/tmp/demo.txt", 0, 1000); - - Assert.assertEquals(expectedTmpDemoString, resultTmpDemoString.replaceAll("[\r|\n|\t]", StringUtils.EMPTY)); - - FileUtils.deleteFile("/tmp/demo.txt"); - } - - @Test - public void testRemoveTaskLog() throws IOException { - String expectedTmpRemoveString = "testRemoveTaskLog"; - org.apache.commons.io.FileUtils.writeStringToFile(new File("/tmp/remove.txt"), expectedTmpRemoveString, Charset.defaultCharset()); - - Boolean b = this.logClientService.removeTaskLog("localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051),"/tmp/remove.txt"); - - Assert.assertTrue(b); - - String result = this.logClientService.viewLog("localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051),"/tmp/demo.txt"); - - Assert.assertEquals(StringUtils.EMPTY, result); - } - - @After - public void stopServerAndClient() { - this.loggerServer.stop(); - this.logClientService.close(); - } -} diff --git a/dolphinscheduler-master/pom.xml b/dolphinscheduler-master/pom.xml index 0799445467..77fea6a57f 100644 --- a/dolphinscheduler-master/pom.xml +++ b/dolphinscheduler-master/pom.xml @@ -82,6 +82,10 @@ dolphinscheduler-worker test + + org.apache.dolphinscheduler + dolphinscheduler-log-server + diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 65b03dc502..3021c0237d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; +import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.CacheProcessor; import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor; @@ -115,6 +116,14 @@ public class MasterServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor); + + // logger server + LoggerRequestProcessor loggerRequestProcessor = new LoggerRequestProcessor(); + this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.start(); // self tolerant diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index 334b7ca944..320d30aa82 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -187,9 +187,8 @@ public class ProcessUtils { Thread.sleep(Constants.SLEEP_TIME_MILLIS); String log; try (LogClientService logClient = new LogClientService()) { - log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(), - PropertyUtils.getInt(Constants.RPC_PORT, 50051), - taskExecutionContext.getLogPath()); + Host host = Host.of(taskExecutionContext.getHost()); + log = logClient.viewLog(host.getIp(), host.getPort(), taskExecutionContext.getLogPath()); } if (!StringUtils.isEmpty(log)) { if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 5b590f06c6..1bc796104c 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -56,7 +56,6 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateEx import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.DagData; @@ -523,16 +522,9 @@ public class ProcessService { if (StringUtils.isEmpty(taskInstance.getHost())) { continue; } - int port = PropertyUtils.getInt(Constants.RPC_PORT, 50051); - String ip = ""; - try { - ip = Host.of(taskInstance.getHost()).getIp(); - } catch (Exception e) { - // compatible old version - ip = taskInstance.getHost(); - } + Host host = Host.of(taskInstance.getHost()); // remove task log from loggerserver - logClient.removeTaskLog(ip, port, taskLogPath); + logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath); } } } diff --git a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java index ac87a8a0e3..927210352b 100644 --- a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java +++ b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler; -import org.apache.dolphinscheduler.server.log.LoggerServer; - import org.apache.curator.test.TestingServer; import org.springframework.boot.SpringApplication; @@ -29,7 +27,6 @@ public class StandaloneServer { public static void main(String[] args) throws Exception { final TestingServer server = new TestingServer(true); System.setProperty("registry.zookeeper.connect-string", server.getConnectString()); - new LoggerServer().start(); SpringApplication.run(StandaloneServer.class, args); } } diff --git a/dolphinscheduler-worker/pom.xml b/dolphinscheduler-worker/pom.xml index 7cce8b257f..a77dba7755 100644 --- a/dolphinscheduler-worker/pom.xml +++ b/dolphinscheduler-worker/pom.xml @@ -126,6 +126,10 @@ spring-boot-starter-test test + + org.apache.dolphinscheduler + dolphinscheduler-log-server + diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index ad70991d6c..79e2e8250c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; +import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor; @@ -125,6 +126,14 @@ public class WorkerServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor()); + + // logger server + LoggerRequestProcessor loggerRequestProcessor = new LoggerRequestProcessor(); + this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.start(); // worker registry diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index cde1b57576..96b51f3a3b 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -17,12 +17,10 @@ package org.apache.dolphinscheduler.server.worker.processor; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; @@ -138,7 +136,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { logger.error("kill task error", e); } // find log and kill yarn job - Pair> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()).getIp(), + Pair> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()), taskExecutionContext.getLogPath(), taskExecutionContext.getExecutePath(), taskExecutionContext.getTenantCode()); @@ -179,10 +177,11 @@ public class TaskKillProcessor implements NettyRequestProcessor { * @param tenantCode tenantCode * @return Pair> yarn kill result */ - private Pair> killYarnJob(String host, String logPath, String executePath, String tenantCode) { + private Pair> killYarnJob(Host host, String logPath, String executePath, String tenantCode) { try (LogClientService logClient = new LogClientService();) { - logger.info("view log host : {},logPath : {}", host, logPath); - String log = logClient.viewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), logPath); + logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), logPath, + host.getPort()); + String log = logClient.viewLog(host.getIp(), host.getPort(), logPath); List appIds = Collections.emptyList(); if (!StringUtils.isEmpty(log)) { appIds = LoggerUtils.getAppIds(log, logger);