samz406
5 years ago
committed by
GitHub
19 changed files with 473 additions and 41 deletions
@ -0,0 +1,109 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.api.utils; |
||||
|
||||
import org.junit.After; |
||||
import org.junit.Before; |
||||
import org.junit.Rule; |
||||
import org.junit.Test; |
||||
import org.junit.rules.TemporaryFolder; |
||||
import org.mockito.Mockito; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.core.io.Resource; |
||||
import org.springframework.web.multipart.MultipartFile; |
||||
|
||||
import java.io.ByteArrayInputStream; |
||||
import java.io.File; |
||||
import java.io.IOException; |
||||
import java.io.InputStream; |
||||
|
||||
import static org.junit.Assert.*; |
||||
|
||||
public class FileUtilsTest { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(FileUtilsTest.class); |
||||
|
||||
@Rule |
||||
public TemporaryFolder folder = null; |
||||
|
||||
private String rootPath = null; |
||||
|
||||
@Before |
||||
public void setUp() throws Exception { |
||||
|
||||
folder = new TemporaryFolder(); |
||||
folder.create(); |
||||
|
||||
rootPath = folder.getRoot().getAbsolutePath(); |
||||
} |
||||
|
||||
@After |
||||
public void tearDown() throws Exception { |
||||
|
||||
folder.delete(); |
||||
} |
||||
|
||||
/** |
||||
* Use mock to test copyFile |
||||
* @throws IOException |
||||
*/ |
||||
@Test |
||||
public void testCopyFile() throws IOException { |
||||
|
||||
//Define dest file path
|
||||
String destFilename = rootPath + System.getProperty("file.separator") + "data.txt"; |
||||
logger.info("destFilename: "+destFilename); |
||||
|
||||
//Define InputStream for MultipartFile
|
||||
String data = "data text"; |
||||
InputStream targetStream = new ByteArrayInputStream(data.getBytes()); |
||||
|
||||
//Use Mockito to mock MultipartFile
|
||||
MultipartFile file = Mockito.mock(MultipartFile.class); |
||||
Mockito.when(file.getInputStream()).thenReturn(targetStream); |
||||
|
||||
//Invoke copyFile
|
||||
FileUtils.copyFile(file,destFilename); |
||||
|
||||
//Test file exists
|
||||
File destFile = new File(destFilename); |
||||
assertTrue(destFile.exists()); |
||||
|
||||
} |
||||
|
||||
@Test |
||||
public void testFile2Resource() throws IOException { |
||||
|
||||
//Define dest file path
|
||||
String destFilename = rootPath + System.getProperty("file.separator") + "data.txt"; |
||||
logger.info("destFilename: "+destFilename); |
||||
|
||||
//Define test resource
|
||||
File file = folder.newFile("resource.txt"); |
||||
|
||||
//Invoke file2Resource and test not null
|
||||
Resource resource = FileUtils.file2Resource(file.getAbsolutePath()); |
||||
assertNotNull(resource); |
||||
|
||||
//Invoke file2Resource and test null
|
||||
Resource resource1 = FileUtils.file2Resource(file.getAbsolutePath()+"abc"); |
||||
assertNull(resource1); |
||||
|
||||
} |
||||
} |
@ -0,0 +1,40 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.common.enums; |
||||
|
||||
import com.baomidou.mybatisplus.annotation.EnumValue; |
||||
import lombok.Getter; |
||||
|
||||
@Getter |
||||
public enum SparkVersion { |
||||
|
||||
/** |
||||
* 0 SPARK1 |
||||
* 1 SPARK2 |
||||
*/ |
||||
SPARK1(0, "SPARK1"), |
||||
SPARK2(1, "SPARK2"); |
||||
|
||||
SparkVersion(int code, String descp){ |
||||
this.code = code; |
||||
this.descp = descp; |
||||
} |
||||
|
||||
@EnumValue |
||||
private final int code; |
||||
private final String descp; |
||||
} |
@ -0,0 +1,141 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.server.worker.task.spark; |
||||
|
||||
import org.apache.commons.lang3.StringUtils; |
||||
import org.apache.dolphinscheduler.common.enums.SparkVersion; |
||||
import org.apache.dolphinscheduler.common.process.Property; |
||||
import org.apache.dolphinscheduler.common.task.spark.SparkParameters; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.common.utils.ParameterUtils; |
||||
import org.apache.dolphinscheduler.server.utils.ParamUtils; |
||||
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils; |
||||
import org.apache.dolphinscheduler.server.worker.task.TaskProps; |
||||
import org.junit.Assert; |
||||
import org.junit.Test; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
|
||||
public class SparkTaskTest { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(SparkTaskTest.class); |
||||
|
||||
/** |
||||
* spark1 command |
||||
*/ |
||||
private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit"; |
||||
|
||||
/** |
||||
* spark2 command |
||||
*/ |
||||
private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit"; |
||||
|
||||
@Test |
||||
public void testSparkTaskInit() { |
||||
|
||||
TaskProps taskProps = new TaskProps(); |
||||
|
||||
String spark1Params = "{" + |
||||
"\"mainArgs\":\"\", " + |
||||
"\"driverMemory\":\"1G\", " + |
||||
"\"executorMemory\":\"2G\", " + |
||||
"\"programType\":\"SCALA\", " + |
||||
"\"mainClass\":\"basicetl.GlobalUserCar\", " + |
||||
"\"driverCores\":\"2\", " + |
||||
"\"deployMode\":\"cluster\", " + |
||||
"\"executorCores\":2, " + |
||||
"\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " + |
||||
"\"sparkVersion\":\"SPARK1\", " + |
||||
"\"numExecutors\":\"10\", " + |
||||
"\"localParams\":[], " + |
||||
"\"others\":\"\", " + |
||||
"\"resourceList\":[]" + |
||||
"}"; |
||||
|
||||
String spark2Params = "{" + |
||||
"\"mainArgs\":\"\", " + |
||||
"\"driverMemory\":\"1G\", " + |
||||
"\"executorMemory\":\"2G\", " + |
||||
"\"programType\":\"SCALA\", " + |
||||
"\"mainClass\":\"basicetl.GlobalUserCar\", " + |
||||
"\"driverCores\":\"2\", " + |
||||
"\"deployMode\":\"cluster\", " + |
||||
"\"executorCores\":2, " + |
||||
"\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " + |
||||
"\"sparkVersion\":\"SPARK2\", " + |
||||
"\"numExecutors\":\"10\", " + |
||||
"\"localParams\":[], " + |
||||
"\"others\":\"\", " + |
||||
"\"resourceList\":[]" + |
||||
"}"; |
||||
|
||||
taskProps.setTaskParams(spark2Params); |
||||
|
||||
logger.info("spark task params {}", taskProps.getTaskParams()); |
||||
|
||||
SparkParameters sparkParameters = JSONUtils.parseObject(taskProps.getTaskParams(), SparkParameters.class); |
||||
|
||||
assert sparkParameters != null; |
||||
if (!sparkParameters.checkParameters()) { |
||||
throw new RuntimeException("spark task params is not valid"); |
||||
} |
||||
sparkParameters.setQueue(taskProps.getQueue()); |
||||
|
||||
if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) { |
||||
String args = sparkParameters.getMainArgs(); |
||||
|
||||
/** |
||||
* combining local and global parameters |
||||
*/ |
||||
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), |
||||
taskProps.getDefinedParams(), |
||||
sparkParameters.getLocalParametersMap(), |
||||
taskProps.getCmdTypeIfComplement(), |
||||
taskProps.getScheduleTime()); |
||||
if (paramsMap != null) { |
||||
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap)); |
||||
} |
||||
sparkParameters.setMainArgs(args); |
||||
} |
||||
|
||||
List<String> args = new ArrayList<>(); |
||||
|
||||
//spark version
|
||||
String sparkCommand = SPARK2_COMMAND; |
||||
|
||||
if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) { |
||||
sparkCommand = SPARK1_COMMAND; |
||||
} |
||||
|
||||
args.add(sparkCommand); |
||||
|
||||
// other parameters
|
||||
args.addAll(SparkArgsUtils.buildArgs(sparkParameters)); |
||||
|
||||
String sparkArgs = String.join(" ", args); |
||||
|
||||
logger.info("spark task command : {}", sparkArgs); |
||||
|
||||
Assert.assertEquals(sparkArgs.split(" ")[0], SPARK2_COMMAND ); |
||||
|
||||
} |
||||
} |
@ -0,0 +1,16 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
@ -0,0 +1,16 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
@ -0,0 +1,25 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY','')); |
||||
SET FOREIGN_KEY_CHECKS=0; |
||||
UPDATE QRTZ_CRON_TRIGGERS SET SCHED_NAME='DolphinScheduler' WHERE SCHED_NAME='EasyScheduler'; |
||||
UPDATE QRTZ_TRIGGERS SET SCHED_NAME='DolphinScheduler' WHERE SCHED_NAME='EasyScheduler'; |
||||
UPDATE QRTZ_FIRED_TRIGGERS SET SCHED_NAME='DolphinScheduler' WHERE SCHED_NAME='EasyScheduler'; |
||||
UPDATE QRTZ_JOB_DETAILS SET SCHED_NAME='DolphinScheduler' WHERE SCHED_NAME='EasyScheduler'; |
||||
UPDATE QRTZ_JOB_DETAILS SET JOB_CLASS_NAME='org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob' WHERE JOB_CLASS_NAME='cn.escheduler.server.quartz.ProcessScheduleJob'; |
||||
UPDATE QRTZ_LOCKS SET SCHED_NAME='DolphinScheduler' WHERE SCHED_NAME='EasyScheduler'; |
||||
UPDATE QRTZ_SCHEDULER_STATE SET SCHED_NAME='DolphinScheduler' WHERE SCHED_NAME='EasyScheduler'; |
@ -0,0 +1,16 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
@ -0,0 +1,16 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
Loading…
Reference in new issue