From c4375a54c167f8e96d80c3cc67ad0a607ba2ef7b Mon Sep 17 00:00:00 2001 From: LiuBodong Date: Thu, 23 Sep 2021 22:23:17 +0800 Subject: [PATCH] [Fix][Task] Task lost resource information (#6292) (#6295) Co-authored-by: liubodong --- .../common/process/ResourceInfo.java | 51 ++++--- .../service/process/ProcessService.java | 69 ++++++++++ .../service/process/ProcessServiceTest.java | 129 +++++++++++++++++- 3 files changed, 231 insertions(+), 18 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java index 287f7267bc..cacdbf5579 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java @@ -20,26 +20,43 @@ package org.apache.dolphinscheduler.common.process; * resource info */ public class ResourceInfo { - /** - * res the name of the resource that was uploaded - */ - private int id; + /** + * res id of the resource that was uploaded + */ + private int id; - public int getId() { - return id; - } + private String res; - public void setId(int id) { - this.id = id; - } + /** + * full name of the resource that was uploaded + */ + private String resourceName; - private String res; + public ResourceInfo() { + // do nothing, void constructor + } - public String getRes() { - return res; - } + public int getId() { + return id; + } - public void setRes(String res) { - this.res = res; - } + public void setId(int id) { + this.id = id; + } + + public String getRes() { + return res; + } + + public void setRes(String res) { + this.res = res; + } + + public String getResourceName() { + return resourceName; + } + + public void setResourceName(String resourceName) { + this.resourceName = resourceName; + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index c652a511a2..dca69ff15b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -30,6 +30,7 @@ import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS; import static java.util.stream.Collectors.toSet; +import com.fasterxml.jackson.core.type.TypeReference; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -1527,10 +1528,78 @@ public class ProcessService { TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); + updateTaskDefinitionResources(taskDefinition); taskInstance.setTaskDefine(taskDefinition); return taskInstance; } + /** + * Update {@link ResourceInfo} information in {@link TaskDefinition} + * + * @param taskDefinition the given {@link TaskDefinition} + */ + private void updateTaskDefinitionResources(TaskDefinition taskDefinition) { + Map taskParameters = JSONUtils.parseObject( + taskDefinition.getTaskParams(), + new TypeReference>() { }); + if (taskParameters != null) { + // if contains mainJar field, query resource from database + // Flink, Spark, MR + if (taskParameters.containsKey("mainJar")) { + Object mainJarObj = taskParameters.get("mainJar"); + ResourceInfo mainJar = JSONUtils.parseObject( + JSONUtils.toJsonString(mainJarObj), + ResourceInfo.class); + ResourceInfo resourceInfo = updateResourceInfo(mainJar); + if (resourceInfo != null) { + taskParameters.put("mainJar", resourceInfo); + } + } + // update resourceList information + if (taskParameters.containsKey("resourceList")) { + String resourceListStr = JSONUtils.toJsonString(taskParameters.get("resourceList")); + List resourceInfos = JSONUtils.toList(resourceListStr, ResourceInfo.class); + List updatedResourceInfos = resourceInfos + .stream() + .map(this::updateResourceInfo) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + taskParameters.put("resourceList", updatedResourceInfos); + } + // set task parameters + taskDefinition.setTaskParams(JSONUtils.toJsonString(taskParameters)); + } + } + + /** + * update {@link ResourceInfo} by given original ResourceInfo + * + * @param res origin resource info + * @return {@link ResourceInfo} + */ + private ResourceInfo updateResourceInfo(ResourceInfo res) { + ResourceInfo resourceInfo = null; + // only if mainJar is not null and does not contains "resourceName" field + if (res != null) { + int resourceId = res.getId(); + if (resourceId <= 0) { + logger.error("invalid resourceId, {}", resourceId); + return null; + } + resourceInfo = new ResourceInfo(); + // get resource from database, only one resource should be returned + Resource resource = getResourceById(resourceId); + resourceInfo.setId(resourceId); + resourceInfo.setRes(resource.getFileName()); + resourceInfo.setResourceName(resource.getFullName()); + if (logger.isInfoEnabled()) { + logger.info("updated resource info {}", + JSONUtils.toJsonString(resourceInfo)); + } + } + return resourceInfo; + } + /** * get id list by task state * diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 1bbbca3104..69b708a8c4 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.service.process; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID; - import static org.mockito.ArgumentMatchers.any; import org.apache.dolphinscheduler.common.Constants; @@ -32,6 +31,8 @@ import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.task.spark.SparkParameters; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Command; @@ -41,6 +42,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; +import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; @@ -51,6 +54,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; +import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; @@ -58,10 +62,13 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.Assert; import org.junit.Test; @@ -70,6 +77,7 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,6 +116,8 @@ public class ProcessServiceTest { private ProcessTaskRelationMapper processTaskRelationMapper; @Mock private ProcessDefinitionLogMapper processDefineLogMapper; + @Mock + private ResourceMapper resourceMapper; @Test public void testCreateSubCommand() { @@ -477,4 +487,121 @@ public class ProcessServiceTest { processService.changeOutParam(taskInstance); } + @Test + public void testUpdateTaskDefinitionResources() throws Exception { + TaskDefinition taskDefinition = new TaskDefinition(); + String taskParameters = "{\n" + + " \"mainClass\": \"org.apache.dolphinscheduler.SparkTest\",\n" + + " \"mainJar\": {\n" + + " \"id\": 1\n" + + " },\n" + + " \"deployMode\": \"cluster\",\n" + + " \"resourceList\": [\n" + + " {\n" + + " \"id\": 3\n" + + " },\n" + + " {\n" + + " \"id\": 4\n" + + " }\n" + + " ],\n" + + " \"localParams\": [],\n" + + " \"driverCores\": 1,\n" + + " \"driverMemory\": \"512M\",\n" + + " \"numExecutors\": 2,\n" + + " \"executorMemory\": \"2G\",\n" + + " \"executorCores\": 2,\n" + + " \"appName\": \"\",\n" + + " \"mainArgs\": \"\",\n" + + " \"others\": \"\",\n" + + " \"programType\": \"JAVA\",\n" + + " \"sparkVersion\": \"SPARK2\",\n" + + " \"dependence\": {},\n" + + " \"conditionResult\": {\n" + + " \"successNode\": [\n" + + " \"\"\n" + + " ],\n" + + " \"failedNode\": [\n" + + " \"\"\n" + + " ]\n" + + " },\n" + + " \"waitStartTimeout\": {}\n" + + "}"; + taskDefinition.setTaskParams(taskParameters); + + Map resourceMap = + Stream.of(1, 3, 4) + .map(i -> { + Resource resource = new Resource(); + resource.setId(i); + resource.setFileName("file" + i); + resource.setFullName("/file" + i); + return resource; + }) + .collect( + Collectors.toMap( + Resource::getId, + resource -> resource) + ); + for (Integer integer : Arrays.asList(1, 3, 4)) { + Mockito.when(resourceMapper.selectById(integer)) + .thenReturn(resourceMap.get(integer)); + } + + Whitebox.invokeMethod(processService, + "updateTaskDefinitionResources", + taskDefinition); + + String taskParams = taskDefinition.getTaskParams(); + SparkParameters sparkParameters = JSONUtils.parseObject(taskParams, SparkParameters.class); + ResourceInfo mainJar = sparkParameters.getMainJar(); + Assert.assertEquals(1, mainJar.getId()); + Assert.assertEquals("file1", mainJar.getRes()); + Assert.assertEquals("/file1", mainJar.getResourceName()); + + Assert.assertEquals(2, sparkParameters.getResourceList().size()); + ResourceInfo res1 = sparkParameters.getResourceList().get(0); + ResourceInfo res2 = sparkParameters.getResourceList().get(1); + Assert.assertEquals(3, res1.getId()); + Assert.assertEquals("file3", res1.getRes()); + Assert.assertEquals("/file3", res1.getResourceName()); + Assert.assertEquals(4, res2.getId()); + Assert.assertEquals("file4", res2.getRes()); + Assert.assertEquals("/file4", res2.getResourceName()); + + } + + @Test + public void testUpdateResourceInfo() throws Exception { + // test if input is null + ResourceInfo resourceInfoNull = null; + ResourceInfo updatedResourceInfo1 = Whitebox.invokeMethod(processService, + "updateResourceInfo", + resourceInfoNull); + Assert.assertNull(updatedResourceInfo1); + + // test if resource id less than 1 + ResourceInfo resourceInfoVoid = new ResourceInfo(); + ResourceInfo updatedResourceInfo2 = Whitebox.invokeMethod(processService, + "updateResourceInfo", + resourceInfoVoid); + Assert.assertNull(updatedResourceInfo2); + + // test normal situation + ResourceInfo resourceInfoNormal = new ResourceInfo(); + resourceInfoNormal.setId(1); + Resource resource = new Resource(); + resource.setId(1); + resource.setFileName("test.txt"); + resource.setFullName("/test.txt"); + Mockito.when(resourceMapper.selectById(1)).thenReturn(resource); + ResourceInfo updatedResourceInfo3 = Whitebox.invokeMethod(processService, + "updateResourceInfo", + resourceInfoNormal); + + Assert.assertEquals(1, updatedResourceInfo3.getId()); + Assert.assertEquals("test.txt", updatedResourceInfo3.getRes()); + Assert.assertEquals("/test.txt", updatedResourceInfo3.getResourceName()); + + } + }