Browse Source

Merge remote-tracking branch 'upstream/dev' into spilit

pull/3/MERGE
lenboo 4 years ago
parent
commit
a574d2e38c
  1. 14
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-http/src/main/java/org/apache/dolphinscheduler/plugin/alert/http/HttpSender.java
  2. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java
  3. 14
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/IStoppable.java
  4. 43
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/exception/BaseException.java
  5. 5
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java
  6. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java
  7. 32
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
  8. 27
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
  9. 2
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java
  10. 54
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java
  11. 28
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
  12. 27
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  13. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
  14. 25
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  15. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
  16. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
  17. 21
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  18. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  19. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
  20. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  21. 56
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessorTest.java
  22. 9
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/EnvFileTest.java
  23. 18
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
  24. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
  25. 143
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java
  26. 3
      pom.xml

14
dolphinscheduler-alert-plugin/dolphinscheduler-alert-http/src/main/java/org/apache/dolphinscheduler/plugin/alert/http/HttpSender.java

@ -26,6 +26,7 @@ import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
@ -110,17 +111,17 @@ public class HttpSender {
} }
private void createHttpRequest(String msg) { private void createHttpRequest(String msg) {
if (REQUEST_TYPE_POST.equals(requestType)) { if (REQUEST_TYPE_POST.equals(requestType)) {
httpRequest = new HttpPost(url); httpRequest = new HttpPost(url);
setHeader();
//POST request add param in request body //POST request add param in request body
setMsgInRequestBody(msg); setMsgInRequestBody(msg);
} else if (REQUEST_TYPE_GET.equals(requestType)) { } else if (REQUEST_TYPE_GET.equals(requestType)) {
//GET request add param in url //GET request add param in url
setMsgInUrl(msg); setMsgInUrl(msg);
httpRequest = new HttpGet(url); httpRequest = new HttpGet(url);
setHeader();
} }
setHeader();
} }
/** /**
@ -156,11 +157,16 @@ public class HttpSender {
/** /**
* set body params * set body params
*/ */
private String setMsgInRequestBody(String msg) { private void setMsgInRequestBody(String msg) {
ObjectNode objectNode = JSONUtils.parseObject(bodyParams); ObjectNode objectNode = JSONUtils.parseObject(bodyParams);
//set msg content field //set msg content field
objectNode.put(contentField, msg); objectNode.put(contentField, msg);
return objectNode.toString(); try {
StringEntity entity = new StringEntity(bodyParams, DEFAULT_CHARSET);
((HttpPost)httpRequest).setEntity(entity);
} catch (Exception e) {
logger.error("send http alert msg exception : {}", e.getMessage());
}
} }
} }

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java

@ -14,19 +14,22 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.api.interceptor;
import javax.servlet.http.HttpServletRequest; package org.apache.dolphinscheduler.api.interceptor;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.security.Authenticator; import org.apache.dolphinscheduler.api.security.Authenticator;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.commons.httpclient.HttpStatus;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;

14
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/IStoppable.java

@ -14,16 +14,18 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common; package org.apache.dolphinscheduler.common;
/** /**
* server stop interface. * server stop interface.
*/ */
public interface IStoppable { public interface IStoppable {
/**
* Stop this service.
* @param cause why stopping
*/
void stop(String cause);
} /**
* Stop this service.
* @param cause why stopping
*/
void stop(String cause);
}

43
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/exception/BaseException.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.exception;
/**
* Base Exception class for DolphinScheduler
*/
public class BaseException extends Exception {
public BaseException() {
}
public BaseException(String message) {
super(message);
}
public BaseException(String message, Throwable cause) {
super(message, cause);
}
public BaseException(Throwable cause) {
super(cause);
}
public BaseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

5
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common.task.http; package org.apache.dolphinscheduler.common.task.http;
import org.apache.dolphinscheduler.common.enums.HttpCheckCondition; import org.apache.dolphinscheduler.common.enums.HttpCheckCondition;
@ -21,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.HttpMethod;
import org.apache.dolphinscheduler.common.process.HttpProperty; import org.apache.dolphinscheduler.common.process.HttpProperty;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -70,7 +71,7 @@ public class HttpParameters extends AbstractParameters {
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
return StringUtils.isNotEmpty(url); return StringUtils.isNotEmpty(url);
} }
@Override @Override

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java

