From 053e548bf30aa9af0877364a7b06e7603f1c57d0 Mon Sep 17 00:00:00 2001 From: blackberrier Date: Thu, 10 Jun 2021 09:39:12 +0800 Subject: [PATCH] [Improvement-5539][Master] Check status of taskInstance from cache (#5572) * improvement:check status of taskInstance from cache * issue5572 use timer instead of while&sleep; consider concurrent modification * use computeifpresent instead of lock * simplify getByTaskInstanceId function * add ut for TaskInstanceCacheManagerImpl; fix bug in TaskInstanceCacheManagerImpl * add Apache license header;add test class in root pom --- .../dolphinscheduler/common/Constants.java | 5 + .../impl/TaskInstanceCacheManagerImpl.java | 48 ++++- .../master/runner/MasterTaskExecThread.java | 3 +- .../TaskInstanceCacheManagerImplTest.java | 177 ++++++++++++++++++ pom.xml | 1 + 5 files changed, 227 insertions(+), 7 deletions(-) create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index c366bace80..59e65c578e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -530,6 +530,11 @@ public final class Constants { */ public static final int SLEEP_TIME_MILLIS = 1000; + /** + * master task instance cache-database refresh interval + */ + public static final int CACHE_REFRESH_TIME_MILLIS = 20 * 1000; + /** * heartbeat for zk info length */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java index 366a6c4a9c..43632aed65 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java @@ -14,8 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master.cache.impl; +import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS; + import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; @@ -25,8 +28,14 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.Map; +import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -47,6 +56,24 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { @Autowired private ProcessService processService; + /** + * taskInstance cache refresh timer + */ + private Timer refreshTaskInstanceTimer = null; + + @PostConstruct + public void init() { + //issue#5539 add thread to fetch task state from database in a fixed rate + this.refreshTaskInstanceTimer = new Timer(true); + refreshTaskInstanceTimer.scheduleAtFixedRate( + new RefreshTaskInstanceTimerTask(), CACHE_REFRESH_TIME_MILLIS, CACHE_REFRESH_TIME_MILLIS + ); + } + + @PreDestroy + public void close() { + this.refreshTaskInstanceTimer.cancel(); + } /** * get taskInstance by taskInstance id @@ -56,12 +83,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { */ @Override public TaskInstance getByTaskInstanceId(Integer taskInstanceId) { - TaskInstance taskInstance = taskInstanceCache.get(taskInstanceId); - if (taskInstance == null){ - taskInstance = processService.findTaskInstanceById(taskInstanceId); - taskInstanceCache.put(taskInstanceId,taskInstance); - } - return taskInstance; + return taskInstanceCache.computeIfAbsent(taskInstanceId, k -> processService.findTaskInstanceById(taskInstanceId)); } /** @@ -106,6 +128,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { TaskInstance taskInstance = getByTaskInstanceId(taskExecuteResponseCommand.getTaskInstanceId()); taskInstance.setState(ExecutionStatus.of(taskExecuteResponseCommand.getStatus())); taskInstance.setEndTime(taskExecuteResponseCommand.getEndTime()); + taskInstanceCache.put(taskExecuteResponseCommand.getTaskInstanceId(), taskInstance); } /** @@ -116,4 +139,17 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { public void removeByTaskInstanceId(Integer taskInstanceId) { taskInstanceCache.remove(taskInstanceId); } + + class RefreshTaskInstanceTimerTask extends TimerTask { + @Override + public void run() { + for (Entry taskInstanceEntry : taskInstanceCache.entrySet()) { + TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceEntry.getKey()); + if (null != taskInstance && taskInstance.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) { + taskInstanceCache.computeIfPresent(taskInstanceEntry.getKey(), (k, v) -> taskInstance); + } + } + + } + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 4ffcc2279a..d01ef0f1a0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -143,7 +143,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { this.checkTimeoutFlag = !alertTimeout(); } // updateProcessInstance task instance - taskInstance = processService.findTaskInstanceById(taskInstance.getId()); + //issue#5539 Check status of taskInstance from cache + taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId()); processInstance = processService.findProcessInstanceById(processInstance.getId()); Thread.sleep(Constants.SLEEP_TIME_MILLIS); } catch (Exception e) { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java new file mode 100644 index 0000000000..8dc3f802de --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.cache.impl; + +import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.Calendar; +import java.util.Date; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TaskInstanceCacheManagerImplTest { + + @InjectMocks + private TaskInstanceCacheManagerImpl taskInstanceCacheManager; + + @Mock(name = "processService") + private ProcessService processService; + + @Before + public void before() { + + TaskExecuteAckCommand taskExecuteAckCommand = new TaskExecuteAckCommand(); + taskExecuteAckCommand.setStatus(1); + taskExecuteAckCommand.setExecutePath("/dolphinscheduler/worker"); + taskExecuteAckCommand.setHost("worker007"); + taskExecuteAckCommand.setLogPath("/temp/worker.log"); + taskExecuteAckCommand.setStartTime(new Date(1970, Calendar.AUGUST,7)); + taskExecuteAckCommand.setTaskInstanceId(0); + + taskInstanceCacheManager.cacheTaskInstance(taskExecuteAckCommand); + + } + + @Test + public void testInit() throws InterruptedException { + + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(0); + taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); + taskInstance.setExecutePath("/dolphinscheduler/worker"); + taskInstance.setHost("worker007"); + taskInstance.setLogPath("/temp/worker.log"); + taskInstance.setProcessInstanceId(0); + + Mockito.when(processService.findTaskInstanceById(0)).thenReturn(taskInstance); + + taskInstanceCacheManager.init(); + TimeUnit.MILLISECONDS.sleep(CACHE_REFRESH_TIME_MILLIS + 1000); + + Assert.assertEquals(taskInstance.getState(), taskInstanceCacheManager.getByTaskInstanceId(0).getState()); + + } + + @Test + public void getByTaskInstanceIdFromCache() { + TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(0); + + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(0); + taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + taskInstance.setExecutePath("/dolphinscheduler/worker"); + taskInstance.setHost("worker007"); + taskInstance.setLogPath("/temp/worker.log"); + taskInstance.setStartTime(new Date(1970, Calendar.AUGUST,7)); + + Assert.assertEquals(taskInstance.toString(), instanceGot.toString()); + + } + + @Test + public void getByTaskInstanceIdFromDatabase() { + + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + taskInstance.setExecutePath("/dolphinscheduler/worker"); + taskInstance.setHost("worker007"); + taskInstance.setLogPath("/temp/worker.log"); + taskInstance.setStartTime(new Date(1970, Calendar.AUGUST,7)); + + Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + + TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(1); + + Assert.assertEquals(taskInstance, instanceGot); + + } + + @Test + public void cacheTaskInstanceByTaskExecutionContext() { + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskInstanceId(2); + taskExecutionContext.setTaskName("blackberrier test"); + taskExecutionContext.setStartTime(new Date(1970, Calendar.AUGUST,7)); + taskExecutionContext.setTaskType(TaskType.SPARK.getDesc()); + taskExecutionContext.setExecutePath("/tmp"); + + taskInstanceCacheManager.cacheTaskInstance(taskExecutionContext); + + TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(2); + + Assert.assertEquals(taskInstance.getId(), 2); + Assert.assertEquals(taskInstance.getName(), "blackberrier test"); + Assert.assertEquals(taskInstance.getStartTime(), new Date(1970, Calendar.AUGUST, 7)); + Assert.assertEquals(taskInstance.getTaskType(), TaskType.SPARK.getDesc()); + Assert.assertEquals(taskInstance.getExecutePath(), "/tmp"); + + } + + @Test + public void testCacheTaskInstanceByTaskExecuteAckCommand() { + TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0); + + Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskInstance.getState()); + Assert.assertEquals(new Date(1970, Calendar.AUGUST, 7), taskInstance.getStartTime()); + Assert.assertEquals("worker007", taskInstance.getHost()); + Assert.assertEquals("/dolphinscheduler/worker", taskInstance.getExecutePath()); + Assert.assertEquals("/temp/worker.log", taskInstance.getLogPath()); + + } + + @Test + public void testCacheTaskInstanceByTaskExecuteResponseCommand() { + TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(); + responseCommand.setTaskInstanceId(0); + responseCommand.setStatus(9); + responseCommand.setEndTime(new Date(1970, Calendar.AUGUST, 8)); + + taskInstanceCacheManager.cacheTaskInstance(responseCommand); + + TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0); + + Assert.assertEquals(new Date(1970, Calendar.AUGUST, 8), taskInstance.getEndTime()); + Assert.assertEquals(ExecutionStatus.KILL, taskInstance.getState()); + + } + + @Test + public void removeByTaskInstanceId() { + taskInstanceCacheManager.removeByTaskInstanceId(0); + Assert.assertNull(taskInstanceCacheManager.getByTaskInstanceId(0)); + + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 2003b8cd4f..4f9fa3e6cf 100644 --- a/pom.xml +++ b/pom.xml @@ -966,6 +966,7 @@ **/server/log/TaskLogFilterTest.java **/server/log/WorkerLogFilterTest.java + **/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java **/server/master/config/MasterConfigTest.java **/server/master/consumer/TaskPriorityQueueConsumerTest.java **/server/master/runner/MasterTaskExecThreadTest.java