Browse Source

[Workflow] Print WorkflowInstance detail/ TaskInstance detail when finished (#14677)

* [Workflow] Print WorkflowInstance detail/ TaskInstance detail when finished

* [Workflow] minor fix

* [Workflow] add ut

* [Workflow] add header

---------

Co-authored-by: tengting.xu <xtt@dipeak.com>
3.2.1-prepare
Tengting Xu 11 months ago committed by GitHub
parent
commit
9963bfc612
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  2. 88
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java
  3. 115
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -85,6 +85,7 @@ import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.cron.CronUtils;
@ -433,7 +434,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
taskInstance.getTaskCode(),
taskInstance.getState());
this.updateProcessInstanceState();
// log the taskInstance in detail after task is finished
log.info(WorkflowInstanceUtils.logTaskInstanceInDetail(taskInstance));
sendTaskLogOnMasterToRemoteIfNeeded(taskInstance);
} catch (Exception ex) {
log.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskSet", ex);
@ -759,6 +761,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
// release task group
processService.releaseAllTaskGroup(workflowInstance.getId());
}
// Log the workflowInstance in detail
log.info(WorkflowInstanceUtils.logWorkflowInstanceInDetails(workflowInstance));
}
public void checkSerialProcess(ProcessDefinition processDefinition) {

88
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.utils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import lombok.experimental.UtilityClass;
import com.google.common.base.Strings;
@UtilityClass
public class WorkflowInstanceUtils {
public static String logWorkflowInstanceInDetails(ProcessInstance workflowInstance) {
StringBuilder logBuilder = new StringBuilder();
// set the length for '*'
int horizontalLineLength = 80;
// Append the title and the centered "Workflow Instance Detail"
int titleLength = 40;
int leftSpaces = (horizontalLineLength - titleLength) / 2;
String centeredTitle = String.format("%" + leftSpaces + "s%s", "", "Workflow Instance Detail");
logBuilder.append("\n").append(Strings.repeat("*", horizontalLineLength)).append("\n")
.append(centeredTitle).append("\n")
.append(Strings.repeat("*", horizontalLineLength)).append("\n")
.append("Workflow Name: ").append(workflowInstance.getProcessDefinition().getName())
.append("\n")
.append("Workflow Instance Name: ").append(workflowInstance.getName()).append("\n")
.append("Command Type: ").append(workflowInstance.getCommandType()).append("\n")
.append("State: ").append(workflowInstance.getState().getDesc()).append("\n")
.append("Host: ").append(workflowInstance.getHost()).append("\n")
.append("Is Sub Process: ").append(workflowInstance.getIsSubProcess().getDescp())
.append("\n")
.append("Run Times: ").append(workflowInstance.getRunTimes()).append("\n")
.append("Max Try Times: ").append(workflowInstance.getMaxTryTimes()).append("\n")
.append("Schedule Time: ").append(workflowInstance.getScheduleTime()).append("\n")
.append("Dry Run: ").append(workflowInstance.getDryRun()).append("\n")
.append("Tenant: ").append(workflowInstance.getTenantCode()).append("\n")
.append("Restart Time: ").append(workflowInstance.getRestartTime()).append("\n")
.append("Work Group: ").append(workflowInstance.getWorkerGroup()).append("\n")
.append("Start Time: ").append(workflowInstance.getStartTime()).append("\n")
.append("End Time: ").append(workflowInstance.getEndTime()).append("\n");
return logBuilder.toString();
}
public String logTaskInstanceInDetail(TaskInstance taskInstance) {
StringBuilder logBuilder = new StringBuilder();
// set the length for '*'
int horizontalLineLength = 80;
// Append the title and the centered "Task Instance Detail"
int titleLength = 40;
int leftSpaces = (horizontalLineLength - titleLength) / 2;
String centeredTitle = String.format("%" + leftSpaces + "s%s", "", "Task Instance Detail");
logBuilder.append("\n").append(Strings.repeat("*", horizontalLineLength)).append("\n")
.append(centeredTitle).append("\n")
.append(Strings.repeat("*", horizontalLineLength)).append("\n")
.append("Task Name: ").append(taskInstance.getName()).append("\n")
.append("Workflow Instance Name: ").append(taskInstance.getProcessInstance().getName()).append("\n")
.append("Task Execute Type: ").append(taskInstance.getTaskExecuteType().getDesc()).append("\n")
.append("Execute State: ").append(taskInstance.getState().getDesc()).append("\n")
.append("Host: ").append(taskInstance.getHost()).append("\n")
.append("Task Type: ").append(taskInstance.getTaskType()).append("\n")
.append("Priority: ").append(taskInstance.getTaskInstancePriority().getDescp())
.append("\n")
.append("Tenant: ").append(taskInstance.getProcessInstance().getTenantCode())
.append("\n")
.append("First Submit Time: ").append(taskInstance.getFirstSubmitTime()).append("\n")
.append("Submit Time: ").append(taskInstance.getSubmitTime()).append("\n")
.append("Start Time: ").append(taskInstance.getStartTime()).append("\n")
.append("End Time: ").append(taskInstance.getEndTime()).append("\n");
return logBuilder.toString();
}
}

115
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.utils;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import java.sql.Date;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class WorkflowInstanceUtilsTest {
@Test
public void testLogWorkflowInstanceInDetails() {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setName("test_workflow");
ProcessInstance workflowInstance = new ProcessInstance();
workflowInstance.setProcessDefinition(processDefinition);
workflowInstance.setName("test_workflow_20230801");
workflowInstance.setCommandType(CommandType.REPEAT_RUNNING);
workflowInstance.setState(WorkflowExecutionStatus.SUCCESS);
workflowInstance.setHost("127.0.0.1");
workflowInstance.setIsSubProcess(Flag.NO);
workflowInstance.setRunTimes(1);
workflowInstance.setMaxTryTimes(0);
workflowInstance.setScheduleTime(Date.valueOf("2023-08-01"));
workflowInstance.setDryRun(0);
workflowInstance.setTenantCode("default");
workflowInstance.setRestartTime(Date.valueOf("2023-08-01"));
workflowInstance.setWorkerGroup("default");
workflowInstance.setStartTime(Date.valueOf("2023-08-01"));
workflowInstance.setEndTime(Date.valueOf("2023-08-01"));
Assertions.assertEquals("\n"
+ "********************************************************************************\n"
+ " Workflow Instance Detail\n"
+ "********************************************************************************\n"
+ "Workflow Name: test_workflow\n"
+ "Workflow Instance Name: test_workflow_20230801\n"
+ "Command Type: REPEAT_RUNNING\n"
+ "State: success\n"
+ "Host: 127.0.0.1\n"
+ "Is Sub Process: no\n"
+ "Run Times: 1\n"
+ "Max Try Times: 0\n"
+ "Schedule Time: 2023-08-01\n"
+ "Dry Run: 0\n"
+ "Tenant: default\n"
+ "Restart Time: 2023-08-01\n"
+ "Work Group: default\n"
+ "Start Time: 2023-08-01\n"
+ "End Time: 2023-08-01\n",
WorkflowInstanceUtils.logWorkflowInstanceInDetails(workflowInstance));
}
@Test
public void testLogTaskInstanceInDetails() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setName("test_process");
processInstance.setTenantCode("default");
TaskInstance taskInstance = new TaskInstance();
taskInstance.setName("test_task");
taskInstance.setProcessInstance(processInstance);
taskInstance.setState(TaskExecutionStatus.SUCCESS);
taskInstance.setTaskExecuteType(TaskExecuteType.BATCH);
taskInstance.setHost("127.0.0.1");
taskInstance.setTaskType("SHELL");
taskInstance.setTaskInstancePriority(Priority.MEDIUM);
taskInstance.setFirstSubmitTime(Date.valueOf("2023-08-01"));
taskInstance.setSubmitTime(Date.valueOf("2023-08-01"));
taskInstance.setStartTime(Date.valueOf("2023-08-01"));
taskInstance.setEndTime(Date.valueOf("2023-08-01"));
Assertions.assertEquals("\n"
+ "********************************************************************************\n"
+ " Task Instance Detail\n"
+ "********************************************************************************\n"
+ "Task Name: test_task\n"
+ "Workflow Instance Name: test_process\n"
+ "Task Execute Type: batch\n"
+ "Execute State: success\n"
+ "Host: 127.0.0.1\n"
+ "Task Type: SHELL\n"
+ "Priority: medium\n"
+ "Tenant: default\n"
+ "First Submit Time: 2023-08-01\n"
+ "Submit Time: 2023-08-01\n"
+ "Start Time: 2023-08-01\n"
+ "End Time: 2023-08-01\n", WorkflowInstanceUtils.logTaskInstanceInDetail(taskInstance));
}
}
Loading…
Cancel
Save