Browse Source

add task-logger config (#7423)

Co-authored-by: caishunfeng <534328519@qq.com>
3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
1cbc86b9e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  2. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  3. 23
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  4. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
  5. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
  6. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
  7. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
  8. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
  9. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
  10. 2
      dolphinscheduler-master/src/main/resources/application.yaml
  11. 2
      dolphinscheduler-standalone-server/src/main/resources/application.yaml

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -39,6 +39,7 @@ public class MasterConfig {
private int stateWheelInterval; private int stateWheelInterval;
private double maxCpuLoadAvg; private double maxCpuLoadAvg;
private double reservedMemory; private double reservedMemory;
private boolean taskLogger;
public int getListenPort() { public int getListenPort() {
return listenPort; return listenPort;
@ -135,4 +136,12 @@ public class MasterConfig {
public void setReservedMemory(double reservedMemory) { public void setReservedMemory(double reservedMemory) {
this.reservedMemory = reservedMemory; this.reservedMemory = reservedMemory;
} }
public boolean isTaskLogger() {
return taskLogger;
}
public void setTaskLogger(boolean taskLogger) {
this.taskLogger = taskLogger;
}
} }

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -825,7 +825,7 @@ public class WorkflowExecuteThread implements Runnable {
// package task instance before submit // package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance); processService.packageTaskInstance(taskInstance, processInstance);
boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval()); boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval(), masterConfig.isTaskLogger());
if (!submit) { if (!submit) {
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
processInstance.getId(), processInstance.getName(), processInstance.getId(), processInstance.getName(),

23
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -85,22 +85,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
/** /**
* pause task, common tasks donot need this. * pause task, common tasks donot need this.
*
* @return
*/ */
protected abstract boolean pauseTask(); protected abstract boolean pauseTask();
/** /**
* kill task, all tasks need to realize this function * kill task, all tasks need to realize this function
*
* @return
*/ */
protected abstract boolean killTask(); protected abstract boolean killTask();
/** /**
* task timeout process * task timeout process
*
* @return
*/ */
protected abstract boolean taskTimeout(); protected abstract boolean taskTimeout();
@ -134,7 +128,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
} }
/** /**
* @return *
*/ */
protected boolean pause() { protected boolean pause() {
if (paused) { if (paused) {
@ -227,7 +221,10 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
/** /**
* set master task running logger. * set master task running logger.
*/ */
public void setTaskExecutionLogger() { public void setTaskExecutionLogger(boolean isTaskLogger) {
if (!isTaskLogger) {
return;
}
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getFirstSubmitTime(), taskInstance.getFirstSubmitTime(),
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionCode(),
@ -240,7 +237,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
* set procedure task relation * set procedure task relation
* *
* @param procedureTaskExecutionContext procedureTaskExecutionContext * @param procedureTaskExecutionContext procedureTaskExecutionContext
* @param taskInstance taskInstance * @param taskInstance taskInstance
*/ */
private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskInstance taskInstance) { private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskInstance taskInstance) {
ProcedureParameters procedureParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class); ProcedureParameters procedureParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
@ -253,7 +250,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
* set datax task relation * set datax task relation
* *
* @param dataxTaskExecutionContext dataxTaskExecutionContext * @param dataxTaskExecutionContext dataxTaskExecutionContext
* @param taskInstance taskInstance * @param taskInstance taskInstance
*/ */
protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskInstance taskInstance) { protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskInstance taskInstance) {
DataxParameters dataxParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class); DataxParameters dataxParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
@ -278,7 +275,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
* set sqoop task relation * set sqoop task relation
* *
* @param sqoopTaskExecutionContext sqoopTaskExecutionContext * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
* @param taskInstance taskInstance * @param taskInstance taskInstance
*/ */
private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskInstance taskInstance) { private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskInstance taskInstance) {
SqoopParameters sqoopParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class); SqoopParameters sqoopParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
@ -309,7 +306,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
* set SQL task relation * set SQL task relation
* *
* @param sqlTaskExecutionContext sqlTaskExecutionContext * @param sqlTaskExecutionContext sqlTaskExecutionContext
* @param taskInstance taskInstance * @param taskInstance taskInstance
*/ */
private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskInstance taskInstance) { private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskInstance taskInstance) {
SqlParameters sqlParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class); SqlParameters sqlParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
@ -345,7 +342,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
/** /**
* whehter tenant is null * whehter tenant is null
* *
* @param tenant tenant * @param tenant tenant
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return result * @return result
*/ */

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -54,14 +54,14 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
NettyExecutorManager nettyExecutorManager; NettyExecutorManager nettyExecutorManager;
@Override @Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) { public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval, boolean isTaskLogger) {
this.processInstance = processInstance; this.processInstance = processInstance;
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, maxRetryTimes, commitInterval); this.taskInstance = processService.submitTaskWithRetry(processInstance, task, maxRetryTimes, commitInterval);
if (this.taskInstance == null) { if (this.taskInstance == null) {
return false; return false;
} }
setTaskExecutionLogger(); setTaskExecutionLogger(isTaskLogger);
int taskGroupId = task.getTaskGroupId(); int taskGroupId = task.getTaskGroupId();
if (taskGroupId > 0) { if (taskGroupId > 0) {
boolean acquireTaskGroup = processService.acquireTaskGroup(task.getId(), boolean acquireTaskGroup = processService.acquireTaskGroup(task.getId(),
@ -85,7 +85,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
@Override @Override
public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) { public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) {
this.dispatchTask(taskInstance,processInstance); this.dispatchTask(taskInstance, processInstance);
} }
@Override @Override

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java

@ -71,7 +71,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
private TaskDefinition taskDefinition; private TaskDefinition taskDefinition;
@Override @Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
this.processInstance = processInstance; this.processInstance = processInstance;
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval); this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
@ -82,7 +82,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
); );
setTaskExecutionLogger(); setTaskExecutionLogger(isTaskLogger);
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
Thread.currentThread().setName(threadLoggerInfoName); Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters(); initTaskParameters();
@ -141,7 +141,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
} }
private void initTaskParameters() { private void initTaskParameters() {
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),processInstance.getProcessDefinitionCode(), taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(), taskInstance.getProcessInstanceId(),
taskInstance.getId())); taskInstance.getId()));

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java

