|
|
@ -21,14 +21,10 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; |
|
|
|
import static org.powermock.api.mockito.PowerMockito.mock; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.common.enums.CommandType; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.common.enums.Flag; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; |
|
|
|
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; |
|
|
|
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; |
|
|
|
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; |
|
|
|
import org.apache.dolphinscheduler.common.graph.DAG; |
|
|
|
import org.apache.dolphinscheduler.common.graph.DAG; |
|
|
|
import org.apache.dolphinscheduler.common.utils.DateUtils; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
|
|
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
|
|
@ -61,16 +57,10 @@ import org.junit.Before; |
|
|
|
import org.junit.Test; |
|
|
|
import org.junit.Test; |
|
|
|
import org.junit.runner.RunWith; |
|
|
|
import org.junit.runner.RunWith; |
|
|
|
import org.mockito.Mockito; |
|
|
|
import org.mockito.Mockito; |
|
|
|
import org.powermock.api.mockito.PowerMockito; |
|
|
|
import org.mockito.junit.MockitoJUnitRunner; |
|
|
|
import org.powermock.core.classloader.annotations.PrepareForTest; |
|
|
|
|
|
|
|
import org.powermock.modules.junit4.PowerMockRunner; |
|
|
|
|
|
|
|
import org.springframework.context.ApplicationContext; |
|
|
|
import org.springframework.context.ApplicationContext; |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@RunWith(MockitoJUnitRunner.class) |
|
|
|
* test for WorkflowExecuteThread |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
@RunWith(PowerMockRunner.class) |
|
|
|
|
|
|
|
@PrepareForTest({WorkflowExecuteRunnable.class}) |
|
|
|
|
|
|
|
public class WorkflowExecuteRunnableTest { |
|
|
|
public class WorkflowExecuteRunnableTest { |
|
|
|
|
|
|
|
|
|
|
|
private WorkflowExecuteRunnable workflowExecuteThread; |
|
|
|
private WorkflowExecuteRunnable workflowExecuteThread; |
|
|
@ -91,44 +81,32 @@ public class WorkflowExecuteRunnableTest { |
|
|
|
|
|
|
|
|
|
|
|
@Before |
|
|
|
@Before |
|
|
|
public void init() throws Exception { |
|
|
|
public void init() throws Exception { |
|
|
|
applicationContext = mock(ApplicationContext.class); |
|
|
|
applicationContext = Mockito.mock(ApplicationContext.class); |
|
|
|
SpringApplicationContext springApplicationContext = new SpringApplicationContext(); |
|
|
|
SpringApplicationContext springApplicationContext = new SpringApplicationContext(); |
|
|
|
springApplicationContext.setApplicationContext(applicationContext); |
|
|
|
springApplicationContext.setApplicationContext(applicationContext); |
|
|
|
|
|
|
|
|
|
|
|
config = new MasterConfig(); |
|
|
|
config = new MasterConfig(); |
|
|
|
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); |
|
|
|
processService = Mockito.mock(ProcessService.class); |
|
|
|
|
|
|
|
processInstanceDao = Mockito.mock(ProcessInstanceDao.class); |
|
|
|
processService = mock(ProcessService.class); |
|
|
|
processInstance = Mockito.mock(ProcessInstance.class); |
|
|
|
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
processInstanceDao = mock(ProcessInstanceDao.class); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
processInstance = mock(ProcessInstance.class); |
|
|
|
|
|
|
|
Mockito.when(processInstance.getState()).thenReturn(WorkflowExecutionStatus.SUCCESS); |
|
|
|
|
|
|
|
Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString()); |
|
|
|
|
|
|
|
Mockito.when(processInstance.getIsSubProcess()).thenReturn(Flag.NO); |
|
|
|
|
|
|
|
Mockito.when(processInstance.getScheduleTime()).thenReturn(DateUtils.stringToDate("2020-01-01 00:00:00")); |
|
|
|
|
|
|
|
Map<String, String> cmdParam = new HashMap<>(); |
|
|
|
Map<String, String> cmdParam = new HashMap<>(); |
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00"); |
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00"); |
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, "2020-01-20 23:00:00"); |
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, "2020-01-20 23:00:00"); |
|
|
|
Mockito.when(processInstance.getCommandParam()).thenReturn(JSONUtils.toJsonString(cmdParam)); |
|
|
|
|
|
|
|
ProcessDefinition processDefinition = new ProcessDefinition(); |
|
|
|
ProcessDefinition processDefinition = new ProcessDefinition(); |
|
|
|
processDefinition.setGlobalParamMap(Collections.emptyMap()); |
|
|
|
processDefinition.setGlobalParamMap(Collections.emptyMap()); |
|
|
|
processDefinition.setGlobalParamList(Collections.emptyList()); |
|
|
|
processDefinition.setGlobalParamList(Collections.emptyList()); |
|
|
|
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); |
|
|
|
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); |
|
|
|
|
|
|
|
|
|
|
|
stateWheelExecuteThread = mock(StateWheelExecuteThread.class); |
|
|
|
stateWheelExecuteThread = Mockito.mock(StateWheelExecuteThread.class); |
|
|
|
curingGlobalParamsService = mock(CuringParamsService.class); |
|
|
|
curingGlobalParamsService = Mockito.mock(CuringParamsService.class); |
|
|
|
NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class); |
|
|
|
NettyExecutorManager nettyExecutorManager = Mockito.mock(NettyExecutorManager.class); |
|
|
|
ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class); |
|
|
|
ProcessAlertManager processAlertManager = Mockito.mock(ProcessAlertManager.class); |
|
|
|
workflowExecuteThread = PowerMockito.spy( |
|
|
|
workflowExecuteThread = Mockito.spy( |
|
|
|
new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao, nettyExecutorManager, |
|
|
|
new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao, nettyExecutorManager, |
|
|
|
processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService)); |
|
|
|
processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService)); |
|
|
|
// prepareProcess init dag
|
|
|
|
|
|
|
|
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag"); |
|
|
|
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag"); |
|
|
|
dag.setAccessible(true); |
|
|
|
dag.setAccessible(true); |
|
|
|
dag.set(workflowExecuteThread, new DAG()); |
|
|
|
dag.set(workflowExecuteThread, new DAG()); |
|
|
|
PowerMockito.doNothing().when(workflowExecuteThread, "endProcess"); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@Test |
|
|
@ -136,7 +114,6 @@ public class WorkflowExecuteRunnableTest { |
|
|
|
try { |
|
|
|
try { |
|
|
|
Map<String, String> cmdParam = new HashMap<>(); |
|
|
|
Map<String, String> cmdParam = new HashMap<>(); |
|
|
|
cmdParam.put(CMD_PARAM_START_NODES, "1,2,3"); |
|
|
|
cmdParam.put(CMD_PARAM_START_NODES, "1,2,3"); |
|
|
|
Mockito.when(processInstance.getCommandParam()).thenReturn(JSONUtils.toJsonString(cmdParam)); |
|
|
|
|
|
|
|
Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class; |
|
|
|
Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class; |
|
|
|
Method method = masterExecThreadClass.getDeclaredMethod("parseStartNodeName", String.class); |
|
|
|
Method method = masterExecThreadClass.getDeclaredMethod("parseStartNodeName", String.class); |
|
|
|
method.setAccessible(true); |
|
|
|
method.setAccessible(true); |
|
|
@ -249,19 +226,15 @@ public class WorkflowExecuteRunnableTest { |
|
|
|
processDefinition1.setCode(11L); |
|
|
|
processDefinition1.setCode(11L); |
|
|
|
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_WAIT); |
|
|
|
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_WAIT); |
|
|
|
Mockito.when(processInstance.getId()).thenReturn(225); |
|
|
|
Mockito.when(processInstance.getId()).thenReturn(225); |
|
|
|
Mockito.when(processService.findProcessInstanceById(225)).thenReturn(processInstance); |
|
|
|
|
|
|
|
workflowExecuteThread.checkSerialProcess(processDefinition1); |
|
|
|
workflowExecuteThread.checkSerialProcess(processDefinition1); |
|
|
|
|
|
|
|
|
|
|
|
Mockito.when(processInstance.getId()).thenReturn(225); |
|
|
|
|
|
|
|
Mockito.when(processInstance.getNextProcessInstanceId()).thenReturn(222); |
|
|
|
Mockito.when(processInstance.getNextProcessInstanceId()).thenReturn(222); |
|
|
|
|
|
|
|
|
|
|
|
ProcessInstance processInstance9 = new ProcessInstance(); |
|
|
|
ProcessInstance processInstance9 = new ProcessInstance(); |
|
|
|
processInstance9.setId(222); |
|
|
|
processInstance9.setId(222); |
|
|
|
processInstance9.setProcessDefinitionCode(11L); |
|
|
|
processInstance9.setProcessDefinitionCode(11L); |
|
|
|
processInstance9.setProcessDefinitionVersion(1); |
|
|
|
processInstance9.setProcessDefinitionVersion(1); |
|
|
|
processInstance9.setState(WorkflowExecutionStatus.SERIAL_WAIT); |
|
|
|
processInstance9.setState(WorkflowExecutionStatus.SERIAL_WAIT); |
|
|
|
|
|
|
|
|
|
|
|
Mockito.when(processService.findProcessInstanceById(225)).thenReturn(processInstance); |
|
|
|
|
|
|
|
Mockito.when(processService.findProcessInstanceById(222)).thenReturn(processInstance9); |
|
|
|
Mockito.when(processService.findProcessInstanceById(222)).thenReturn(processInstance9); |
|
|
|
workflowExecuteThread.checkSerialProcess(processDefinition1); |
|
|
|
workflowExecuteThread.checkSerialProcess(processDefinition1); |
|
|
|
} catch (Exception e) { |
|
|
|
} catch (Exception e) { |
|
|
|