@ -20,13 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.emr;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE ;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL ;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS ;
import static org.mockito.ArgumentMatchers.any ;
import static org.powermock.api.mockito.PowerMockito.doReturn ;
import static org.powermock.api.mockito.PowerMockito.mock ;
import static org.powermock.api.mockito.PowerMockito.mockStatic ;
import static org.powermock.api.mockito.PowerMockito.spy ;
import static org.powermock.api.mockito.PowerMockito.when ;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack ;
import org.apache.dolphinscheduler.plugin.task.api.TaskException ;
@ -42,13 +36,10 @@ import org.junit.Assert;
import org.junit.Before ;
import org.junit.Test ;
import org.junit.runner.RunWith ;
import org.powermock.api.mockito.PowerMockito ;
import org.powermock.core.classloader.annotations.PowerMockIgnore ;
import org.powermock.core.classloader.annotations.PrepareForTest ;
import org.powermock.modules.junit4.PowerMockRunner ;
import org.mockito.Mockito ;
import org.mockito.junit.MockitoJUnitRunner ;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce ;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder ;
import com.amazonaws.services.elasticmapreduce.model.AmazonElasticMapReduceException ;
import com.amazonaws.services.elasticmapreduce.model.Cluster ;
import com.amazonaws.services.elasticmapreduce.model.ClusterState ;
@ -58,14 +49,7 @@ import com.amazonaws.services.elasticmapreduce.model.ClusterStatus;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterResult ;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult ;
@RunWith ( PowerMockRunner . class )
@PrepareForTest ( {
AmazonElasticMapReduceClientBuilder . class ,
EmrJobFlowTask . class ,
AmazonElasticMapReduce . class ,
JSONUtils . class
} )
@PowerMockIgnore ( { "javax.*" } )
@RunWith ( MockitoJUnitRunner . class )
public class EmrJobFlowTaskTest {
private final ClusterStatus startingStatus =
@ -126,22 +110,22 @@ public class EmrJobFlowTaskTest {
@Before
public void before ( ) throws Exception {
String emrParameters = buildEmrTaskParameters ( ) ;
TaskExecutionContext taskExecutionContext = Power Mockito. mock ( TaskExecutionContext . class ) ;
when ( taskExecutionContext . getTaskParams ( ) ) . thenReturn ( emrParameters ) ;
emrJobFlowTask = spy ( new EmrJobFlowTask ( taskExecutionContext ) ) ;
TaskExecutionContext taskExecutionContext = Mockito . mock ( TaskExecutionContext . class ) ;
Mockito . when ( taskExecutionContext . getTaskParams ( ) ) . thenReturn ( emrParameters ) ;
emrJobFlowTask = Mockito . spy ( new EmrJobFlowTask ( taskExecutionContext ) ) ;
// mock emrClient and behavior
emrClient = mock ( AmazonElasticMapReduce . class ) ;
RunJobFlowResult runJobFlowResult = mock ( RunJobFlowResult . class ) ;
when ( emrClient . runJobFlow ( any ( ) ) ) . thenReturn ( runJobFlowResult ) ;
when ( runJobFlowResult . getJobFlowId ( ) ) . thenReturn ( "xx" ) ;
doReturn ( emrClient ) . when ( emrJobFlowTask , "createEmrClient" ) ;
DescribeClusterResult describeClusterResult = mock ( DescribeClusterResult . class ) ;
when ( emrClient . describeCluster ( any ( ) ) ) . thenReturn ( describeClusterResult ) ;
emrClient = Mockito . mock ( AmazonElasticMapReduce . class ) ;
RunJobFlowResult runJobFlowResult = Mockito . mock ( RunJobFlowResult . class ) ;
Mockito . when ( emrClient . runJobFlow ( any ( ) ) ) . thenReturn ( runJobFlowResult ) ;
Mockito . when ( runJobFlowResult . getJobFlowId ( ) ) . thenReturn ( "xx" ) ;
Mockito . doReturn ( emrClient ) . when ( emrJobFlowTask ) . createEmrClient ( ) ;
DescribeClusterResult describeClusterResult = Mockito . mock ( DescribeClusterResult . class ) ;
Mockito . when ( emrClient . describeCluster ( any ( ) ) ) . thenReturn ( describeClusterResult ) ;
// mock cluster
cluster = mock ( Cluster . class ) ;
when ( describeClusterResult . getCluster ( ) ) . thenReturn ( cluster ) ;
cluster = Mockito . mock ( Cluster . class ) ;
Mockito . when ( describeClusterResult . getCluster ( ) ) . thenReturn ( cluster ) ;
emrJobFlowTask . init ( ) ;
}
@ -149,7 +133,7 @@ public class EmrJobFlowTaskTest {
@Test
public void testHandle ( ) throws Exception {
when ( cluster . getStatus ( ) ) . thenReturn ( startingStatus , softwareConfigStatus , runningStatus , terminatingStatus ) ;
Mockito . when ( cluster . getStatus ( ) ) . thenReturn ( startingStatus , softwareConfigStatus , runningStatus , terminatingStatus ) ;
emrJobFlowTask . handle ( taskCallBack ) ;
Assert . assertEquals ( EXIT_CODE_SUCCESS , emrJobFlowTask . getExitStatusCode ( ) ) ;
@ -158,7 +142,7 @@ public class EmrJobFlowTaskTest {
@Test
public void testHandleAliveWhenNoSteps ( ) throws Exception {
when ( cluster . getStatus ( ) ) . thenReturn ( startingStatus , softwareConfigStatus , runningStatus , waitingStatus ) ;
Mockito . when ( cluster . getStatus ( ) ) . thenReturn ( startingStatus , softwareConfigStatus , runningStatus , waitingStatus ) ;
emrJobFlowTask . handle ( taskCallBack ) ;
Assert . assertEquals ( EXIT_CODE_SUCCESS , emrJobFlowTask . getExitStatusCode ( ) ) ;
@ -166,7 +150,7 @@ public class EmrJobFlowTaskTest {
@Test
public void testHandleUserRequestTerminate ( ) throws Exception {
when ( cluster . getStatus ( ) ) . thenReturn ( startingStatus , userRequestTerminateStatus ) ;
Mockito . when ( cluster . getStatus ( ) ) . thenReturn ( startingStatus , userRequestTerminateStatus ) ;
emrJobFlowTask . handle ( taskCallBack ) ;
Assert . assertEquals ( EXIT_CODE_KILL , emrJobFlowTask . getExitStatusCode ( ) ) ;
@ -174,7 +158,7 @@ public class EmrJobFlowTaskTest {
@Test
public void testHandleTerminatedWithError ( ) throws Exception {
when ( cluster . getStatus ( ) ) . thenReturn ( startingStatus , softwareConfigStatus , runningStatus , terminatedWithErrorsStatus ) ;
Mockito . when ( cluster . getStatus ( ) ) . thenReturn ( startingStatus , softwareConfigStatus , runningStatus , terminatedWithErrorsStatus ) ;
emrJobFlowTask . handle ( taskCallBack ) ;
Assert . assertEquals ( EXIT_CODE_FAILURE , emrJobFlowTask . getExitStatusCode ( ) ) ;
@ -182,21 +166,20 @@ public class EmrJobFlowTaskTest {
@Test ( expected = TaskException . class )
public void testCanNotParseJson ( ) throws Exception {
mockStatic ( JSONUtils . class ) ;
when ( emrJobFlowTask , "createRunJobFlowRequest" ) . thenThrow ( new EmrTaskException ( "can not parse RunJobFlowRequest from json" , new Exception ( "error" ) ) ) ;
Mockito . when ( emrJobFlowTask . createRunJobFlowRequest ( ) ) . thenThrow ( new EmrTaskException ( "can not parse RunJobFlowRequest from json" , new Exception ( "error" ) ) ) ;
emrJobFlowTask . handle ( taskCallBack ) ;
}
@Test ( expected = TaskException . class )
public void testClusterStatusNull ( ) throws Exception {
when ( emrClient . describeCluster ( any ( ) ) ) . thenReturn ( null ) ;
Mockito . when ( emrClient . describeCluster ( any ( ) ) ) . thenReturn ( null ) ;
emrJobFlowTask . handle ( taskCallBack ) ;
}
@Test ( expected = TaskException . class )
public void testRunJobFlowError ( ) throws Exception {
when ( emrClient . runJobFlow ( any ( ) ) ) . thenThrow ( new AmazonElasticMapReduceException ( "error" ) , new EmrTaskException ( ) ) ;
Mockito . when ( emrClient . runJobFlow ( any ( ) ) ) . thenThrow ( new AmazonElasticMapReduceException ( "error" ) , new EmrTaskException ( ) ) ;
emrJobFlowTask . handle ( taskCallBack ) ;
}