@ -14,16 +14,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common.task.procedure; package org.apache.dolphinscheduler.common.task.procedure;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
* procedure parameter * procedure parameter
*/ */

32
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResUploadType; import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.exception.BaseException;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -204,21 +205,13 @@ public class HadoopUtils implements Closeable {
* if rmHaIds is empty, single resourcemanager enabled * if rmHaIds is empty, single resourcemanager enabled
* if rmHaIds not empty: resourcemanager HA enabled * if rmHaIds not empty: resourcemanager HA enabled
*/ */
String appUrl = ""; yarnEnabled = true;
String appUrl = StringUtils.isEmpty(rmHaIds) ? appAddress : getAppAddress(appAddress, rmHaIds);
if (StringUtils.isEmpty(rmHaIds)) {
//single resourcemanager enabled
appUrl = appAddress;
yarnEnabled = true;
} else {
//resourcemanager HA enabled
appUrl = getAppAddress(appAddress, rmHaIds);
yarnEnabled = true;
logger.info("application url : {}", appUrl);
}
if (StringUtils.isBlank(appUrl)) { if (StringUtils.isBlank(appUrl)) {
throw new Exception("application url is blank"); throw new BaseException("yarn application url generation failed");
}
if (logger.isDebugEnabled()) {
logger.debug("yarn application url:{}, applicationId:{}", appUrl, applicationId);
} }
return String.format(appUrl, applicationId); return String.format(appUrl, applicationId);
} }
@ -597,6 +590,10 @@ public class HadoopUtils implements Closeable {
//get active ResourceManager //get active ResourceManager
String activeRM = YarnHAAdminUtils.getAcitveRMName(rmHa); String activeRM = YarnHAAdminUtils.getAcitveRMName(rmHa);
if (StringUtils.isEmpty(activeRM)) {
return null;
}
String[] split1 = appAddress.split(Constants.DOUBLE_SLASH); String[] split1 = appAddress.split(Constants.DOUBLE_SLASH);
if (split1.length != 2) { if (split1.length != 2) {
@ -660,12 +657,7 @@ public class HadoopUtils implements Closeable {
} }
} catch (Exception e) { } catch (Exception e) {
for (int i = 1; i < rmIdArr.length; i++) { logger.error("yarn ha application url generation failed, message:{}", e.getMessage());
String state = getRMState(String.format(yarnUrl, rmIdArr[i]));
if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
return rmIdArr[i];
}
}
} }
return null; return null;
} }

27
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java

