diff --git a/escheduler-alert/pom.xml b/escheduler-alert/pom.xml
index eebf8b8f50..79defd54b8 100644
--- a/escheduler-alert/pom.xml
+++ b/escheduler-alert/pom.xml
@@ -4,7 +4,7 @@
cn.analysys
escheduler
- 1.0.0-SNAPSHOT
+ 1.0.1-SNAPSHOT
escheduler-alert
jar
diff --git a/escheduler-api/pom.xml b/escheduler-api/pom.xml
index e5c5bda8f5..74a97b4784 100644
--- a/escheduler-api/pom.xml
+++ b/escheduler-api/pom.xml
@@ -3,7 +3,7 @@
cn.analysys
escheduler
- 1.0.0-SNAPSHOT
+ 1.0.1-SNAPSHOT
escheduler-api
jar
diff --git a/escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java b/escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java
index 3a6fc1e64d..6fcaf1171d 100644
--- a/escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java
+++ b/escheduler-api/src/main/java/cn/escheduler/api/quartz/ProcessScheduleJob.java
@@ -125,7 +125,7 @@ public class ProcessScheduleJob implements Job {
}
Command command = new Command();
- command.setCommandType(CommandType.START_PROCESS);
+ command.setCommandType(CommandType.SCHEDULER);
command.setExecutorId(schedule.getUserId());
command.setFailureStrategy(schedule.getFailureStrategy());
command.setProcessDefinitionId(schedule.getProcessDefinitionId());
diff --git a/escheduler-common/pom.xml b/escheduler-common/pom.xml
index c0daa0bce7..178e969bb3 100644
--- a/escheduler-common/pom.xml
+++ b/escheduler-common/pom.xml
@@ -4,7 +4,7 @@
escheduler
cn.analysys
- 1.0.0-SNAPSHOT
+ 1.0.1-SNAPSHOT
escheduler-common
escheduler-common
diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java
index 4e9a11c841..e0155e606e 100644
--- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java
+++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java
@@ -70,25 +70,6 @@ public final class Constants {
*/
public static final String YARN_APPLICATION_STATUS_ADDRESS = "yarn.application.status.address";
- /**
- * spring.redis.maxIdle
- */
- public static final String SPRING_REDIS_MAXIDLE = "spring.redis.maxIdle";
-
- /**
- * spring.redis.maxTotal
- */
- public static final String SPRING_REDIS_MAXTOTAL = "spring.redis.maxTotal";
-
- /**
- * spring.redis.host
- */
- public static final String SPRING_REDIS_HOST = "spring.redis.host";
-
- /**
- * spring.redis.port
- */
- public static final String SPRING_REDIS_PORT = "spring.redis.port";
/**
* hdfs configuration
@@ -117,9 +98,14 @@ public final class Constants {
public static final String ESCHEDULER_ENV_PATH = "escheduler.env.path";
/**
- * escheduler.env.py
+ * escheduler.env.sh
*/
- public static final String ESCHEDULER_ENV_PY = "escheduler.env.py";
+ public static final String ESCHEDULER_ENV_SH = ".escheduler_env.sh";
+
+ /**
+ * python home
+ */
+ public static final String PYTHON_HOME="PYTHON_HOME";
/**
* resource.view.suffixs
@@ -255,8 +241,6 @@ public final class Constants {
public static final String SCHEDULER_QUEUE_IMPL = "escheduler.queue.impl";
- public static final String SCHEDULER_QUEUE_REDIS_IMPL = "redis";
-
/**
* date format of yyyy-MM-dd HH:mm:ss
diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java
index 6930d8db35..d0164791d2 100644
--- a/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java
+++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java
@@ -46,13 +46,6 @@ public class CommonUtils {
return envPath;
}
- /**
- * @return get the path of Python system environment variables
- */
- public static String getPythonSystemEnvPath() {
- return getString(ESCHEDULER_ENV_PY);
- }
-
/**
* @return get queue implementation name
*/
diff --git a/escheduler-common/src/main/resources/common/common.properties b/escheduler-common/src/main/resources/common/common.properties
index c501ad0d1a..1cb995ba0e 100644
--- a/escheduler-common/src/main/resources/common/common.properties
+++ b/escheduler-common/src/main/resources/common/common.properties
@@ -18,7 +18,6 @@ hdfs.startup.state=true
# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions
escheduler.env.path=/opt/.escheduler_env.sh
-escheduler.env.py=/opt/escheduler_env.py
#resource.view.suffixs
resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml
diff --git a/escheduler-dao/pom.xml b/escheduler-dao/pom.xml
index 87b5765410..cf0eb1dc1c 100644
--- a/escheduler-dao/pom.xml
+++ b/escheduler-dao/pom.xml
@@ -4,7 +4,7 @@
cn.analysys
escheduler
- 1.0.0-SNAPSHOT
+ 1.0.1-SNAPSHOT
escheduler-dao
escheduler-dao
diff --git a/escheduler-rpc/pom.xml b/escheduler-rpc/pom.xml
index e869ca3a8b..5c2b998fdb 100644
--- a/escheduler-rpc/pom.xml
+++ b/escheduler-rpc/pom.xml
@@ -4,7 +4,7 @@
escheduler
cn.analysys
- 1.0.0-SNAPSHOT
+ 1.0.1-SNAPSHOT
4.0.0
diff --git a/escheduler-server/pom.xml b/escheduler-server/pom.xml
index cb93089689..c3b8b6ca15 100644
--- a/escheduler-server/pom.xml
+++ b/escheduler-server/pom.xml
@@ -3,7 +3,7 @@
escheduler
cn.analysys
- 1.0.0-SNAPSHOT
+ 1.0.1-SNAPSHOT
escheduler-server
escheduler-server
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
index 6a28e1bbdf..64e5b57498 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
@@ -172,7 +172,7 @@ public class FetchTaskThread implements Runnable{
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
processInstance.getTenantCode(), logger);
-
+ logger.info("task : {} ready to submit to task scheduler thread",taskId);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java
index 6e4d015734..94caffe596 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java
@@ -16,12 +16,13 @@
*/
package cn.escheduler.server.worker.task;
+import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -34,6 +35,8 @@ import java.util.function.Consumer;
*/
public class PythonCommandExecutor extends AbstractCommandExecutor {
+ private static final Logger logger = LoggerFactory.getLogger(PythonCommandExecutor.class);
+
public static final String PYTHON = "python";
@@ -63,27 +66,13 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
*/
@Override
protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
- logger.info("proxy user:{}, work dir:{}", tenantCode, taskDir);
+ logger.info("tenant :{}, work dir:{}", tenantCode, taskDir);
if (!Files.exists(Paths.get(commandFile))) {
logger.info("generate command file:{}", commandFile);
StringBuilder sb = new StringBuilder(200);
sb.append("#-*- encoding=utf8 -*-\n");
- sb.append("import os,sys\n");
- sb.append("BASEDIR = os.path.dirname(os.path.realpath(__file__))\n");
- sb.append("os.chdir(BASEDIR)\n");
-
- if (StringUtils.isNotEmpty(envFile)) {
- String[] envArray = envFile.split("\\.");
- if(envArray.length == 2){
- String path = envArray[0];
- logger.info("path:"+path);
- int index = path.lastIndexOf("/");
- sb.append(String.format("sys.path.append('%s')\n",path.substring(0,index)));
- sb.append(String.format("import %s\n",path.substring(index+1)));
- }
- }
sb.append("\n\n");
sb.append(String.format("import py_%s_node\n",taskAppId));
@@ -96,7 +85,14 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
@Override
protected String commandType() {
- return PYTHON;
+
+ String envPath = System.getProperty("user.dir") + Constants.SINGLE_SLASH + "conf"+
+ Constants.SINGLE_SLASH +"env" + Constants.SINGLE_SLASH + Constants.ESCHEDULER_ENV_SH;
+ String pythonHome = getPythonHome(envPath);
+ if (StringUtils.isEmpty(pythonHome)){
+ return PYTHON;
+ }
+ return pythonHome;
}
@Override
@@ -109,4 +105,45 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
return true;
}
+
+ /**
+ * get python home
+ * @param envPath
+ * @return
+ */
+ private static String getPythonHome(String envPath){
+ BufferedReader br = null;
+ String line = null;
+ StringBuilder sb = new StringBuilder();
+ try {
+ br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath)));
+ while ((line = br.readLine()) != null){
+ if (line.contains(Constants.PYTHON_HOME)){
+ sb.append(line);
+ break;
+ }
+ }
+ String result = sb.toString();
+ if (org.apache.commons.lang.StringUtils.isEmpty(result)){
+ return null;
+ }
+ String[] arrs = result.split("=");
+ if (arrs.length == 2){
+ return arrs[1];
+ }
+
+ }catch (IOException e){
+ logger.error("read file failed : " + e.getMessage(),e);
+ }finally {
+ try {
+ if (br != null){
+ br.close();
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage(),e);
+ }
+ }
+ return null;
+ }
+
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java
index 2b7ae29b96..49d754404a 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java
@@ -72,7 +72,7 @@ public class PythonTask extends AbstractTask {
this.pythonProcessTask = new PythonCommandExecutor(this::logHandle,
taskProps.getTaskDir(), taskProps.getTaskAppId(),
- taskProps.getTenantCode(), CommonUtils.getPythonSystemEnvPath(), taskProps.getTaskStartTime(),
+ taskProps.getTenantCode(), null, taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(), logger);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
}
diff --git a/escheduler-server/src/test/java/cn/escheduler/server/worker/EnvFileTest.java b/escheduler-server/src/test/java/cn/escheduler/server/worker/EnvFileTest.java
new file mode 100644
index 0000000000..06e2b22678
--- /dev/null
+++ b/escheduler-server/src/test/java/cn/escheduler/server/worker/EnvFileTest.java
@@ -0,0 +1,65 @@
+package cn.escheduler.server.worker;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Created by qiaozhanwei on 2019/4/15.
+ */
+public class EnvFileTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(EnvFileTest.class);
+
+ public static void main(String[] args) {
+ String path = System.getProperty("user.dir")+"\\script\\env\\.escheduler_env.sh";
+ String pythonHome = getPythonHome(path);
+ logger.info(pythonHome);
+
+ }
+
+ /**
+ * get python home
+ * @param path
+ * @return
+ */
+ private static String getPythonHome(String path){
+ BufferedReader br = null;
+ String line = null;
+ StringBuilder sb = new StringBuilder();
+ try {
+ br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
+ while ((line = br.readLine()) != null){
+ if (line.contains("PYTHON_HOME")){
+ sb.append(line);
+ break;
+ }
+ }
+ String result = sb.toString();
+ if (StringUtils.isEmpty(result)){
+ return null;
+ }
+ String[] arrs = result.split("=");
+ if (arrs.length == 2){
+ return arrs[1];
+ }
+
+ }catch (IOException e){
+ logger.error("read file failed : " + e.getMessage(),e);
+ }finally {
+ try {
+ if (br != null){
+ br.close();
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage(),e);
+ }
+ }
+ return null;
+ }
+}
diff --git a/pom.xml b/pom.xml
index 83b2032985..56921834d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
cn.analysys
escheduler
- 1.0.0-SNAPSHOT
+ 1.0.1-SNAPSHOT
pom
escheduler
http://maven.apache.org
diff --git a/script/env/escheduler_env.py b/script/env/escheduler_env.py
deleted file mode 100644
index e1d0afef4a..0000000000
--- a/script/env/escheduler_env.py
+++ /dev/null
@@ -1,12 +0,0 @@
-import os
-
-HADOOP_HOME="/opt/soft/hadoop"
-SPARK_HOME1="/opt/soft/spark1"
-SPARK_HOME2="/opt/soft/spark2"
-PYTHON_HOME="/opt/soft/python"
-JAVA_HOME="/opt/soft/java"
-HIVE_HOME="/opt/soft/hive"
-PATH=os.environ['PATH']
-PATH="%s/bin:%s/bin:%s/bin:%s/bin:%s/bin:%s/bin:%s"%(HIVE_HOME,HADOOP_HOME,SPARK_HOME1,SPARK_HOME2,JAVA_HOME,PYTHON_HOME,PATH)
-
-os.putenv('PATH','%s'%PATH)
\ No newline at end of file