@ -81,7 +81,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
boolean allDependentItemFinished; boolean allDependentItemFinished;
@Override @Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
this.processInstance = processInstance; this.processInstance = processInstance;
this.taskInstance = task; this.taskInstance = task;
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval); this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
@ -97,7 +97,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
processInstance.getProcessDefinitionVersion(), processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(), taskInstance.getProcessInstanceId(),
taskInstance.getId())); taskInstance.getId()));
setTaskExecutionLogger(); setTaskExecutionLogger(isTaskLogger);
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date()); taskInstance.setStartTime(new Date());

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java

@ -32,7 +32,7 @@ public interface ITaskProcessor {
String getType(); String getType();
boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval); boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger);
ExecutionStatus taskState(); ExecutionStatus taskState();

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java

@ -53,7 +53,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
private StateEventCallbackService stateEventCallbackService; private StateEventCallbackService stateEventCallbackService;
@Override @Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
this.processInstance = processInstance; this.processInstance = processInstance;
taskDefinition = processService.findTaskDefinition( taskDefinition = processService.findTaskDefinition(
task.getTaskCode(), task.getTaskDefinitionVersion() task.getTaskCode(), task.getTaskDefinitionVersion()
@ -63,7 +63,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
if (this.taskInstance == null) { if (this.taskInstance == null) {
return false; return false;
} }
setTaskExecutionLogger(); setTaskExecutionLogger(isTaskLogger);
return true; return true;
} }

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

@ -65,8 +65,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
private DependResult conditionResult; private DependResult conditionResult;
@Override @Override
public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
this.processInstance = processInstance; this.processInstance = processInstance;
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval); this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
@ -80,7 +79,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
processInstance.getProcessDefinitionVersion(), processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(), taskInstance.getProcessInstanceId(),
taskInstance.getId())); taskInstance.getId()));
setTaskExecutionLogger(); setTaskExecutionLogger(isTaskLogger);
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date()); taskInstance.setStartTime(new Date());

2
dolphinscheduler-master/src/main/resources/application.yaml

@ -104,6 +104,8 @@ master:
max-cpu-load-avg: -1 max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
reserved-memory: 0.3 reserved-memory: 0.3
# use task logger, default true; if true, it will create log for every task; if false, the task log will append to master log file
task-logger: true
server: server:
port: 5679 port: 5679

2
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -113,6 +113,8 @@ master:
max-cpu-load-avg: -1 max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
reserved-memory: 0.3 reserved-memory: 0.3
# use task logger, default true; if true, it will create log for every task; if false, the task log will append to master log file
task-logger: true
worker: worker:
# worker listener port # worker listener port

Loading…
Cancel
Save