diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 88ecfbde57..a76dd5e066 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -39,6 +39,7 @@ public class MasterConfig { private int stateWheelInterval; private double maxCpuLoadAvg; private double reservedMemory; + private boolean taskLogger; public int getListenPort() { return listenPort; @@ -135,4 +136,12 @@ public class MasterConfig { public void setReservedMemory(double reservedMemory) { this.reservedMemory = reservedMemory; } + + public boolean isTaskLogger() { + return taskLogger; + } + + public void setTaskLogger(boolean taskLogger) { + this.taskLogger = taskLogger; + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index a974e6d31b..24d49e89a5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -825,7 +825,7 @@ public class WorkflowExecuteThread implements Runnable { // package task instance before submit processService.packageTaskInstance(taskInstance, processInstance); - boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval()); + boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval(), masterConfig.isTaskLogger()); if (!submit) { logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", processInstance.getId(), processInstance.getName(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index b4951f449d..ceefa26d1b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -85,22 +85,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { /** * pause task, common tasks donot need this. - * - * @return */ protected abstract boolean pauseTask(); /** * kill task, all tasks need to realize this function - * - * @return */ protected abstract boolean killTask(); /** * task timeout process - * - * @return */ protected abstract boolean taskTimeout(); @@ -134,7 +128,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { } /** - * @return + * */ protected boolean pause() { if (paused) { @@ -227,7 +221,10 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { /** * set master task running logger. */ - public void setTaskExecutionLogger() { + public void setTaskExecutionLogger(boolean isTaskLogger) { + if (!isTaskLogger) { + return; + } logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), @@ -240,7 +237,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { * set procedure task relation * * @param procedureTaskExecutionContext procedureTaskExecutionContext - * @param taskInstance taskInstance + * @param taskInstance taskInstance */ private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskInstance taskInstance) { ProcedureParameters procedureParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class); @@ -253,7 +250,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { * set datax task relation * * @param dataxTaskExecutionContext dataxTaskExecutionContext - * @param taskInstance taskInstance + * @param taskInstance taskInstance */ protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskInstance taskInstance) { DataxParameters dataxParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class); @@ -278,7 +275,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { * set sqoop task relation * * @param sqoopTaskExecutionContext sqoopTaskExecutionContext - * @param taskInstance taskInstance + * @param taskInstance taskInstance */ private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskInstance taskInstance) { SqoopParameters sqoopParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class); @@ -309,7 +306,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { * set SQL task relation * * @param sqlTaskExecutionContext sqlTaskExecutionContext - * @param taskInstance taskInstance + * @param taskInstance taskInstance */ private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskInstance taskInstance) { SqlParameters sqlParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class); @@ -345,7 +342,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { /** * whehter tenant is null * - * @param tenant tenant + * @param tenant tenant * @param taskInstance taskInstance * @return result */ diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index d44315a6ba..f7053ff884 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -54,14 +54,14 @@ public class CommonTaskProcessor extends BaseTaskProcessor { NettyExecutorManager nettyExecutorManager; @Override - public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) { + public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval, boolean isTaskLogger) { this.processInstance = processInstance; this.taskInstance = processService.submitTaskWithRetry(processInstance, task, maxRetryTimes, commitInterval); if (this.taskInstance == null) { return false; } - setTaskExecutionLogger(); + setTaskExecutionLogger(isTaskLogger); int taskGroupId = task.getTaskGroupId(); if (taskGroupId > 0) { boolean acquireTaskGroup = processService.acquireTaskGroup(task.getId(), @@ -85,7 +85,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { @Override public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) { - this.dispatchTask(taskInstance,processInstance); + this.dispatchTask(taskInstance, processInstance); } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index c3c65b3fba..bf2390a347 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -71,7 +71,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { private TaskDefinition taskDefinition; @Override - public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { + public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) { this.processInstance = processInstance; this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval); @@ -82,7 +82,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() ); - setTaskExecutionLogger(); + setTaskExecutionLogger(isTaskLogger); String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); Thread.currentThread().setName(threadLoggerInfoName); initTaskParameters(); @@ -141,7 +141,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { } private void initTaskParameters() { - taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),processInstance.getProcessDefinitionCode(), + taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), taskInstance.getId())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index 28cd0e7ab7..6d311ee008 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -81,7 +81,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { boolean allDependentItemFinished; @Override - public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { + public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) { this.processInstance = processInstance; this.taskInstance = task; this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval); @@ -97,7 +97,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), taskInstance.getId())); - setTaskExecutionLogger(); + setTaskExecutionLogger(isTaskLogger); taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java index 882533726b..66e49f99cc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java @@ -32,7 +32,7 @@ public interface ITaskProcessor { String getType(); - boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval); + boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger); ExecutionStatus taskState(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 10d1b28464..0f8ccbc354 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -53,7 +53,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { private StateEventCallbackService stateEventCallbackService; @Override - public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { + public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) { this.processInstance = processInstance; taskDefinition = processService.findTaskDefinition( task.getTaskCode(), task.getTaskDefinitionVersion() @@ -63,7 +63,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { if (this.taskInstance == null) { return false; } - setTaskExecutionLogger(); + setTaskExecutionLogger(isTaskLogger); return true; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index 0fa6e5dbd2..a4259ce6b2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -65,8 +65,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { private DependResult conditionResult; @Override - public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { - + public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) { this.processInstance = processInstance; this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval); @@ -80,7 +79,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), taskInstance.getId())); - setTaskExecutionLogger(); + setTaskExecutionLogger(isTaskLogger); taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index 6800496050..f043196e4c 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -104,6 +104,8 @@ master: max-cpu-load-avg: -1 # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G reserved-memory: 0.3 + # use task logger, default true; if true, it will create log for every task; if false, the task log will append to master log file + task-logger: true server: port: 5679 diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 4aacdbab10..bd98cd280e 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -113,6 +113,8 @@ master: max-cpu-load-avg: -1 # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G reserved-memory: 0.3 + # use task logger, default true; if true, it will create log for every task; if false, the task log will append to master log file + task-logger: true worker: # worker listener port