@ -19,6 +19,10 @@ package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -26,6 +30,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* logger utils * logger utils
@ -36,6 +41,8 @@ public class LoggerUtils {
throw new UnsupportedOperationException("Construct LoggerUtils"); throw new UnsupportedOperationException("Construct LoggerUtils");
} }
private static final Logger logger = LoggerFactory.getLogger(LoggerUtils.class);
/** /**
* rules for extracting application ID * rules for extracting application ID
*/ */
@ -101,6 +108,26 @@ public class LoggerUtils {
return appIds; return appIds;
} }
/**
* read whole file content
*
* @param filePath file path
* @return whole file content
*/
public static String readWholeFileContent(String filePath) {
String line;
StringBuilder sb = new StringBuilder();
try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)))) {
while ((line = br.readLine()) != null) {
sb.append(line + "\r\n");
}
return sb.toString();
} catch (IOException e) {
logger.error("read file error", e);
}
return "";
}
public static void logError(Optional<Logger> optionalLogger public static void logError(Optional<Logger> optionalLogger
, String error) { , String error) {
optionalLogger.ifPresent((Logger logger) -> logger.error(error)); optionalLogger.ifPresent((Logger logger) -> logger.error(error));

2
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java

@ -184,7 +184,7 @@ public class HadoopUtilsTest {
} }
} }
@Test @Test(expected = Exception.class)
public void getApplicationUrl() throws Exception { public void getApplicationUrl() throws Exception {
String application_1516778421218_0042 = hadoopUtils.getApplicationUrl("application_1529051418016_0167"); String application_1516778421218_0042 = hadoopUtils.getApplicationUrl("application_1529051418016_0167");
logger.info(application_1516778421218_0042); logger.info(application_1516778421218_0042);

54
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java

@ -14,30 +14,72 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Optional;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.Test.None;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.List; @RunWith(PowerMockRunner.class)
@PrepareForTest({LoggerUtils.class})
public class LoggerUtilsTest { public class LoggerUtilsTest {
private Logger logger = LoggerFactory.getLogger(LoggerUtilsTest.class); private Logger logger = LoggerFactory.getLogger(LoggerUtilsTest.class);
@Test @Test
public void buildTaskId() { public void buildTaskId() {
String taskId = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,79,4084,15210); String taskId = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, 79, 4084, 15210);
Assert.assertEquals(" - [taskAppId=TASK-79-4084-15210]", taskId); Assert.assertEquals(" - [taskAppId=TASK-79-4084-15210]", taskId);
} }
@Test @Test
public void getAppIds() { public void getAppIds() {
List<String> appIdList = LoggerUtils.getAppIds("Running job: application_1_1",logger); List<String> appIdList = LoggerUtils.getAppIds("Running job: application_1_1", logger);
Assert.assertEquals("application_1_1", appIdList.get(0)); Assert.assertEquals("application_1_1", appIdList.get(0));
}
@Test
public void testReadWholeFileContent() throws Exception {
BufferedReader bufferedReader = PowerMockito.mock(BufferedReader.class);
PowerMockito.whenNew(BufferedReader.class).withAnyArguments().thenReturn(bufferedReader);
PowerMockito.when(bufferedReader.readLine()).thenReturn("").thenReturn(null);
FileInputStream fileInputStream = PowerMockito.mock(FileInputStream.class);
PowerMockito.whenNew(FileInputStream.class).withAnyArguments().thenReturn(fileInputStream);
InputStreamReader inputStreamReader = PowerMockito.mock(InputStreamReader.class);
PowerMockito.whenNew(InputStreamReader.class).withAnyArguments().thenReturn(inputStreamReader);
String log = LoggerUtils.readWholeFileContent("/tmp/log");
Assert.assertNotNull(log);
PowerMockito.when(bufferedReader.readLine()).thenThrow(new IOException());
log = LoggerUtils.readWholeFileContent("/tmp/log");
Assert.assertNotNull(log);
}
@Test(expected = None.class)
public void testLogError() {
Optional<Logger> loggerOptional = Optional.of(this.logger);
LoggerUtils.logError(loggerOptional, "error message");
LoggerUtils.logError(loggerOptional, new RuntimeException("error message"));
LoggerUtils.logError(loggerOptional, "error message", new RuntimeException("runtime exception"));
LoggerUtils.logInfo(loggerOptional, "info message");
} }
} }

28
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.log;
import org.apache.dolphinscheduler.common.utils.IOUtils; import org.apache.dolphinscheduler.common.utils.IOUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand; import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
@ -31,13 +32,11 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Collections; import java.util.Collections;
@ -86,7 +85,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
case VIEW_WHOLE_LOG_REQUEST: case VIEW_WHOLE_LOG_REQUEST:
ViewLogRequestCommand viewLogRequest = JSONUtils.parseObject( ViewLogRequestCommand viewLogRequest = JSONUtils.parseObject(
command.getBody(), ViewLogRequestCommand.class); command.getBody(), ViewLogRequestCommand.class);
String msg = readWholeFileContent(viewLogRequest.getPath()); String msg = LoggerUtils.readWholeFileContent(viewLogRequest.getPath());
ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg); ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque())); channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
break; break;
@ -182,27 +181,4 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
return Collections.emptyList(); return Collections.emptyList();
} }
/**
* read whole file content
*
* @param filePath file path
* @return whole file content
*/
private String readWholeFileContent(String filePath) {
BufferedReader br = null;
String line;
StringBuilder sb = new StringBuilder();
try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
while ((line = br.readLine()) != null) {
sb.append(line + "\r\n");
}
return sb.toString();
} catch (IOException e) {
logger.error("read file error",e);
} finally {
IOUtils.closeQuietly(br);
}
return "";
}
} }

