diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
index cd72a843e0..a304ab5a34 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
@@ -329,6 +329,9 @@ public final class Constants {
public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server";
public static final String THREAD_NAME_ALERT_SERVER = "Alert-Server";
+ // suffix appended to a file path to name the companion file that stores that file's CRC32 checksum
+ public static final String CRC_SUFFIX = ".crc";
+
/**
* complement date default cron string
*/
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
index 3808dd5747..3514117d82 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
@@ -28,11 +28,14 @@ import org.apache.commons.io.IOUtils;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -264,4 +267,36 @@ public class FileUtils {
}
}
+ /**
+  * Calculate the CRC32 checksum of a file, or of every file beneath a directory.
+  * For a directory the hex checksums of its entries are concatenated in sorted name order
+  * (recursing into sub-directories); an empty directory yields an empty string.
+  *
+  * @param pathName path of the file or directory to checksum
+  * @return hex CRC32 of the file, or the concatenated checksums of the directory's entries
+  * @throws IOException if the directory cannot be listed or a file cannot be read
+  */
+ public static String getFileChecksum(String pathName) throws IOException {
+     File file = new File(pathName);
+     if (file.isDirectory()) {
+         String[] subPaths = file.list();
+         if (subPaths == null) {
+             // File.list() returns null when an I/O error occurs
+             throw new IOException("Calculate checksum error: cannot list directory " + pathName);
+         }
+         // File.list() makes no ordering guarantee, so sort to get the same result on every run and host
+         Arrays.sort(subPaths);
+         StringBuilder concatenatedCRC = new StringBuilder();
+         for (String subPath : subPaths) {
+             concatenatedCRC.append(getFileChecksum(pathName + FOLDER_SEPARATOR + subPath));
+         }
+         return concatenatedCRC.toString();
+     }
+
+     CRC32 crc32 = new CRC32();
+     // read in bulk instead of one byte per call on an unbuffered stream
+     byte[] buffer = new byte[8192];
+     try (
+             FileInputStream fileInputStream = new FileInputStream(pathName);
+             CheckedInputStream checkedInputStream = new CheckedInputStream(fileInputStream, crc32)) {
+         while (checkedInputStream.read(buffer) != -1) {
+             // nothing to do: CheckedInputStream updates crc32 with every byte it reads
+         }
+     } catch (IOException e) {
+         // keep the cause so the failing path and reason are not lost
+         throw new IOException("Calculate checksum error.", e);
+     }
+     return Long.toHexString(crc32.getValue());
+ }
+
}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
index e8a628ff08..355b9e09c9 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
@@ -118,4 +118,28 @@ public class FileUtilsTest {
Assertions.assertTrue(FileUtils.directoryTraversal(path));
}
+ /**
+  * Files with equal content must share a checksum, files with different content must not,
+  * and a directory's checksum must be built from the checksums of the files it contains.
+  */
+ @Test
+ void testGetFileChecksum() throws Exception {
+     String filePath1 = "test/testFile1.txt";
+     String filePath2 = "test/testFile2.txt";
+     String filePath3 = "test/testFile3.txt";
+     String content1 = "正正正faffdasfasdfas,한국어; 한글……にほんご\nfrançais";
+     String content2 = "正正正faffdasfasdfas,한국어; 한글……にほん\nfrançais";
+     FileUtils.writeContent2File(content1, filePath1);
+     FileUtils.writeContent2File(content2, filePath2);
+     FileUtils.writeContent2File(content1, filePath3);
+
+     String checksum1 = FileUtils.getFileChecksum(filePath1);
+     String checksum2 = FileUtils.getFileChecksum(filePath2);
+     String checksum3 = FileUtils.getFileChecksum(filePath3);
+
+     Assertions.assertNotEquals(checksum1, checksum2);
+     Assertions.assertEquals(checksum1, checksum3);
+
+     String dirPath = "test/";
+
+     // a directory's checksum is the concatenation of its entries' checksums,
+     // so each member file's checksum must appear in it
+     String dirChecksum = FileUtils.getFileChecksum(dirPath);
+     Assertions.assertTrue(dirChecksum.contains(checksum1));
+     Assertions.assertTrue(dirChecksum.contains(checksum2));
+ }
+
}
diff --git a/dolphinscheduler-dao/pom.xml b/dolphinscheduler-dao/pom.xml
index e5620d1588..2c2fdb5b6c 100644
--- a/dolphinscheduler-dao/pom.xml
+++ b/dolphinscheduler-dao/pom.xml
@@ -112,6 +112,10 @@
h2
test
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-storage-api</artifactId>
+ </dependency>
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java
index 207756af77..2c6a7d0960 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java
@@ -17,9 +17,14 @@
package org.apache.dolphinscheduler.dao.utils;
+import static org.apache.dolphinscheduler.common.constants.Constants.CRC_SUFFIX;
+
+import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -27,8 +32,11 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -36,10 +44,15 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.fasterxml.jackson.databind.JsonNode;
public class TaskCacheUtils {
+ protected static final Logger logger = LoggerFactory.getLogger(TaskCacheUtils.class);
+
private TaskCacheUtils() {
throw new IllegalStateException("Utility class");
}
@@ -54,15 +67,17 @@ public class TaskCacheUtils {
* 4. input VarPool, from upstream task and workflow global parameters
* @param taskInstance task instance
* @param taskExecutionContext taskExecutionContext
+ * @param storageOperate storageOperate
* @return cache key
*/
- public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext taskExecutionContext) {
+ public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext taskExecutionContext,
+ StorageOperate storageOperate) {
List keyElements = new ArrayList<>();
keyElements.add(String.valueOf(taskInstance.getTaskCode()));
keyElements.add(String.valueOf(taskInstance.getTaskDefinitionVersion()));
keyElements.add(String.valueOf(taskInstance.getIsCache().getCode()));
keyElements.add(String.valueOf(taskInstance.getEnvironmentConfig()));
- keyElements.add(getTaskInputVarPoolData(taskInstance, taskExecutionContext));
+ keyElements.add(getTaskInputVarPoolData(taskInstance, taskExecutionContext, storageOperate));
String data = StringUtils.join(keyElements, "_");
return DigestUtils.sha256Hex(data);
}
@@ -109,7 +124,8 @@ public class TaskCacheUtils {
* @param taskInstance task instance
* taskExecutionContext taskExecutionContext
*/
- public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExecutionContext context) {
+ public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExecutionContext context,
+ StorageOperate storageOperate) {
JsonNode taskParams = JSONUtils.parseObject(taskInstance.getTaskParams());
// The set of input values considered from localParams in the taskParams
@@ -123,6 +139,12 @@ public class TaskCacheUtils {
// var pool value from upstream task
List varPool = JSONUtils.toList(taskInstance.getVarPool(), Property.class);
+ Map fileCheckSumMap = new HashMap<>();
+ List fileInput = varPool.stream().filter(property -> property.getType().equals(DataType.FILE))
+ .collect(Collectors.toList());
+ fileInput.forEach(
+ property -> fileCheckSumMap.put(property.getProp(), getValCheckSum(property, context, storageOperate)));
+
// var pool value from workflow global parameters
if (context.getPrepareParamsMap() != null) {
Set taskVarPoolSet = varPool.stream().map(Property::getProp).collect(Collectors.toSet());
@@ -139,9 +161,40 @@ public class TaskCacheUtils {
.filter(property -> propertyInSet.contains(property.getProp()))
.sorted(Comparator.comparing(Property::getProp))
.collect(Collectors.toList());
+
+ // replace the value of every FILE property with the checksum of the file it refers to
+ varPool.forEach(property -> {
+     if (property.getType() == DataType.FILE) {
+         // fileCheckSumMap is keyed by prop name, and getValCheckSum may already have overwritten the
+         // property's value, so the checksum must be looked up by prop rather than by value
+         property.setValue(fileCheckSumMap.get(property.getProp()));
+     }
+ });
return JSONUtils.toJsonString(varPool);
}
+ /**
+  * Get the checksum of a FILE property from the crc file stored next to its resource.
+  * The crc file is downloaded into the task's execute path and its content is returned, so the cache can be
+  * reused when the content of the upstream output files is the same.
+  * On success the property's value is also replaced with the checksum.
+  *
+  * @param fileProperty FILE property whose value is the resource path; its value is overwritten on success
+  * @param context supplies the tenant code and the local execute path
+  * @param storageOperate storage used to resolve and download the crc file
+  * @return content of the crc file, or an empty string when it cannot be downloaded or read
+  */
+ public static String getValCheckSum(Property fileProperty, TaskExecutionContext context,
+                                     StorageOperate storageOperate) {
+     String resourceCRCPath = fileProperty.getValue() + CRC_SUFFIX;
+     String resourceCRCWholePath = storageOperate.getResourceFileName(context.getTenantCode(), resourceCRCPath);
+     String targetPath = String.format("%s/%s", context.getExecutePath(), resourceCRCPath);
+     logger.info("{} --- Remote:{} to Local:{}", "CRC file", resourceCRCWholePath, targetPath);
+     String crcString = "";
+     try {
+         storageOperate.download(context.getTenantCode(), resourceCRCWholePath, targetPath, false,
+                 true);
+         // this method opens the stream, so it is responsible for closing it
+         try (FileInputStream crcInputStream = new FileInputStream(targetPath)) {
+             crcString = FileUtils.readFile2Str(crcInputStream);
+         }
+         fileProperty.setValue(crcString);
+     } catch (IOException e) {
+         // pass the exception to the logger so the reason for the failure is recorded
+         logger.error("Replace checksum failed for file property {}.", fileProperty.getProp(), e);
+     }
+     return crcString;
+ }
+
/**
* get var in set from task definition
* @param taskInstance task instance
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtilsTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtilsTest.java
index 8d3dbe655b..abdc023101 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtilsTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtilsTest.java
@@ -17,8 +17,12 @@
package org.apache.dolphinscheduler.dao.utils;
+import static org.apache.dolphinscheduler.common.constants.Constants.CRC_SUFFIX;
+
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
@@ -35,6 +39,7 @@ import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
class TaskCacheUtilsTest {
@@ -42,6 +47,8 @@ class TaskCacheUtilsTest {
private TaskExecutionContext taskExecutionContext;
+ private StorageOperate storageOperate;
+
@BeforeEach
void setUp() {
String taskParams = "{\n" +
@@ -95,6 +102,7 @@ class TaskCacheUtilsTest {
prepareParamsMap.put("a", property);
taskExecutionContext.setPrepareParamsMap(prepareParamsMap);
+ storageOperate = Mockito.mock(StorageOperate.class);
}
@Test
@@ -121,25 +129,26 @@ class TaskCacheUtilsTest {
@Test
void TestGetTaskInputVarPoolData() {
- TaskCacheUtils.getTaskInputVarPoolData(taskInstance, taskExecutionContext);
+ TaskCacheUtils.getTaskInputVarPoolData(taskInstance, taskExecutionContext, storageOperate);
// only a=aa and c=cc will influence the result,
// b=bb is a fixed value, will be considered in task version
// k=kk is not in task params, will be ignored
String except =
"[{\"prop\":\"a\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"aa\"},{\"prop\":\"c\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"cc\"}]";
- Assertions.assertEquals(except, TaskCacheUtils.getTaskInputVarPoolData(taskInstance, taskExecutionContext));
+ Assertions.assertEquals(except,
+ TaskCacheUtils.getTaskInputVarPoolData(taskInstance, taskExecutionContext, storageOperate));
}
@Test
void TestGenerateCacheKey() {
- String cacheKeyBase = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+ String cacheKeyBase = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
Property propertyI = new Property();
propertyI.setProp("i");
propertyI.setDirect(Direct.IN);
propertyI.setType(DataType.VARCHAR);
propertyI.setValue("ii");
taskExecutionContext.getPrepareParamsMap().put("i", propertyI);
- String cacheKeyNew = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+ String cacheKeyNew = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
// i will not influence the result, because task instance not use it
Assertions.assertEquals(cacheKeyBase, cacheKeyNew);
@@ -149,17 +158,17 @@ class TaskCacheUtilsTest {
propertyD.setType(DataType.VARCHAR);
propertyD.setValue("dd");
taskExecutionContext.getPrepareParamsMap().put("i", propertyD);
- String cacheKeyD = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+ String cacheKeyD = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
// d will influence the result, because task instance use it
Assertions.assertNotEquals(cacheKeyBase, cacheKeyD);
taskInstance.setTaskDefinitionVersion(100);
- String cacheKeyE = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+ String cacheKeyE = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
// task definition version is changed, so cache key changed
Assertions.assertNotEquals(cacheKeyD, cacheKeyE);
taskInstance.setEnvironmentConfig("export PYTHON_HOME=/bin/python3");
- String cacheKeyF = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+ String cacheKeyF = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
// EnvironmentConfig is changed, so cache key changed
Assertions.assertNotEquals(cacheKeyE, cacheKeyF);
}
@@ -169,4 +178,23 @@ class TaskCacheUtilsTest {
String cacheKey = TaskCacheUtils.generateTagCacheKey(1, "123");
Assertions.assertEquals("1-123", cacheKey);
}
+
+ /**
+  * getValCheckSum must return the content of the local crc file and store it as the property's value.
+  */
+ @Test
+ void testReplaceWithCheckSum() {
+     String content = "abcdefg";
+     String filePath = "test/testFile.txt";
+     // the mocked storage downloads nothing, so create the crc file where getValCheckSum will read it
+     FileUtils.writeContent2File(content, filePath + CRC_SUFFIX);
+
+     Property property = new Property();
+     property.setProp("f1");
+     property.setValue("testFile.txt");
+     property.setType(DataType.FILE);
+     property.setDirect(Direct.IN);
+     TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+     taskExecutionContext.setExecutePath("test");
+     taskExecutionContext.setTenantCode("aaa");
+
+     String crc = TaskCacheUtils.getValCheckSum(property, taskExecutionContext, storageOperate);
+     // assertEquals takes (expected, actual)
+     Assertions.assertEquals(content, crc);
+     // the property's value is replaced by the checksum as a side effect
+     Assertions.assertEquals(content, property.getValue());
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 6de30a7e66..db2b66a2b8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
@@ -104,6 +105,12 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
@Autowired
private TaskEventService taskEventService;
+ /**
+  * storage operator, passed to TaskCacheUtils when the cache key of a task is generated;
+  * injection is optional (required = false), so this is null when no StorageOperate bean is available
+  */
+ @Autowired(required = false)
+ private StorageOperate storageOperate;
+
/**
* consumer thread pool
*/
@@ -298,7 +305,7 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
return false;
}
// check if task is cache execution
- String cacheKey = TaskCacheUtils.generateCacheKey(taskInstance, context);
+ String cacheKey = TaskCacheUtils.generateCacheKey(taskInstance, context, storageOperate);
TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceByCacheKey(cacheKey);
// if we can find the cache task instance, we will add cache event, and return true.
if (cacheTaskInstance != null) {
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
index cee09d1085..44eeeabe40 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
@@ -17,7 +17,10 @@
package org.apache.dolphinscheduler.server.worker.utils;
+import static org.apache.dolphinscheduler.common.constants.Constants.CRC_SUFFIX;
+
import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -84,16 +87,31 @@ public class TaskFilesTransferUtils {
logger.info("Upload output files ...");
for (Property property : localParamsProperty) {
// get local file path
- String srcPath =
- packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue()));
+ String path = String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue());
+ String srcPath = packIfDir(path);
+
+ // get crc file path
+ String srcCRCPath = srcPath + CRC_SUFFIX;
+ try {
+ FileUtils.writeContent2File(FileUtils.getFileChecksum(path), srcCRCPath);
+ } catch (IOException ex) {
+ throw new TaskException(ex.getMessage(), ex);
+ }
+
// get remote file path
String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
+ String resourceCRCPath = resourcePath + CRC_SUFFIX;
try {
// upload file to storage
String resourceWholePath =
storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+ String resourceCRCWholePath =
+ storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourceCRCPath);
logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
+ logger.info("{} --- Local:{} to Remote:{}", "CRC file", srcCRCPath, resourceCRCWholePath);
+ storageOperate.upload(taskExecutionContext.getTenantCode(), srcCRCPath, resourceCRCWholePath, false,
+ true);
} catch (IOException ex) {
throw new TaskException("Upload file to storage error", ex);
}