From 7a100086b7cf0d86456446150811989ce54939b1 Mon Sep 17 00:00:00 2001 From: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com> Date: Tue, 28 Apr 2020 17:57:30 +0800 Subject: [PATCH 1/6] Increase dataX environment variable, sslTrust default value (#2555) * add LoginTest license * Delete useless packages * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * Update worker_group_id to worker_group in init.sql * Update worker_group_id to worker_group in init.sql * Update worker_group_id to worker_group * Increase dataX environment variable, sslTrust default value Co-authored-by: chenxingchun <438044805@qq.com> --- .../src/main/resources/config/install_config.conf | 3 ++- script/env/dolphinscheduler_env.sh | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-server/src/main/resources/config/install_config.conf b/dolphinscheduler-server/src/main/resources/config/install_config.conf index 4671be7371..cba117e048 100644 --- a/dolphinscheduler-server/src/main/resources/config/install_config.conf +++ b/dolphinscheduler-server/src/main/resources/config/install_config.conf @@ -63,7 +63,8 @@ mailPassword="xxxxxxxxxx" # TLS mail protocol support starttlsEnable="false" -sslTrust="xxxxxxxxxx" +#note: sslTrust is the same as mailServerHost +sslTrust="smtp.exmail.qq.com" # SSL mail protocol support # note: The SSL protocol is enabled by default. diff --git a/script/env/dolphinscheduler_env.sh b/script/env/dolphinscheduler_env.sh index e5b99e2857..066f379875 100644 --- a/script/env/dolphinscheduler_env.sh +++ b/script/env/dolphinscheduler_env.sh @@ -23,4 +23,6 @@ export PYTHON_HOME=/opt/soft/python export JAVA_HOME=/opt/soft/java export HIVE_HOME=/opt/soft/hive export FLINK_HOME=/opt/soft/flink -export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$PATH +export DATAX_HOME=/opt/soft/dataX + +export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH From a12103b08cfca98301d8deee5515d7ba9462a46d Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Tue, 28 Apr 2020 18:39:53 +0800 Subject: [PATCH 2/6] =?UTF-8?q?no=20valid=20worker=20group=EF=BC=8Cmaster?= =?UTF-8?q?=20can=20kill=20task=20directly=20(#2541)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * dispatch task fail will set task status failed * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * #2486 bug fix * host and workergroup compatible * EnterpriseWeChatUtils modify * EnterpriseWeChatUtils modify * EnterpriseWeChatUtils modify * #2499 bug fix * add comment * revert comment * revert comment * #2499 buf fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly Co-authored-by: qiaozhanwei --- .../consumer/TaskPriorityQueueConsumer.java | 16 +- .../master/runner/MasterTaskExecThread.java | 46 ++- .../TaskPriorityQueueConsumerTest.java | 262 ++++++++++++++++++ .../runner/MasterTaskExecThreadTest.java | 80 ++++++ .../server/registry/DependencyConfig.java | 8 + pom.xml | 2 + 6 files changed, 411 insertions(+), 3 deletions(-) create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 480d6657c2..3314789fdb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -120,19 +120,31 @@ public class TaskPriorityQueueConsumer extends Thread{ Boolean result = false; while (Stopper.isRunning()){ try { - result = dispatcher.dispatch(executionContext); + result = dispatcher.dispatch(executionContext); } catch (ExecuteException e) { logger.error("dispatch error",e); ThreadUtils.sleep(SLEEP_TIME_MILLIS); } - if (result){ + if (result || taskInstanceIsFinalState(taskInstanceId)){ break; } } return result; } + + /** + * taskInstance is final state + * success,failure,kill,stop,pause,threadwaiting is final state + * @param taskInstanceId taskInstanceId + * @return taskInstance is final state + */ + public Boolean taskInstanceIsFinalState(int taskInstanceId){ + TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); + return taskInstance.getState().typeIsFinished(); + } + /** * get TaskExecutionContext * @param taskInstanceId taskInstanceId diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 9986b07319..105584fe99 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; @@ -35,9 +36,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.springframework.beans.factory.annotation.Autowired; import java.util.Date; +import java.util.Set; /** @@ -53,6 +57,12 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { private NettyExecutorManager nettyExecutorManager; + + /** + * zookeeper register center + */ + private ZookeeperRegistryCenter zookeeperRegistryCenter; + /** * constructor of MasterTaskExecThread * @param taskInstance task instance @@ -61,6 +71,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { super(taskInstance); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); + this.zookeeperRegistryCenter = SpringApplicationContext.getBean(ZookeeperRegistryCenter.class); } /** @@ -175,6 +186,16 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { } alreadyKilled = true; + String taskInstanceWorkerGroup = taskInstance.getWorkerGroup(); + + // not exists + if (!existsValidWorkerGroup(taskInstanceWorkerGroup)){ + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setEndTime(new Date()); + processService.updateTaskInstance(taskInstance); + return; + } + TaskKillRequestCommand killCommand = new TaskKillRequestCommand(); killCommand.setTaskInstanceId(taskInstance.getId()); @@ -185,10 +206,33 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { nettyExecutorManager.executeDirectly(executionContext); - logger.info("master add kill task :{} id:{} to kill queue", + logger.info("master kill taskInstance name :{} taskInstance id:{}", taskInstance.getName(), taskInstance.getId() ); } + /** + * whether exists valid worker group + * @param taskInstanceWorkerGroup taskInstanceWorkerGroup + * @return whether exists + */ + public Boolean existsValidWorkerGroup(String taskInstanceWorkerGroup){ + Set workerGroups = zookeeperRegistryCenter.getWorkerGroupDirectly(); + // not worker group + if (CollectionUtils.isEmpty(workerGroups)){ + return false; + } + + // has worker group , but not taskInstance assigned worker group + if (!workerGroups.contains(taskInstanceWorkerGroup)){ + return false; + } + Set workers = zookeeperRegistryCenter.getWorkerGroupNodesDirectly(taskInstanceWorkerGroup); + if (CollectionUtils.isEmpty(workers)) { + return false; + } + return true; + } + /** * get task timeout parameter * @return TaskTimeoutParameter diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java new file mode 100644 index 0000000000..b6f118a734 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.consumer; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; +import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.registry.DependencyConfig; +import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; +import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.util.Date; + + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, + NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, + ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) +public class TaskPriorityQueueConsumerTest { + + + @Autowired + private TaskPriorityQueue taskPriorityQueue; + + @Autowired + private TaskPriorityQueueConsumer taskPriorityQueueConsumer; + + @Autowired + private ProcessService processService; + + @Autowired + private ExecutorDispatcher dispatcher; + + @Before + public void init(){ + + Tenant tenant = new Tenant(); + tenant.setId(1); + tenant.setTenantCode("journey"); + tenant.setTenantName("journey"); + tenant.setDescription("journey"); + tenant.setQueueId(1); + tenant.setCreateTime(new Date()); + tenant.setUpdateTime(new Date()); + + Mockito.when(processService.getTenantForProcess(1,2)).thenReturn(tenant); + + Mockito.when(processService.queryUserQueueByProcessInstanceId(1)).thenReturn("default"); + } + + + @Test + public void testSHELLTask() throws Exception { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SHELL"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("default"); + taskInstance.setExecutorId(2); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setTenantId(1); + processInstance.setCommandType(CommandType.START_PROCESS); + taskInstance.setProcessInstance(processInstance); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setUserId(2); + processDefinition.setProjectId(1); + taskInstance.setProcessDefine(processDefinition); + + Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance); + taskPriorityQueue.put("2_1_2_1_default"); + + Thread.sleep(10000); + } + + + @Test + public void testSQLTask() throws Exception { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SQL"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\",\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\",\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from t_journey_user\\\",\\\"preStatements\\\":[],\\\"sqlType\\\":0,\\\"receivers\\\":\\\"825193156@qq.com\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("default"); + taskInstance.setExecutorId(2); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setTenantId(1); + processInstance.setCommandType(CommandType.START_PROCESS); + taskInstance.setProcessInstance(processInstance); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setUserId(2); + processDefinition.setProjectId(1); + taskInstance.setProcessDefine(processDefinition); + + Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance); + taskPriorityQueue.put("2_1_2_1_default"); + + DataSource dataSource = new DataSource(); + dataSource.setId(1); + dataSource.setName("sqlDatasource"); + dataSource.setType(DbType.MYSQL); + dataSource.setUserId(2); + dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}"); + dataSource.setCreateTime(new Date()); + dataSource.setUpdateTime(new Date()); + + Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource); + + Thread.sleep(10000); + } + + + @Test + public void testDataxTask() throws Exception { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("DATAX"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-97625\",\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\",\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\",\\\"postStatements\\\":[],\\\"jobSpeedRecord\\\":1000,\\\"customConfig\\\":0,\\\"dtType\\\":\\\"MYSQL\\\",\\\"dsType\\\":\\\"MYSQL\\\",\\\"jobSpeedByte\\\":0,\\\"dataSource\\\":80,\\\"dataTarget\\\":80,\\\"sql\\\":\\\"SELECT dt,count FROM pv\\\",\\\"preStatements\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"DATAX\",\"workerGroup\":\"default\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("default"); + taskInstance.setExecutorId(2); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setTenantId(1); + processInstance.setCommandType(CommandType.START_PROCESS); + taskInstance.setProcessInstance(processInstance); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setUserId(2); + processDefinition.setProjectId(1); + taskInstance.setProcessDefine(processDefinition); + + Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance); + taskPriorityQueue.put("2_1_2_1_default"); + + + + DataSource dataSource = new DataSource(); + dataSource.setId(80); + dataSource.setName("datax"); + dataSource.setType(DbType.MYSQL); + dataSource.setUserId(2); + dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}"); + dataSource.setCreateTime(new Date()); + dataSource.setUpdateTime(new Date()); + + Mockito.when(processService.findDataSourceById(80)).thenReturn(dataSource); + + Thread.sleep(10000); + } + + + @Test + public void testSqoopTask() throws Exception { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SQOOP"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-63634\",\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\",\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\",\\\"targetType\\\":\\\"HDFS\\\",\\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\",\\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\",\\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\",\\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\",\\\"modelType\\\":\\\"import\\\",\\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\",\\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\",\\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\",\\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\",\\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\",\\\"localParams\\\":[],\\\"concurrency\\\":1}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQOOP\",\"workerGroup\":\"default\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("default"); + taskInstance.setExecutorId(2); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setTenantId(1); + processInstance.setCommandType(CommandType.START_PROCESS); + taskInstance.setProcessInstance(processInstance); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setUserId(2); + processDefinition.setProjectId(1); + taskInstance.setProcessDefine(processDefinition); + + Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance); + taskPriorityQueue.put("2_1_2_1_default"); + + + + DataSource dataSource = new DataSource(); + dataSource.setId(1); + dataSource.setName("datax"); + dataSource.setType(DbType.MYSQL); + dataSource.setUserId(2); + dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}"); + dataSource.setCreateTime(new Date()); + dataSource.setUpdateTime(new Date()); + + Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource); + + Thread.sleep(10000); + } + + + @Test + public void testTaskInstanceIsFinalState(){ + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SHELL"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("default"); + taskInstance.setExecutorId(2); + + + Mockito.when( processService.findTaskInstanceById(1)).thenReturn(taskInstance); + + taskPriorityQueueConsumer.taskInstanceIsFinalState(1); + } + + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java new file mode 100644 index 0000000000..ebddec4fb1 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import junit.framework.Assert; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer; +import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; +import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.registry.DependencyConfig; +import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; +import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.util.HashSet; +import java.util.Set; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, + NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, + ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class}) +public class MasterTaskExecThreadTest { + + @Test + public void testExistsValidWorkerGroup1(){ + ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class); + Mockito.when(zookeeperRegistryCenter.getWorkerGroupDirectly()).thenReturn(null); + MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(null); + masterTaskExecThread.existsValidWorkerGroup("default"); + } + @Test + public void testExistsValidWorkerGroup2(){ + ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class); + Set workerGorups = new HashSet<>(); + workerGorups.add("test1"); + workerGorups.add("test2"); + + Mockito.when(zookeeperRegistryCenter.getWorkerGroupDirectly()).thenReturn(workerGorups); + MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(null); + masterTaskExecThread.existsValidWorkerGroup("default"); + } + + @Test + public void testExistsValidWorkerGroup3(){ + ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class); + Set workerGorups = new HashSet<>(); + workerGorups.add("test1"); + + Mockito.when(zookeeperRegistryCenter.getWorkerGroupDirectly()).thenReturn(workerGorups); + Mockito.when(zookeeperRegistryCenter.getWorkerGroupNodesDirectly("test1")).thenReturn(workerGorups); + MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(null); + masterTaskExecThread.existsValidWorkerGroup("test1"); + } + + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java index 0adea44cfd..93d2b03010 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java @@ -20,11 +20,14 @@ package org.apache.dolphinscheduler.server.registry; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; +import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager; import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; import org.mockito.Mockito; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -144,4 +147,9 @@ public class DependencyConfig { public TaskResponseService taskResponseService(){ return Mockito.mock(TaskResponseService.class); } + + @Bean + public TaskPriorityQueue taskPriorityQueue(){ + return new TaskPriorityQueueImpl(); + } } diff --git a/pom.xml b/pom.xml index 0647724ed0..dad1e3696b 100644 --- a/pom.xml +++ b/pom.xml @@ -778,6 +778,8 @@ **/server/log/TaskLogDiscriminatorTest.java **/server/log/TaskLogFilterTest.java **/server/log/WorkerLogFilterTest.java + **/server/master/consumer/TaskPriorityQueueConsumerTest.java + **/server/master/runner/MasterTaskExecThreadTest.java **/server/master/dispatch/executor/NettyExecutorManagerTest.java **/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java **/server/master/dispatch/host/assign/RandomSelectorTest.java From 5666e6b75af0e0be2f3778e476faeaa3201baddc Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Wed, 29 Apr 2020 17:36:21 +0800 Subject: [PATCH 3/6] No master don't create command #2571 (#2575) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * dispatch task fail will set task status failed * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * #2486 bug fix * host and workergroup compatible * EnterpriseWeChatUtils modify * EnterpriseWeChatUtils modify * EnterpriseWeChatUtils modify * #2499 bug fix * add comment * revert comment * revert comment * #2499 buf fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * No master don't create command #2571 * No master don't create command #2571 Co-authored-by: qiaozhanwei --- .../api/service/ExecutorService.java | 11 ++++- .../api/service/SchedulerService.java | 1 + .../api/service/ExecutorService2Test.java | 43 ++++++++++++++++--- 3 files changed, 49 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 51f5420ac5..1aab0953da 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -59,7 +60,7 @@ public class ExecutorService extends BaseService{ private ProcessDefinitionMapper processDefinitionMapper; @Autowired - private ProcessDefinitionService processDefinitionService; + private MonitorService monitorService; @Autowired @@ -123,6 +124,14 @@ public class ExecutorService extends BaseService{ return result; } + // check master server exists + List masterServers = monitorService.getServerListFromZK(true); + + + if (masterServers.size() == 0) { + putMsg(result, Status.MASTER_NOT_EXISTS); + return result; + } /** * create command */ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java index cb07ffbbe3..9328fe0375 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java @@ -365,6 +365,7 @@ public class SchedulerService extends BaseService { if (masterServers.size() == 0) { putMsg(result, Status.MASTER_NOT_EXISTS); + return result; } // set status diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java index a8777541b7..59523bdd11 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; @@ -63,6 +64,9 @@ public class ExecutorService2Test { @Mock private ProjectService projectService; + @Mock + private MonitorService monitorService; + private int processDefinitionId = 1; private int tenantId = 1; @@ -102,6 +106,7 @@ public class ExecutorService2Test { Mockito.when(processDefinitionMapper.selectById(processDefinitionId)).thenReturn(processDefinition); Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1); + Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(getMasterServersList()); } /** @@ -121,7 +126,6 @@ public class ExecutorService2Test { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); }catch (Exception e){ - Assert.assertTrue(false); } } @@ -142,7 +146,6 @@ public class ExecutorService2Test { Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); verify(processService, times(0)).createCommand(any(Command.class)); }catch (Exception e){ - Assert.assertTrue(false); } } @@ -163,7 +166,6 @@ public class ExecutorService2Test { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); }catch (Exception e){ - Assert.assertTrue(false); } } @@ -184,7 +186,6 @@ public class ExecutorService2Test { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(31)).createCommand(any(Command.class)); }catch (Exception e){ - Assert.assertTrue(false); } } @@ -205,10 +206,42 @@ public class ExecutorService2Test { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(15)).createCommand(any(Command.class)); }catch (Exception e){ - Assert.assertTrue(false); } } + + @Test + public void testNoMsterServers() throws ParseException{ + Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(new ArrayList()); + + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_PARALLEL, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); + Assert.assertEquals(result.get(Constants.STATUS),Status.MASTER_NOT_EXISTS); + + } + + private List getMasterServersList(){ + List masterServerList = new ArrayList<>(); + Server masterServer1 = new Server(); + masterServer1.setId(1); + masterServer1.setHost("192.168.220.188"); + masterServer1.setPort(1121); + masterServerList.add(masterServer1); + + Server masterServer2 = new Server(); + masterServer2.setId(2); + masterServer2.setHost("192.168.220.189"); + masterServer2.setPort(1122); + masterServerList.add(masterServer2); + + return masterServerList; + + } + private List zeroSchedulerList(){ return Collections.EMPTY_LIST; } From 22d4ee942ca91652240f58a96217d2bb11dc07ad Mon Sep 17 00:00:00 2001 From: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com> Date: Wed, 29 Apr 2020 18:29:50 +0800 Subject: [PATCH 4/6] modify dataX environment variable (#2577) * add LoginTest license * Delete useless packages * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * Update worker_group_id to worker_group in init.sql * Update worker_group_id to worker_group in init.sql * Update worker_group_id to worker_group * Increase dataX environment variable, sslTrust default value * modify dataX environment variable Co-authored-by: chenxingchun <438044805@qq.com> --- script/env/dolphinscheduler_env.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/script/env/dolphinscheduler_env.sh b/script/env/dolphinscheduler_env.sh index 066f379875..026b620203 100644 --- a/script/env/dolphinscheduler_env.sh +++ b/script/env/dolphinscheduler_env.sh @@ -23,6 +23,6 @@ export PYTHON_HOME=/opt/soft/python export JAVA_HOME=/opt/soft/java export HIVE_HOME=/opt/soft/hive export FLINK_HOME=/opt/soft/flink -export DATAX_HOME=/opt/soft/dataX +export DATAX_HOME=/opt/soft/datax/bin/datax.py -export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH +export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME:$PATH From 049470311aee1b51045542130179683f710bcc6d Mon Sep 17 00:00:00 2001 From: break60 <790061044@qq.com> Date: Thu, 30 Apr 2020 09:25:53 +0800 Subject: [PATCH 5/6] Fix condition judgment state value --- .../src/js/conf/home/pages/dag/_source/dag.vue | 7 +++++++ .../dag/_source/formModel/tasks/_source/nodeStatus.vue | 4 ++-- .../pages/dag/_source/formModel/tasks/conditions.vue | 9 ++++++++- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue index 6f630071c1..ae15e20f74 100755 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue @@ -259,8 +259,15 @@ if (v2.name === v1.name) { let dom = $(`#${v2.id}`) let state = dom.find('.state-p') + let depState = '' + taskList.forEach(item=>{ + if(item.name==v1.name) { + depState = item.state + } + }) dom.attr('data-state-id', v1.stateId) dom.attr('data-dependent-result', v1.dependentResult || '') + dom.attr('data-dependent-depState', depState) state.append(``) state.find('strong').attr('title', titleTpl(v2, v1.desc)) } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue index 0c3f7433a3..972782207c 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue @@ -28,8 +28,8 @@ diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/conditions.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/conditions.vue index fb3f2c295c..621de81037 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/conditions.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/conditions.vue @@ -143,13 +143,20 @@ created () { let o = this.backfillItem let dependentResult = $(`#${o.id}`).data('dependent-result') || {} + // Does not represent an empty object backfill if (!_.isEmpty(o)) { this.relation = _.cloneDeep(o.dependence.relation) || 'AND' this.dependTaskList = _.cloneDeep(o.dependence.dependTaskList) || [] let defaultState = this.isDetails ? 'WAITING' : '' // Process instance return status display matches by key - _.map(this.dependTaskList, v => _.map(v.dependItemList, v1 => v1.state = dependentResult[`${v1.definitionId}-${v1.depTasks}-${v1.cycle}-${v1.dateValue}`] || defaultState)) + _.map(this.dependTaskList, v => _.map(v.dependItemList, v1 => { + $(`#${o.id}`).siblings().each(function(){ + if(v1.depTasks == $(this).text()) { + v1.state = $(this).attr('data-dependent-depstate') + } + }); + })) } }, mounted () { From 16a4d59085eb7425367873523195cfa1b0303611 Mon Sep 17 00:00:00 2001 From: break60 <790061044@qq.com> Date: Thu, 30 Apr 2020 10:53:02 +0800 Subject: [PATCH 6/6] fix --- .../pages/dag/_source/formModel/tasks/_source/nodeStatus.vue | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue index 972782207c..549cb119f9 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue @@ -176,8 +176,7 @@ } }) }, - mounted () { - }, + mounted () {}, components: {} }