27
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -85,9 +85,7 @@ public class MasterServer implements IStoppable {
private MasterSchedulerService masterSchedulerService; private MasterSchedulerService masterSchedulerService;
/** /**
* master server startup * master server startup, not use web service
* <p>
* master server not use web service
* *
* @param args arguments * @param args arguments
*/ */
@ -131,14 +129,11 @@ public class MasterServer implements IStoppable {
} }
/** /**
* register hooks, which are called before the process exits * register hooks, which are called before the process exits
*/ */
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
@Override if (Stopper.isRunning()) {
public void run() { close("shutdownHook");
if (Stopper.isRunning()) {
close("shutdownHook");
}
} }
})); }));
@ -152,7 +147,7 @@ public class MasterServer implements IStoppable {
public void close(String cause) { public void close(String cause) {
try { try {
//execute only once // execute only once
if (Stopper.isStopped()) { if (Stopper.isStopped()) {
return; return;
} }
@ -163,27 +158,24 @@ public class MasterServer implements IStoppable {
Stopper.stop(); Stopper.stop();
try { try {
//thread sleep 3 seconds for thread quietly stop // thread sleep 3 seconds for thread quietly stop
Thread.sleep(3000L); Thread.sleep(3000L);
} catch (Exception e) { } catch (Exception e) {
logger.warn("thread sleep exception ", e); logger.warn("thread sleep exception ", e);
} }
//close // close
this.masterSchedulerService.close(); this.masterSchedulerService.close();
this.nettyRemotingServer.close(); this.nettyRemotingServer.close();
this.zkMasterClient.close(); this.zkMasterClient.close();
//close quartz // close quartz
try { try {
QuartzExecutors.getInstance().shutdown(); QuartzExecutors.getInstance().shutdown();
logger.info("Quartz service stopped"); logger.info("Quartz service stopped");
} catch (Exception e) { } catch (Exception e) {
logger.warn("Quartz service stopped exception:{}", e.getMessage()); logger.warn("Quartz service stopped exception:{}", e.getMessage());
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("master server stop exception ", e); logger.error("master server stop exception ", e);
} finally {
System.exit(-1);
} }
} }
@ -192,4 +184,3 @@ public class MasterServer implements IStoppable {
close(cause); close(cause);
} }
} }

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java

@ -116,6 +116,8 @@ public class MasterRegistry {
String localNodePath = getMasterPath(); String localNodePath = getMasterPath();
zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath); zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath);
logger.info("master node : {} unRegistry to ZK.", address); logger.info("master node : {} unRegistry to ZK.", address);
heartBeatExecutor.shutdown();
logger.info("heartbeat executor shutdown");
} }
/** /**

25
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -14,15 +14,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.runner;
import java.util.concurrent.ThreadPoolExecutor; package org.apache.dolphinscheduler.server.master.runner;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@ -36,6 +30,15 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager; import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -90,14 +93,14 @@ public class MasterSchedulerService extends Thread {
* constructor of MasterSchedulerService * constructor of MasterSchedulerService
*/ */
@PostConstruct @PostConstruct
public void init(){ public void init() {
this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig(); NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig); this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
} }
@Override @Override
public synchronized void start(){ public synchronized void start() {
super.setName("MasterSchedulerService"); super.setName("MasterSchedulerService");
super.start(); super.start();
} }
@ -110,7 +113,7 @@ public class MasterSchedulerService extends Thread {
} catch (InterruptedException ignore) { } catch (InterruptedException ignore) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
if(!terminated){ if (!terminated) {
logger.warn("masterExecService shutdown without terminated, increase await time"); logger.warn("masterExecService shutdown without terminated, increase await time");
} }
nettyRemotingClient.close(); nettyRemotingClient.close();
@ -123,7 +126,7 @@ public class MasterSchedulerService extends Thread {
@Override @Override
public void run() { public void run() {
logger.info("master scheduler started"); logger.info("master scheduler started");
while (Stopper.isRunning()){ while (Stopper.isRunning()) {
try { try {
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory()); boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
if (!runCheckFlag) { if (!runCheckFlag) {

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java

@ -43,9 +43,8 @@ public class HeartBeatTask implements Runnable {
private Set<String> heartBeatPaths; private Set<String> heartBeatPaths;
private String serverType; private String serverType;
private ZookeeperRegistryCenter zookeeperRegistryCenter; private ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* server stop or not // server stop or not
*/
protected IStoppable stoppable = null; protected IStoppable stoppable = null;
public HeartBeatTask(String startTime, public HeartBeatTask(String startTime,

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java

@ -227,18 +227,14 @@ public class ZookeeperRegistryCenter implements InitializingBean {
* @throws Exception errors * @throws Exception errors
*/ */
protected boolean checkIsDeadServer(String zNode, String serverType) throws Exception { protected boolean checkIsDeadServer(String zNode, String serverType) throws Exception {
//ip_sequenceno // ip_sequence_no
String[] zNodesPath = zNode.split("\\/"); String[] zNodesPath = zNode.split("\\/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1]; String ipSeqNo = zNodesPath[zNodesPath.length - 1];
String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX; String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX;
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
if (!registerOperator.isExisted(zNode) || registerOperator.isExisted(deadServerPath)) { return !registerOperator.isExisted(zNode) || registerOperator.isExisted(deadServerPath);
return true;
}
return false;
} }
public RegisterOperator getRegisterOperator() { public RegisterOperator getRegisterOperator() {

21
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -95,9 +95,8 @@ public class WorkerServer implements IStoppable {
private WorkerManagerThread workerManagerThread; private WorkerManagerThread workerManagerThread;
/** /**
* worker server startup * worker server startup, not use web service
* *
* worker server not use web service
* @param args arguments * @param args arguments
*/ */
public static void main(String[] args) { public static void main(String[] args) {
@ -143,12 +142,9 @@ public class WorkerServer implements IStoppable {
/** /**
* register hooks, which are called before the process exits * register hooks, which are called before the process exits
*/ */
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
@Override if (Stopper.isRunning()) {
public void run() { close("shutdownHook");
if (Stopper.isRunning()) {
close("shutdownHook");
}
} }
})); }));
} }
@ -156,7 +152,7 @@ public class WorkerServer implements IStoppable {
public void close(String cause) { public void close(String cause) {
try { try {
//execute only once // execute only once
if (Stopper.isStopped()) { if (Stopper.isStopped()) {
return; return;
} }
@ -167,21 +163,18 @@ public class WorkerServer implements IStoppable {
Stopper.stop(); Stopper.stop();
try { try {
//thread sleep 3 seconds for thread quitely stop // thread sleep 3 seconds for thread quitely stop
Thread.sleep(3000L); Thread.sleep(3000L);
} catch (Exception e) { } catch (Exception e) {
logger.warn("thread sleep exception", e); logger.warn("thread sleep exception", e);
} }
// close
this.nettyRemotingServer.close(); this.nettyRemotingServer.close();
this.workerRegistry.unRegistry(); this.workerRegistry.unRegistry();
this.alertClientService.close(); this.alertClientService.close();
} catch (Exception e) { } catch (Exception e) {
logger.error("worker server stop exception ", e); logger.error("worker server stop exception ", e);
} finally {
System.exit(-1);
} }
} }

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -47,7 +47,6 @@ import org.springframework.stereotype.Service;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
/** /**
* worker registry * worker registry
*/ */
@ -115,6 +114,7 @@ public class WorkerRegistry {
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, ""); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
} else if (newState == ConnectionState.SUSPENDED) { } else if (newState == ConnectionState.SUSPENDED) {
logger.warn("worker : {} connection SUSPENDED ", address); logger.warn("worker : {} connection SUSPENDED ", address);
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
} }
}); });
logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
@ -142,6 +142,7 @@ public class WorkerRegistry {
logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath); logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath);
} }
this.heartBeatExecutor.shutdownNow(); this.heartBeatExecutor.shutdownNow();
logger.info("heartbeat executor shutdown");
} }
/** /**

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker.task; package org.apache.dolphinscheduler.server.worker.task;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER; import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
@ -25,13 +26,12 @@ import org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.TaskRecordDao; import org.apache.dolphinscheduler.dao.TaskRecordDao;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.commons.lang.StringUtils;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -110,8 +110,8 @@ public class ZKMasterClient extends AbstractZKClient {
@Override @Override
public void close() { public void close() {
super.close();
masterRegistry.unRegistry(); masterRegistry.unRegistry();
super.close();
} }
/** /**

56
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessorTest.java

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
import org.junit.Test;
import org.junit.Test.None;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import io.netty.channel.Channel;
@RunWith(PowerMockRunner.class)
@PrepareForTest({LoggerUtils.class})
public class LoggerRequestProcessorTest {
@Test(expected = None.class)
public void testProcessViewWholeLogRequest() {
Channel channel = PowerMockito.mock(Channel.class);
PowerMockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null);
PowerMockito.mockStatic(LoggerUtils.class);
PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("");
ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand("/log/path");
Command command = new Command();
command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
command.setBody(JSONUtils.toJsonByteArray(logRequestCommand));
LoggerRequestProcessor loggerRequestProcessor = new LoggerRequestProcessor();
loggerRequestProcessor.process(channel, command);
}
}

9
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/EnvFileTest.java

@ -14,18 +14,19 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker; package org.apache.dolphinscheduler.server.worker;
import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EnvFileTest { public class EnvFileTest {

18
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.service.log; package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand; import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.IPUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -116,12 +118,16 @@ public class LogClientService {
String result = ""; String result = "";
final Host address = new Host(host, port); final Host address = new Host(host, port);
try { try {
Command command = request.convert2Command(); if (IPUtils.getLocalHost().equals(host)) {
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); result = LoggerUtils.readWholeFileContent(request.getPath());
if (response != null) { } else {
ViewLogResponseCommand viewLog = JSONUtils.parseObject( Command command = request.convert2Command();
response.getBody(), ViewLogResponseCommand.class); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
return viewLog.getMsg(); if (response != null) {
ViewLogResponseCommand viewLog = JSONUtils.parseObject(
response.getBody(), ViewLogResponseCommand.class);
result = viewLog.getMsg();
}
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("view log error", e); logger.error("view log error", e);

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

@ -58,7 +58,7 @@ public class ZookeeperOperator implements InitializingBean {
protected CuratorFramework zkClient; protected CuratorFramework zkClient;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() {
this.zkClient = buildClient(); this.zkClient = buildClient();
initStateListener(); initStateListener();
treeCacheStart(); treeCacheStart();

143
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.IPUtils;
import java.nio.charset.StandardCharsets;
import org.junit.Assert;
import org.junit.Test;
import org.junit.Test.None;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({LogClientService.class, IPUtils.class, LoggerUtils.class, NettyRemotingClient.class})
public class LogClientServiceTest {
@Test
public void testViewLogFromLocal() {
String localMachine = "LOCAL_MACHINE";
int port = 1234;
String path = "/tmp/log";
PowerMockito.mockStatic(IPUtils.class);
PowerMockito.when(IPUtils.getLocalHost()).thenReturn(localMachine);
PowerMockito.mockStatic(LoggerUtils.class);
PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("application_xx_11");
LogClientService logClientService = new LogClientService();
String log = logClientService.viewLog(localMachine, port, path);
Assert.assertNotNull(log);
}
@Test
public void testViewLogFromRemote() throws Exception {
String localMachine = "LOCAL_MACHINE";
int port = 1234;
String path = "/tmp/log";
PowerMockito.mockStatic(IPUtils.class);
PowerMockito.when(IPUtils.getLocalHost()).thenReturn(localMachine + "1");
NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class);
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
Command command = new Command();
command.setBody(JSONUtils.toJsonString(new ViewLogResponseCommand("")).getBytes(StandardCharsets.UTF_8));
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
LogClientService logClientService = new LogClientService();
String log = logClientService.viewLog(localMachine, port, path);
Assert.assertNotNull(log);
}
@Test(expected = None.class)
public void testClose() throws Exception {
NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class);
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
PowerMockito.doNothing().when(remotingClient).close();
LogClientService logClientService = new LogClientService();
logClientService.close();
}
@Test
public void testRollViewLog() throws Exception {
NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class);
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
Command command = new Command();
command.setBody(JSONUtils.toJsonByteArray(new RollViewLogResponseCommand("success")));
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
LogClientService logClientService = new LogClientService();
String msg = logClientService.rollViewLog("localhost", 1234, "/tmp/log", 0, 10);
Assert.assertNotNull(msg);
}
@Test
public void testGetLogBytes() throws Exception {
NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class);
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
Command command = new Command();
command.setBody(JSONUtils.toJsonByteArray(new GetLogBytesResponseCommand("log".getBytes(StandardCharsets.UTF_8))));
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
LogClientService logClientService = new LogClientService();
byte[] logBytes = logClientService.getLogBytes("localhost", 1234, "/tmp/log");
Assert.assertNotNull(logBytes);
}
@Test
public void testRemoveTaskLog() throws Exception {
NettyRemotingClient remotingClient = PowerMockito.mock(NettyRemotingClient.class);
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
Command command = new Command();
command.setBody(JSONUtils.toJsonByteArray(new RemoveTaskLogResponseCommand(true)));
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
LogClientService logClientService = new LogClientService();
Boolean status = logClientService.removeTaskLog("localhost", 1234, "/log/path");
Assert.assertTrue(status);
}
@Test
public void testIsRunning() {
LogClientService logClientService = new LogClientService();
Assert.assertTrue(logClientService.isRunning());
}
}

3
pom.xml

@ -909,6 +909,7 @@
<include>**/server/entity/SQLTaskExecutionContextTest.java</include> <include>**/server/entity/SQLTaskExecutionContextTest.java</include>
<include>**/server/log/MasterLogFilterTest.java</include> <include>**/server/log/MasterLogFilterTest.java</include>
<include>**/server/log/SensitiveDataConverterTest.java</include> <include>**/server/log/SensitiveDataConverterTest.java</include>
<include>**/server/log/LoggerRequestProcessorTest.java</include>
<!--<include>**/server/log/TaskLogDiscriminatorTest.java</include>--> <!--<include>**/server/log/TaskLogDiscriminatorTest.java</include>-->
<include>**/server/log/TaskLogFilterTest.java</include> <include>**/server/log/TaskLogFilterTest.java</include>
<include>**/server/log/WorkerLogFilterTest.java</include> <include>**/server/log/WorkerLogFilterTest.java</include>
@ -967,7 +968,7 @@
<include>**/service/zk/RegisterOperatorTest.java</include> <include>**/service/zk/RegisterOperatorTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include> <include>**/service/queue/TaskUpdateQueueTest.java</include>
<include>**/service/queue/PeerTaskInstancePriorityQueueTest.java</include> <include>**/service/queue/PeerTaskInstancePriorityQueueTest.java</include>
<include>**/service/log/LogClientServiceTest.java</include>
<include>**/service/alert/AlertClientServiceTest.java</include> <include>**/service/alert/AlertClientServiceTest.java</include>
<include>**/dao/mapper/DataSourceUserMapperTest.java</include> <include>**/dao/mapper/DataSourceUserMapperTest.java</include>
<!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>--> <!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>-->

Loading…
Cancel
Save