From b280c305ce00c424576c148e910672765878ffa6 Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Wed, 13 Nov 2019 11:18:51 +0800 Subject: [PATCH] Add method and parameters comments (#1220) * rename from DatasourceUserMapper to DataSourceUserMapper * add unit test in UserMapper and WorkerGroupMapper * change cn.escheduler to org.apache.dolphinscheduler * add unit test in UdfFuncMapperTest * add unit test in UdfFuncMapperTest * remove DatabaseConfiguration * add ConnectionFactoryTest * cal duration in processInstancesList * change desc to description * change table name in mysql ddl * change table name in mysql ddl * change escheduler to dolphinscheduler * change escheduler to dolphinscheduler * change escheduler to dolphinscheduler * remove log4j-1.2-api and modify AlertMapperTest * remove log4j-1.2-api * Add alertDao to spring management * Add alertDao to spring management * get SqlSessionFactory from MybatisSqlSessionFactoryBean * get processDao by DaoFactory * read druid properties in ConneciontFactory * read druid properties in ConneciontFactory * change get alertDao by spring to DaoFactory * add log4j to resolve #967 * resole verify udf name error and delete udf error * Determine if principal is empty * Determine whether the logon user has the right to delete the project * Fixed an issue that produced attatch file named such as ATT00002.bin * fix too many connection in upgrade or create * fix NEED_FAULT_TOLERANCE and WAITTING_THREAD count fail * Added a judgment on whether the currently login user is an administrator * fix update udf database not change and create time is changed * add enterprise.wechat.enable to decide whether to send enterprise WeChat * change method check * Remove the administrator's judgment on query access token list * only admin can create worker group * delete alert group need delete the relation of user and alert group * add timeout in proxy when upload large resource * add gets scheduled times by expect fire times * add gets scheduled times by expect fire times * Increase the judgment of whether it is admin * Increase the judgment of whether it is admin * when delete access token add whether login user has perm to delete * change mysql-connector-java scope to test * update scm test * add profile test * Add method and parameters comments * roll back --- .../server/master/AbstractServer.java | 13 +- .../server/master/MasterServer.java | 19 +- .../server/master/log/MasterLogFilter.java | 9 +- .../runner/MasterBaseTaskExecThread.java | 38 +++- .../master/runner/MasterExecThread.java | 201 +++++++++++------- .../master/runner/MasterSchedulerThread.java | 27 ++- .../master/runner/MasterTaskExecThread.java | 31 ++- .../runner/SubProcessTaskExecThread.java | 19 +- .../quartz/DruidConnectionProvider.java | 13 ++ .../server/quartz/ProcessScheduleJob.java | 26 +-- .../server/quartz/QuartzExecutors.java | 74 +++---- .../server/rpc/LogClient.java | 53 +++-- .../server/rpc/LoggerServer.java | 41 +++- .../server/utils/AlertManager.java | 45 ++-- .../server/utils/FlinkArgsUtils.java | 11 +- .../server/utils/LoggerUtils.java | 26 ++- .../server/utils/ParamUtils.java | 20 +- .../server/utils/ProcessUtils.java | 101 +++++++-- .../server/utils/SparkArgsUtils.java | 7 +- .../server/utils/UDFUtils.java | 16 +- .../server/worker/WorkerServer.java | 33 ++- .../worker/log/TaskLogDiscriminator.java | 9 + .../server/worker/log/TaskLogFilter.java | 8 + .../server/worker/log/WorkerLogFilter.java | 8 + .../server/worker/runner/FetchTaskThread.java | 34 +-- .../worker/runner/TaskScheduleThread.java | 16 +- .../worker/task/AbstractCommandExecutor.java | 50 ++--- .../server/worker/task/AbstractTask.java | 31 ++- .../server/worker/task/AbstractYarnTask.java | 15 +- .../worker/task/PythonCommandExecutor.java | 40 +++- .../worker/task/ShellCommandExecutor.java | 33 ++- .../server/worker/task/TaskManager.java | 14 +- .../server/worker/task/TaskProps.java | 51 +++-- .../task/dependent/DependentExecute.java | 58 +++-- .../worker/task/dependent/DependentTask.java | 24 ++- .../server/worker/task/flink/FlinkTask.java | 6 +- .../server/worker/task/http/HttpTask.java | 65 +++++- .../server/worker/task/mr/MapReduceTask.java | 18 +- .../task/processdure/ProcedureTask.java | 15 +- .../server/worker/task/python/PythonTask.java | 12 +- .../server/worker/task/shell/ShellTask.java | 20 +- .../server/worker/task/spark/SparkTask.java | 8 +- .../server/worker/task/sql/SqlTask.java | 38 ++-- .../server/zk/ZKMasterClient.java | 95 +++++++-- .../server/zk/ZKWorkerClient.java | 26 ++- 45 files changed, 1087 insertions(+), 430 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/AbstractServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/AbstractServer.java index b5875e191e..41cd993846 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/AbstractServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/AbstractServer.java @@ -29,10 +29,13 @@ import org.springframework.context.annotation.ComponentScan; @ComponentScan("org.apache.dolphinscheduler") public abstract class AbstractServer implements CommandLineRunner, IStoppable { + /** + * logger of AbstractServer + */ private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class); /** - * conf + * abstract server onfiguration */ protected static Configuration conf; @@ -46,17 +49,14 @@ public abstract class AbstractServer implements CommandLineRunner, IStoppable { */ protected boolean terminated = false; - /** * heartbeat interval, unit second */ protected int heartBeatInterval; - - /** - * blocking implement - * @throws InterruptedException + * blocking implement + * @throws InterruptedException reasonInter */ public void awaitTermination() throws InterruptedException { synchronized (lock) { @@ -66,7 +66,6 @@ public abstract class AbstractServer implements CommandLineRunner, IStoppable { } } - /** * Callback used to run the bean. * @param args incoming main method arguments diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 3b76d51ca6..95c2054d95 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -41,11 +41,14 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** - * master server + * master server */ @ComponentScan("org.apache.dolphinscheduler") public class MasterServer extends AbstractServer { + /** + * logger of MasterServer + */ private static final Logger logger = LoggerFactory.getLogger(MasterServer.class); /** @@ -69,8 +72,15 @@ public class MasterServer extends AbstractServer { */ private ExecutorService masterSchedulerService; + /** + * default constructor + */ public MasterServer(){} + /** + * constructor of MasterServers + * @param processDao process dao + */ public MasterServer(ProcessDao processDao){ try { conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH); @@ -82,11 +92,11 @@ public class MasterServer extends AbstractServer { this.masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); } - /** * master server startup * * master server not use web service + * @param args arguments */ public static void main(String[] args) { SpringApplication app = new SpringApplication(MasterServer.class); @@ -107,7 +117,10 @@ public class MasterServer extends AbstractServer { masterServer.awaitTermination(); } - + /** + * run master server + * @param processDao process dao + */ public void run(ProcessDao processDao){ // heartbeat interval diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/log/MasterLogFilter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/log/MasterLogFilter.java index 6f2ccf291d..1fa047c316 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/log/MasterLogFilter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/log/MasterLogFilter.java @@ -25,9 +25,16 @@ import ch.qos.logback.core.spi.FilterReply; * master log filter */ public class MasterLogFilter extends Filter { - + /** + * log level + */ Level level; + /** + * Accept or reject based on thread name + * @param event event + * @return FilterReply + */ @Override public FilterReply decide(ILoggingEvent event) { if (event.getThreadName().startsWith("Master-")){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 9e856b39cd..06ce711ee8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -37,15 +37,18 @@ import java.util.concurrent.Callable; */ public class MasterBaseTaskExecThread implements Callable { + /** + * logger of MasterBaseTaskExecThread + */ private static final Logger logger = LoggerFactory.getLogger(MasterBaseTaskExecThread.class); /** - * process dao + * process dao */ protected ProcessDao processDao; /** - * alert database access + * alert database access */ protected AlertDao alertDao; @@ -60,9 +63,13 @@ public class MasterBaseTaskExecThread implements Callable { protected TaskInstance taskInstance; /** - * task queue + * task queue */ protected ITaskQueue taskQueue; + + /** + * whether need cancel + */ protected boolean cancel; /** @@ -79,6 +86,11 @@ public class MasterBaseTaskExecThread implements Callable { } } + /** + * constructor of MasterBaseTaskExecThread + * @param taskInstance task instance + * @param processInstance process instance + */ public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){ this.processDao = BeanContext.getBean(ProcessDao.class); this.alertDao = BeanContext.getBean(AlertDao.class); @@ -88,14 +100,25 @@ public class MasterBaseTaskExecThread implements Callable { this.taskInstance = taskInstance; } + /** + * get task instance + * @return TaskInstance + */ public TaskInstance getTaskInstance(){ return this.taskInstance; } + /** + * kill master base task exec thread + */ public void kill(){ this.cancel = true; } + /** + * submit master base task exec thread + * @return TaskInstance + */ protected TaskInstance submit(){ Integer commitRetryTimes = conf.getInt(Constants.MASTER_COMMIT_RETRY_TIMES, Constants.defaultMasterCommitRetryTimes); @@ -120,10 +143,19 @@ public class MasterBaseTaskExecThread implements Callable { return null; } + /** + * submit wait complete + * @return true + */ protected Boolean submitWaitComplete(){ return true; } + /** + * call + * @return boolean + * @throws Exception exception + */ @Override public Boolean call() throws Exception { return submitWaitComplete(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 01e5a3cbfb..c112f848dc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -55,6 +55,9 @@ import static org.apache.dolphinscheduler.common.Constants.*; */ public class MasterExecThread implements Runnable { + /** + * logger of MasterExecThread + */ private static final Logger logger = LoggerFactory.getLogger(MasterExecThread.class); /** @@ -62,28 +65,64 @@ public class MasterExecThread implements Runnable { */ private ProcessInstance processInstance; - /** * runing TaskNode */ private final Map> activeTaskNode = new ConcurrentHashMap>(); + /** + * task exec service + */ private final ExecutorService taskExecService; /** * submit failure nodes */ private Boolean taskFailedSubmit = false; + + /** + * recover node id list + */ private List recoverNodeIdList = new ArrayList<>(); + + /** + * error task list + */ private Map errorTaskList = new ConcurrentHashMap<>(); + + /** + * complete task list + */ private Map completeTaskList = new ConcurrentHashMap<>(); + + /** + * ready to submit task list + */ private Map readyToSubmitTaskList = new ConcurrentHashMap<>(); + + /** + * depend failed task map + */ private Map dependFailedTask = new ConcurrentHashMap<>(); + + /** + * forbidden task map + */ private Map forbiddenTaskList = new ConcurrentHashMap<>(); + + /** + * recover tolerance fault task list + */ private List recoverToleranceFaultTaskList = new ArrayList<>(); + /** + * alert manager + */ private AlertManager alertManager = new AlertManager(); + /** + * the object of DAG + */ private DAG dag; /** @@ -96,6 +135,11 @@ public class MasterExecThread implements Runnable { */ private static Configuration conf; + /** + * constructor of MasterExecThread + * @param processInstance process instance + * @param processDao process dao + */ public MasterExecThread(ProcessInstance processInstance,ProcessDao processDao){ this.processDao = processDao; @@ -153,6 +197,10 @@ public class MasterExecThread implements Runnable { } } + /** + * execute process + * @throws Exception excpetion + */ private void executeProcess() throws Exception { prepareProcess(); runProcess(); @@ -160,8 +208,8 @@ public class MasterExecThread implements Runnable { } /** - * execute complement process - * @throws Exception + * execute complement process + * @throws Exception excpetion */ private void executeComplementProcess() throws Exception { @@ -237,7 +285,7 @@ public class MasterExecThread implements Runnable { /** * prepare process parameter - * @throws Exception + * @throws Exception excpetion */ private void prepareProcess() throws Exception { // init task queue @@ -265,7 +313,7 @@ public class MasterExecThread implements Runnable { /** * generate process dag - * @throws Exception + * @throws Exception excpetion */ private void buildFlowDag() throws Exception { recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam()); @@ -277,7 +325,6 @@ public class MasterExecThread implements Runnable { ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(), startNodeNameList, recoveryNameList, processInstance.getTaskDependType()); if(processDag == null){ - //TODO... logger.error("processDag is null"); return; } @@ -286,6 +333,9 @@ public class MasterExecThread implements Runnable { } + /** + * init task queue + */ private void initTaskQueue(){ taskFailedSubmit = false; @@ -325,11 +375,10 @@ public class MasterExecThread implements Runnable { } } - - /** - * submit task to execute - * @param taskInstance + * submit task to execute + * @param taskInstance task instance + * @return TaskInstance */ private TaskInstance submitTaskExec(TaskInstance taskInstance) { MasterBaseTaskExecThread abstractExecThread = null; @@ -346,8 +395,8 @@ public class MasterExecThread implements Runnable { /** * find task instance in db. * in case submit more than one same name task in the same time. - * @param taskName - * @return + * @param taskName task name + * @return TaskInstance */ private TaskInstance findTaskIfExists(String taskName){ List taskInstanceList = processDao.findValidTaskListByProcessId(this.processInstance.getId()); @@ -361,9 +410,9 @@ public class MasterExecThread implements Runnable { /** * encapsulation task - * @param processInstance - * @param nodeName - * @return + * @param processInstance process instance + * @param nodeName node name + * @return TaskInstance */ private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName, TaskNode taskNode, String parentNodeName) { @@ -415,14 +464,11 @@ public class MasterExecThread implements Runnable { return taskInstance; } - - /** - * get post task instance by node - * - * @param dag - * @param parentNodeName - * @return + * get post task instance by node + * @param dag dag + * @param parentNodeName parent node name + * @return task instance list */ private List getPostTaskInstanceByNode(DAG dag, String parentNodeName){ @@ -442,10 +488,8 @@ public class MasterExecThread implements Runnable { } /** - * - * return start task node list - * - * @return + * return start task node list + * @return task instance list */ private List getStartSubmitTaskList(){ @@ -476,8 +520,8 @@ public class MasterExecThread implements Runnable { } /** - * submit post node - * @param parentNodeName + * submit post node + * @param parentNodeName parent node name */ private void submitPostNode(String parentNodeName){ @@ -507,7 +551,7 @@ public class MasterExecThread implements Runnable { /** * determine whether the dependencies of the task node are complete - * @return + * @return DependResult */ private DependResult isTaskDepsComplete(String taskName) { @@ -545,8 +589,8 @@ public class MasterExecThread implements Runnable { /** * query task instance by complete state - * @param state - * @return + * @param state state + * @return task isntance list */ private List getCompleteTaskByState(ExecutionStatus state){ List resultList = new ArrayList<>(); @@ -560,8 +604,8 @@ public class MasterExecThread implements Runnable { /** * where there are ongoing tasks - * @param state - * @return + * @param state state + * @return ExecutionStatus */ private ExecutionStatus runningState(ExecutionStatus state){ if(state == ExecutionStatus.READY_STOP || @@ -575,9 +619,9 @@ public class MasterExecThread implements Runnable { } /** - * exists failure task , contains submit failure、dependency failure,execute failure(retry after) + * exists failure task,contains submit failure、dependency failure,execute failure(retry after) * - * @return + * @return Boolean whether has failed task */ private Boolean hasFailedTask(){ @@ -591,9 +635,9 @@ public class MasterExecThread implements Runnable { } /** - * process instance failure + * process instance failure * - * @return + * @return Boolean whether process instance failed */ private Boolean processFailed(){ if(hasFailedTask()) { @@ -608,11 +652,10 @@ public class MasterExecThread implements Runnable { } /** - * whether task for waiting thread - * @return + * whether task for waiting thread + * @return Boolean whether has waiting thread task */ private Boolean hasWaitingThreadTask(){ - List waitingList = getCompleteTaskByState(ExecutionStatus.WAITTING_THREAD); return waitingList.size() > 0; } @@ -622,7 +665,7 @@ public class MasterExecThread implements Runnable { * 1,failed retry task in the preparation queue , returns to failure directly * 2,exists pause task,complement not completed, pending submission of tasks, return to suspension * 3,success - * @return + * @return ExecutionStatus */ private ExecutionStatus processReadyPause(){ if(hasRetryTaskInStandBy()){ @@ -642,7 +685,7 @@ public class MasterExecThread implements Runnable { /** * generate the latest process instance status by the tasks state - * @return + * @return process instance execution status */ private ExecutionStatus getProcessInstanceState(){ ProcessInstance instance = processDao.findProcessInstanceById(processInstance.getId()); @@ -692,8 +735,8 @@ public class MasterExecThread implements Runnable { } /** - * whether complement end - * @return + * whether complement end + * @return Boolean whether is complement end */ private Boolean isComplementEnd() { if(!processInstance.isComplementData()){ @@ -711,7 +754,7 @@ public class MasterExecThread implements Runnable { } /** - * updateProcessInstance process instance state + * updateProcessInstance process instance state * after each batch of tasks is executed, the status of the process instance is updated */ private void updateProcessInstanceState() { @@ -732,9 +775,9 @@ public class MasterExecThread implements Runnable { } /** - * get task dependency result - * @param taskInstance - * @return + * get task dependency result + * @param taskInstance task instance + * @return DependResult */ private DependResult getDependResultForTask(TaskInstance taskInstance){ DependResult inner = isTaskDepsComplete(taskInstance.getName()); @@ -742,8 +785,8 @@ public class MasterExecThread implements Runnable { } /** - * add task to standy list - * @param taskInstance + * add task to standy list + * @param taskInstance task instance */ private void addTaskToStandByList(TaskInstance taskInstance){ logger.info("add task to stand by list: {}", taskInstance.getName()); @@ -751,8 +794,8 @@ public class MasterExecThread implements Runnable { } /** - * remove task from stand by list - * @param taskInstance + * remove task from stand by list + * @param taskInstance task instance */ private void removeTaskFromStandbyList(TaskInstance taskInstance){ logger.info("remove task from stand by list: {}", taskInstance.getName()); @@ -760,8 +803,8 @@ public class MasterExecThread implements Runnable { } /** - * has retry task in standby - * @return + * has retry task in standby + * @return Boolean whether has retry task in standby */ private Boolean hasRetryTaskInStandBy(){ for (Map.Entry entry: readyToSubmitTaskList.entrySet()) { @@ -861,9 +904,9 @@ public class MasterExecThread implements Runnable { } /** - * check process time out - * @param processInstance - * @return + * whether check process time out + * @param processInstance task instance + * @return true if time out of process instance > running time of process instance */ private boolean checkProcessTimeOut(ProcessInstance processInstance) { if(processInstance.getTimeout() == 0 ){ @@ -879,6 +922,10 @@ public class MasterExecThread implements Runnable { return false; } + /** + * whether can submit task to queue + * @return boolean + */ private boolean canSubmitTaskToQueue() { return OSUtils.checkResource(conf, true); } @@ -912,9 +959,9 @@ public class MasterExecThread implements Runnable { } /** - * whether the retry interval is timed out - * @param taskInstance - * @return + * whether the retry interval is timed out + * @param taskInstance task instance + * @return Boolean */ private Boolean retryTaskIntervalOverTime(TaskInstance taskInstance){ if(taskInstance.getState() != ExecutionStatus.FAILURE){ @@ -955,6 +1002,11 @@ public class MasterExecThread implements Runnable { } } + /** + * get recovery task instance + * @param taskId task id + * @return recovery task instance + */ private TaskInstance getRecoveryTaskInstance(String taskId){ if(!StringUtils.isNotEmpty(taskId)){ return null; @@ -974,11 +1026,11 @@ public class MasterExecThread implements Runnable { } /** - * get start task instance list - * @param cmdParam - * @return + * get start task instance list + * @param cmdParam command param + * @return task instance list */ - private List getStartTaskInstanceList( String cmdParam){ + private List getStartTaskInstanceList(String cmdParam){ List instanceList = new ArrayList<>(); Map paramMap = JSONUtils.toMap(cmdParam); @@ -996,9 +1048,9 @@ public class MasterExecThread implements Runnable { } /** - * parse "StartNodeNameList" from cmd param - * @param cmdParam - * @return + * parse "StartNodeNameList" from cmd param + * @param cmdParam command param + * @return start node name list */ private List parseStartNodeName(String cmdParam){ List startNodeNameList = new ArrayList<>(); @@ -1013,9 +1065,9 @@ public class MasterExecThread implements Runnable { } /** - * generate start node name list from parsing command param; - * if "StartNodeIdList" exists in command param, return StartNodeIdList - * @return + * generate start node name list from parsing command param; + * if "StartNodeIdList" exists in command param, return StartNodeIdList + * @return recovery node name list */ private List getRecoveryNodeNameList(){ List recoveryNodeNameList = new ArrayList<>(); @@ -1029,9 +1081,12 @@ public class MasterExecThread implements Runnable { /** * generate flow dag - * @param processDefinitionJson - * @return - * @throws Exception + * @param processDefinitionJson process definition json + * @param startNodeNameList start node name list + * @param recoveryNodeNameList recovery node name list + * @param depNodeType depend node type + * @return ProcessDag process dag + * @throws Exception exception */ public ProcessDag generateFlowDag(String processDefinitionJson, List startNodeNameList, diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java index 8aa5ba96c6..dc1c2fb75f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java @@ -38,8 +38,14 @@ import java.util.concurrent.ThreadPoolExecutor; */ public class MasterSchedulerThread implements Runnable { + /** + * logger of MasterSchedulerThread + */ private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerThread.class); + /** + * master exec service + */ private final ExecutorService masterExecService; /** @@ -47,13 +53,28 @@ public class MasterSchedulerThread implements Runnable { */ private final ProcessDao processDao; + /** + * zookeeper master client + */ private final ZKMasterClient zkMasterClient ; + /** + * master exec thread num + */ private int masterExecThreadNum; + /** + * Configuration of MasterSchedulerThread + */ private final Configuration conf; - + /** + * constructor of MasterSchedulerThread + * @param zkClient zookeeper master client + * @param processDao process dao + * @param conf conf + * @param masterExecThreadNum master exec thread num + */ public MasterSchedulerThread(ZKMasterClient zkClient, ProcessDao processDao, Configuration conf, int masterExecThreadNum){ this.processDao = processDao; this.zkMasterClient = zkClient; @@ -62,7 +83,9 @@ public class MasterSchedulerThread implements Runnable { this.masterExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",masterExecThreadNum); } - + /** + * run of MasterSchedulerThread + */ @Override public void run() { while (Stopper.isRunning()){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index a05e51e981..f617d5f74d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -34,28 +34,42 @@ import java.util.Date; import static org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_KILL; /** - * master task exec thread + * master task exec thread */ public class MasterTaskExecThread extends MasterBaseTaskExecThread { + /** + * logger of MasterTaskExecThread + */ private static final Logger logger = LoggerFactory.getLogger(MasterTaskExecThread.class); - + /** + * constructor of MasterTaskExecThread + * @param taskInstance task instance + * @param processInstance process instance + */ public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){ super(taskInstance, processInstance); } /** - * get task instance - * @return + * get task instance + * @return TaskInstance */ @Override public TaskInstance getTaskInstance(){ return this.taskInstance; } + /** + * whether already Killed,default false + */ private Boolean alreadyKilled = false; + /** + * submit task instance and wait complete + * @return true is task quit is true + */ @Override public Boolean submitWaitComplete() { Boolean result = false; @@ -70,7 +84,10 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { return result; } - + /** + * wait task quit + * @return true if task quit success + */ public Boolean waitTaskQuit(){ // query new state taskInstance = processDao.findTaskInstanceById(taskInstance.getId()); @@ -147,7 +164,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { /** * get task timeout parameter - * @return + * @return TaskTimeoutParameter */ private TaskTimeoutParameter getTaskTimeoutParameter(){ String taskJson = taskInstance.getTaskJson(); @@ -159,7 +176,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { /** * get remain time(s) * - * @return + * @return remain time */ private long getRemaintime(long timeoutSeconds) { Date startTime = taskInstance.getStartTime(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java index 7f0737a463..2eb48cd8d2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java @@ -31,12 +31,21 @@ import java.util.Date; */ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread { - + /** + * logger of SubProcessTaskExecThread + */ private static final Logger logger = LoggerFactory.getLogger(SubProcessTaskExecThread.class); - + /** + * sub process instance + */ private ProcessInstance subProcessInstance; + /** + * sub process task exec thread + * @param taskInstance task instance + * @param processInstance process instance + */ public SubProcessTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){ super(taskInstance, processInstance); } @@ -111,7 +120,7 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread { } /** - * wait task quit + * wait task quit * @throws InterruptedException */ private void waitTaskQuit() throws InterruptedException { @@ -153,7 +162,7 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread { } /** - * stop subflow + * stop sub process */ private void stopSubProcess() { if(subProcessInstance.getState() == ExecutionStatus.STOP || @@ -165,7 +174,7 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread { } /** - * pause subflow + * pause sub process */ private void pauseSubProcess() { if(subProcessInstance.getState() == ExecutionStatus.PAUSE || diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/DruidConnectionProvider.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/DruidConnectionProvider.java index c4f9740564..32932127f0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/DruidConnectionProvider.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/DruidConnectionProvider.java @@ -98,16 +98,29 @@ public class DruidConnectionProvider implements ConnectionProvider { */ private DruidDataSource datasource; + /** + * get connection + * @return Connection + * @throws SQLException sql exception + */ @Override public Connection getConnection() throws SQLException { return datasource.getConnection(); } + /** + * shutdown data source + * @throws SQLException sql exception + */ @Override public void shutdown() throws SQLException { datasource.close(); } + /** + * data source initialize + * @throws SQLException sql exception + */ @Override public void initialize() throws SQLException{ if (this.URL == null) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/ProcessScheduleJob.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/ProcessScheduleJob.java index 828a54d7b8..9a19233cba 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/ProcessScheduleJob.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/ProcessScheduleJob.java @@ -33,49 +33,37 @@ import java.util.Date; /** * process schedule job - *

- * {@link Job} - *

*/ public class ProcessScheduleJob implements Job { + /** + * logger of ProcessScheduleJob + */ private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleJob.class); /** - * {@link ProcessDao} + * process dao */ private static ProcessDao processDao; /** * init + * @param processDao process dao */ public static void init(ProcessDao processDao) { ProcessScheduleJob.processDao = processDao; } /** - *

- * Called by the {@link Scheduler} when a {@link Trigger} - * fires that is associated with the Job. - *

- * - *

- * The implementation may wish to set a - * {@link JobExecutionContext#setResult(Object) result} object on the - * {@link JobExecutionContext} before this method exits. The result itself - * is meaningless to Quartz, but may be informative to - * {@link JobListener}s or - * {@link TriggerListener}s that are watching the job's - * execution. - *

+ * Called by the Scheduler when a Trigger fires that is associated with the Job * + * @param context JobExecutionContext * @throws JobExecutionException if there is an exception while executing the job. */ @Override public void execute(JobExecutionContext context) throws JobExecutionException { - //TODO... Assert.notNull(processDao, "please call init() method first"); JobDataMap dataMap = context.getJobDetail().getJobDataMap(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/QuartzExecutors.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/QuartzExecutors.java index 8e22ead291..bb0f79897d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/QuartzExecutors.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/QuartzExecutors.java @@ -40,29 +40,31 @@ import static org.quartz.TriggerBuilder.newTrigger; */ public class QuartzExecutors { + /** + * logger of QuartzExecutors + */ private static final Logger logger = LoggerFactory.getLogger(QuartzExecutors.class); + /** + * read write lock + */ private final ReadWriteLock lock = new ReentrantReadWriteLock(); - /** - *

- * A Scheduler maintains a registry of {@link org.quartz.JobDetail}s - * and {@link Trigger}s. Once registered, the Scheduler - * is responsible for executing Job s when their associated - * Trigger s fire (when their scheduled time arrives). - *

- * {@link Scheduler} + * A Scheduler maintains a registry of org.quartz.JobDetail and Trigger. */ private static Scheduler scheduler; + /** + * instance of QuartzExecutors + */ private static volatile QuartzExecutors INSTANCE = null; private QuartzExecutors() {} /** * thread safe and performance promote - * @return + * @return instance of Quartz Executors */ public static QuartzExecutors getInstance() { if (INSTANCE == null) { @@ -82,9 +84,7 @@ public class QuartzExecutors { /** * init * - *

- * Returns a client-usable handle to a Scheduler. - *

+ * Returns a client-usable handle to a Scheduler. */ private void init() { try { @@ -101,14 +101,7 @@ public class QuartzExecutors { /** * Whether the scheduler has been started. * - *

- * Note: This only reflects whether {@link #start()} has ever - * been called on this Scheduler, so it will return true even - * if the Scheduler is currently in standby mode or has been - * since shutdown. - *

- * - * @see Scheduler#start() + * @throws SchedulerException scheduler exception */ public void start() throws SchedulerException { if (!scheduler.isStarted()){ @@ -120,14 +113,11 @@ public class QuartzExecutors { /** * stop all scheduled tasks * - * Halts the Scheduler's firing of {@link Trigger}s, - * and cleans up all resources associated with the Scheduler. Equivalent to - * shutdown(false). + * Halts the Scheduler's firing of Triggers, + * and cleans up all resources associated with the Scheduler. * - *

* The scheduler cannot be re-started. - *

- * + * @throws SchedulerException scheduler exception */ public void shutdown() throws SchedulerException { if (!scheduler.isShutdown()) { @@ -148,7 +138,6 @@ public class QuartzExecutors { * @param endDate job end date * @param cronExpression cron expression * @param jobDataMap job parameters data map - * @return */ public void addJob(Class clazz,String jobName,String jobGroupName,Date startDate, Date endDate, String cronExpression, @@ -180,10 +169,10 @@ public class QuartzExecutors { TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName); /** - * Instructs the {@link Scheduler} that upon a mis-fire - * situation, the {@link CronTrigger} wants to have it's + * Instructs the Scheduler that upon a mis-fire + * situation, the CronTrigger wants to have it's * next-fire-time updated to the next time in the schedule after the - * current time (taking into account any associated {@link Calendar}, + * current time (taking into account any associated Calendar), * but it does not want to be fired now. */ CronTrigger cronTrigger = newTrigger().withIdentity(triggerKey).startAt(startDate).endAt(endDate) @@ -219,8 +208,8 @@ public class QuartzExecutors { /** * delete job * - * @param jobName - * @param jobGroupName + * @param jobName job name + * @param jobGroupName job group name * @return true if the Job was found and deleted. */ public boolean deleteJob(String jobName, String jobGroupName) { @@ -244,15 +233,8 @@ public class QuartzExecutors { /** * delete all jobs in job group - *

- * Note that while this bulk operation is likely more efficient than - * invoking deleteJob(JobKey jobKey) several - * times, it may have the adverse affect of holding data locks for a - * single long duration of time (rather than lots of small durations - * of time). - *

* - * @param jobGroupName + * @param jobGroupName job group name * * @return true if all of the Jobs were found and deleted, false if * one or more were not deleted. @@ -275,6 +257,8 @@ public class QuartzExecutors { /** * build job name + * @param processId process id + * @return job name */ public static String buildJobName(int processId) { StringBuilder sb = new StringBuilder(30); @@ -284,6 +268,8 @@ public class QuartzExecutors { /** * build job group name + * @param projectId project id + * @return job group name */ public static String buildJobGroupName(int projectId) { StringBuilder sb = new StringBuilder(30); @@ -294,10 +280,10 @@ public class QuartzExecutors { /** * add params to map * - * @param projectId - * @param scheduleId - * @param schedule - * @return + * @param projectId project id + * @param scheduleId schedule id + * @param schedule schedule + * @return data map */ public static Map buildDataMap(int projectId, int scheduleId, Schedule schedule) { Map dataMap = new HashMap<>(3); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java index 6bb1a14a7c..3f12ae75dd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java @@ -30,12 +30,27 @@ import java.util.concurrent.TimeUnit; */ public class LogClient { + /** + * logger of LogClient + */ private static final Logger logger = LoggerFactory.getLogger(LogClient.class); + /** + * managed channel + */ private final ManagedChannel channel; + + /** + * blocking stub + */ private final LogViewServiceGrpc.LogViewServiceBlockingStub blockingStub; - /** Construct client connecting to HelloWorld server at {@code host:port}. */ + /** + * Construct client connecting to HelloWorld server at host:port. + * + * @param host host + * @param port port + */ public LogClient(String host, int port) { this(ManagedChannelBuilder.forAddress(host, port) // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid @@ -43,7 +58,11 @@ public class LogClient { .usePlaintext(true)); } - /** Construct client for accessing RouteGuide server using the existing channel. */ + /** + * Construct client for accessing RouteGuide server using the existing channel. + * + * @param channelBuilder channel builder + */ LogClient(ManagedChannelBuilder channelBuilder) { /** * set max message read size @@ -53,16 +72,22 @@ public class LogClient { blockingStub = LogViewServiceGrpc.newBlockingStub(channel); } + /** + * shut down channel + * + * @throws InterruptedException interrupted exception + */ public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } /** - * roll view log - * @param path - * @param skipLineNum - * @param limit - * @return + * roll view log + * + * @param path log path + * @param skipLineNum skip line num + * @param limit limit + * @return log content */ public String rollViewLog(String path,int skipLineNum,int limit) { logger.info("roll view log , path : {},skipLineNum : {} ,limit :{}", path, skipLineNum, limit); @@ -83,9 +108,10 @@ public class LogClient { } /** - * view all log - * @param path - * @return + * view all log + * + * @param path log path + * @return log content */ public String viewLog(String path) { logger.info("view log path : {}",path); @@ -102,9 +128,10 @@ public class LogClient { } /** - * get log bytes - * @param path - * @return + * get log bytes + * + * @param path log path + * @return log content */ public byte[] getLogBytes(String path) { logger.info("get log bytes {}",path); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java index ace34a2eeb..5ec5df92fc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java @@ -33,17 +33,21 @@ import java.util.stream.Collectors; import java.util.stream.Stream; /** - * logger server + * logger server */ public class LoggerServer { private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class); /** - * server + * server */ private Server server; + /** + * server start + * @throws IOException io exception + */ public void start() throws IOException { /* The port on which the server should run */ int port = Constants.RPC_PORT; @@ -63,6 +67,9 @@ public class LoggerServer { }); } + /** + * stop + */ private void stop() { if (server != null) { server.shutdown(); @@ -81,13 +88,22 @@ public class LoggerServer { /** * main launches the server from the command line. */ + + /** + * main launches the server from the command line. + * @param args arguments + * @throws IOException io exception + * @throws InterruptedException interrupted exception + */ public static void main(String[] args) throws IOException, InterruptedException { final LoggerServer server = new LoggerServer(); server.start(); server.blockUntilShutdown(); } - + /** + * Log View Service Grpc Implementation + */ static class LogViewServiceGrpcImpl extends LogViewServiceGrpc.LogViewServiceImplBase { @Override public void rollViewLog(LogParameter request, StreamObserver responseObserver) { @@ -130,10 +146,11 @@ public class LoggerServer { } /** - * get files bytes - * @param path - * @return - * @throws Exception + * get files bytes + * + * @param path path + * @return byte array of file + * @throws Exception exception */ private static byte[] getFileBytes(String path){ InputStream in = null; @@ -169,7 +186,8 @@ public class LoggerServer { } /** - * read file content + * read file content + * * @param path * @param skipLine * @param limit @@ -186,9 +204,10 @@ public class LoggerServer { /** * read file content - * @param path - * @return - * @throws Exception + * + * @param path path + * @return string of file content + * @throws Exception exception */ private static String readFile(String path){ BufferedReader br = null; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java index 41f0abbc3c..8ae96a3136 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java @@ -42,15 +42,22 @@ import java.util.List; */ public class AlertManager { + /** + * logger of AlertManager + */ private static final Logger logger = LoggerFactory.getLogger(AlertManager.class); + /** + * alert dao + */ private AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class); /** - * command type convert chinese - * @param commandType - * @return + * command type convert chinese + * + * @param commandType command type + * @return command name */ private String getCommandCnName(CommandType commandType) { switch (commandType) { @@ -80,7 +87,7 @@ public class AlertManager { } /** - * process instance format + * process instance format */ private static final String PROCESS_INSTANCE_FORMAT = "\"Id:%d\"," + @@ -94,9 +101,10 @@ public class AlertManager { "\"Host: %s\"" ; /** - * get process instance content - * @param processInstance - * @return + * get process instance content + * @param processInstance process instance + * @param taskInstances task instance list + * @return process instance format content */ public String getContentProcessInstance(ProcessInstance processInstance, List taskInstances){ @@ -142,10 +150,11 @@ public class AlertManager { } /** - * getting worker fault tolerant content - * @param processInstance - * @param toleranceTaskList - * @return + * getting worker fault tolerant content + * + * @param processInstance process instance + * @param toleranceTaskList tolerance task list + * @return worker tolerance content */ private String getWorkerToleranceContent(ProcessInstance processInstance, List toleranceTaskList){ @@ -164,8 +173,9 @@ public class AlertManager { /** * send worker alert fault tolerance - * @param processInstance - * @param toleranceTaskList + * + * @param processInstance process instance + * @param toleranceTaskList tolerance task list */ public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, List toleranceTaskList){ try{ @@ -190,7 +200,8 @@ public class AlertManager { /** * send process instance alert - * @param processInstance + * @param processInstance process instance + * @param taskInstances task instance list */ public void sendAlertProcessInstance(ProcessInstance processInstance, List taskInstances){ @@ -238,6 +249,12 @@ public class AlertManager { logger.info("add alert to db , alert: {}", alert.toString()); } + /** + * send process timeout alert + * + * @param processInstance process instance + * @param processDefinition process definition + */ public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) { alertDao.sendProcessTimeoutAlert(processInstance, processDefinition); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java index 9277496763..b6fcb2b40d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java @@ -33,13 +33,15 @@ import java.util.List; public class FlinkArgsUtils { /** - * build args - * - * @param param - * @return + * logger of FlinkArgsUtils */ private static final org.slf4j.Logger logger = LoggerFactory.getLogger(FlinkArgsUtils.class); + /** + * build args + * @param param flink parameters + * @return argument list + */ public static List buildArgs(FlinkParameters param) { List args = new ArrayList<>(); String deployMode = "cluster"; @@ -116,7 +118,6 @@ public class FlinkArgsUtils { } - return args; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LoggerUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LoggerUtils.java index a4985f2c9b..8b40d943c0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LoggerUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LoggerUtils.java @@ -39,15 +39,19 @@ public class LoggerUtils { */ public static final String TASK_LOGGER_INFO_PREFIX = "TASK"; + /** + * Task Logger Thread's name + */ public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo"; /** - * build job id - * @param affix - * @param processDefId - * @param processInstId - * @param taskId - * @return + * build job id + * + * @param affix Task Logger's prefix + * @param processDefId process define id + * @param processInstId process instance id + * @param taskId task id + * @return task id format */ public static String buildTaskId(String affix, int processDefId, @@ -62,11 +66,11 @@ public class LoggerUtils { /** - * processing log - * get yarn application id list - * @param log - * @param logger - * @return + * processing log + * get yarn application id list + * @param log log content + * @param logger logger + * @return app id list */ public static List getAppIds(String log, Logger logger) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java index a326977729..3965b0e8f4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java @@ -27,16 +27,18 @@ import java.util.Iterator; import java.util.Map; /** - * param utils + * param utils */ public class ParamUtils { /** - * parameter conversion - * - * @param globalParams - * @param localParams - * @return + * parameter conversion + * @param globalParams global params + * @param globalParamsMap global params map + * @param localParams local params + * @param commandType command type + * @param scheduleTime schedule time + * @return global params */ public static Map convert(Map globalParams, Map globalParamsMap, @@ -86,9 +88,9 @@ public class ParamUtils { } /** - * format convert - * @param paramsMap - * @return + * format convert + * @param paramsMap params map + * @return Map of converted */ public static Map convert(Map paramsMap){ Map map = new HashMap<>(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index bcfb1f2dad..a757d62572 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -45,8 +45,10 @@ public class ProcessUtils { private final static Logger logger = LoggerFactory.getLogger(ProcessUtils.class); /** - * build command line characters - * @return + * build command line characters + * @param commandList command list + * @return command + * @throws IOException io exception */ public static String buildCommandStr(List commandList) throws IOException { String cmdstr; @@ -98,6 +100,13 @@ public class ProcessUtils { return cmdstr; } + /** + * get executable path + * + * @param path path + * @return executable path + * @throws IOException io exception + */ private static String getExecutablePath(String path) throws IOException { boolean pathIsQuoted = isQuoted(true, path, "Executable name has embedded quote, split the arguments"); @@ -105,17 +114,34 @@ public class ProcessUtils { return fileToRun.getPath(); } + /** + * whether is shell file + * + * @param executablePath executable path + * @return true if endsWith .CMD or .BAT + */ private static boolean isShellFile(String executablePath) { String upPath = executablePath.toUpperCase(); return (upPath.endsWith(".CMD") || upPath.endsWith(".BAT")); } + /** + * quote string + * + * @param arg argument + * @return format arg + */ private static String quoteString(String arg) { StringBuilder argbuf = new StringBuilder(arg.length() + 2); return argbuf.append('"').append(arg).append('"').toString(); } - + /** + * get tokens from command + * + * @param command command + * @return token string array + */ private static String[] getTokensFromCommand(String command) { ArrayList matchList = new ArrayList<>(8); Matcher regexMatcher = LazyPattern.PATTERN.matcher(command); @@ -125,24 +151,49 @@ public class ProcessUtils { return matchList.toArray(new String[matchList.size()]); } + /** + * Lazy Pattern + */ private static class LazyPattern { // Escape-support version: // "(\")((?:\\\\\\1|.)+?)\\1|([^\\s\"]+)"; private static final Pattern PATTERN = Pattern.compile("[^\\s\"]+|\"[^\"]*\""); } - private static final int VERIFICATION_CMD_BAT = 0; + /** + * verification cmd bat + */ + private static final int VERIFICATION_CMD_BAT = 0; + /** + * verification win32 + */ private static final int VERIFICATION_WIN32 = 1; + /** + * verification legacy + */ private static final int VERIFICATION_LEGACY = 2; + /** + * escape verification + */ private static final char[][] ESCAPE_VERIFICATION = {{' ', '\t', '<', '>', '&', '|', '^'}, {' ', '\t', '<', '>'}, {' ', '\t'}}; + /** + * matcher + */ private static Matcher matcher; + /** + * create command line + * @param verificationType verification type + * @param executablePath executable path + * @param cmd cmd + * @return command line + */ private static String createCommandLine(int verificationType, final String executablePath, final String[] cmd) { StringBuilder cmdbuf = new StringBuilder(80); @@ -165,6 +216,13 @@ public class ProcessUtils { return cmdbuf.toString(); } + /** + * whether is quoted + * @param noQuotesInside + * @param arg + * @param errorMessage + * @return boolean + */ private static boolean isQuoted(boolean noQuotesInside, String arg, String errorMessage) { int lastPos = arg.length() - 1; if (lastPos >= 1 && arg.charAt(0) == '"' && arg.charAt(lastPos) == '"') { @@ -186,6 +244,13 @@ public class ProcessUtils { return false; } + /** + * whether needs escaping + * + * @param verificationType verification type + * @param arg arg + * @return boolean + */ private static boolean needsEscaping(int verificationType, String arg) { boolean argIsQuoted = isQuoted((verificationType == VERIFICATION_CMD_BAT), arg, "Argument has embedded quote, use the explicit CMD.EXE call."); @@ -201,13 +266,14 @@ public class ProcessUtils { return false; } - /** - * kill yarn application - * @param appIds - * @param logger - * @param tenantCode - * @throws IOException + * kill yarn application + * + * @param appIds app id list + * @param logger logger + * @param tenantCode tenant code + * @param workDir work dir + * @throws IOException io exception */ public static void cancelApplication(List appIds, Logger logger, String tenantCode,String workDir) throws IOException { @@ -248,8 +314,9 @@ public class ProcessUtils { } /** - * kill tasks according to different task types - * @param taskInstance + * kill tasks according to different task types + * + * @param taskInstance task instance */ public static void kill(TaskInstance taskInstance) { try { @@ -276,9 +343,10 @@ public class ProcessUtils { /** * get pids str - * @param processId - * @return - * @throws Exception + * + * @param processId process id + * @return pids + * @throws Exception exception */ private static String getPidsStr(int processId)throws Exception{ StringBuilder sb = new StringBuilder(); @@ -293,7 +361,8 @@ public class ProcessUtils { /** * find logs and kill yarn tasks - * @param taskInstance + * + * @param taskInstance task instance */ public static void killYarnJob(TaskInstance taskInstance) { try { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java index 119ec49bfc..ade087424a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java @@ -32,9 +32,10 @@ import java.util.List; public class SparkArgsUtils { /** - * build args - * @param param - * @return + * build args + * + * @param param param + * @return argument list */ public static List buildArgs(SparkParameters param) { List args = new ArrayList<>(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java index 99f07da25f..937e35454d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java @@ -40,9 +40,12 @@ public class UDFUtils { */ private static final String CREATE_FUNCTION_FORMAT = "create temporary function {0} as ''{1}''"; - /** * create function list + * @param udfFuncs udf functions + * @param tenantCode tenant code + * @param logger logger + * @return create function list */ public static List createFuncs(List udfFuncs, String tenantCode,Logger logger){ // get hive udf jar path @@ -67,7 +70,10 @@ public class UDFUtils { } /** - * build jar sql + * build jar sql + * @param sqls sql list + * @param resources resource set + * @param uploadPath upload path */ private static void buildJarSql(List sqls, Set resources, String uploadPath) { String defaultFS = HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS); @@ -81,7 +87,9 @@ public class UDFUtils { } /** - * build temp function sql + * build temp function sql + * @param sqls sql list + * @param udfFuncs udf function list */ private static void buildTempFuncSql(List sqls, List udfFuncs) { if (isNotEmpty(udfFuncs)) { @@ -94,6 +102,8 @@ public class UDFUtils { /** * get the resource names of all functions + * @param udfFuncs udf function list + * @return */ private static Set getFuncResouces(List udfFuncs) { Set resources = new HashSet<>(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 733a9add6e..596d6712ab 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -56,6 +56,9 @@ import java.util.concurrent.TimeUnit; @ComponentScan("org.apache.dolphinscheduler") public class WorkerServer extends AbstractServer { + /** + * logger + */ private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class); @@ -115,11 +118,11 @@ public class WorkerServer extends AbstractServer { this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); } - /** * master server startup * * master server not use web service + * @param args arguments */ public static void main(String[] args) { SpringApplication app = new SpringApplication(WorkerServer.class); @@ -143,7 +146,10 @@ public class WorkerServer extends AbstractServer { workerServer.awaitTermination(); } - + /** + * worker server run + * @param processDao process dao + */ public void run(ProcessDao processDao){ // heartbeat interval @@ -265,6 +271,7 @@ public class WorkerServer extends AbstractServer { /** * heartbeat thread implement + * * @return */ private Runnable heartBeatThread(){ @@ -284,8 +291,9 @@ public class WorkerServer extends AbstractServer { /** - * kill process thread implement - * @return + * kill process thread implement + * + * @return kill process thread */ private Runnable getKillProcessThread(ProcessDao processDao){ Runnable killProcessThread = new Runnable() { @@ -313,6 +321,12 @@ public class WorkerServer extends AbstractServer { return killProcessThread; } + /** + * kill task + * + * @param taskInfo task info + * @param pd process dao + */ private void killTask(String taskInfo, ProcessDao pd) { logger.info("get one kill command from tasks kill queue: " + taskInfo); String[] taskInfoArray = taskInfo.split("-"); @@ -345,6 +359,12 @@ public class WorkerServer extends AbstractServer { } } + /** + * delete task from queue + * + * @param taskInstance + * @param pd process dao + */ private void deleteTaskFromQueue(TaskInstance taskInstance, ProcessDao pd){ // creating distributed locks, lock path /dolphinscheduler/lock/worker InterProcessMutex mutex = null; @@ -365,6 +385,11 @@ public class WorkerServer extends AbstractServer { } } + /** + * remove Kill info from queue + * + * @param taskInfo task info + */ private void removeKillInfoFromQueue(String taskInfo){ taskQueue.srem(Constants.DOLPHINSCHEDULER_TASKS_KILL,taskInfo); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogDiscriminator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogDiscriminator.java index a7941a8824..fa00aed772 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogDiscriminator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogDiscriminator.java @@ -21,10 +21,19 @@ import ch.qos.logback.core.sift.AbstractDiscriminator; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.server.utils.LoggerUtils; +/** + * Task Log Discriminator + */ public class TaskLogDiscriminator extends AbstractDiscriminator { + /** + * key + */ private String key; + /** + * log base + */ private String logBase; /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogFilter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogFilter.java index abeeb34697..6398135481 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogFilter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogFilter.java @@ -27,12 +27,20 @@ import org.apache.dolphinscheduler.server.utils.LoggerUtils; */ public class TaskLogFilter extends Filter { + /** + * level + */ private Level level; public void setLevel(String level) { this.level = Level.toLevel(level); } + /** + * Accept or reject based on thread name + * @param event event + * @return FilterReply + */ @Override public FilterReply decide(ILoggingEvent event) { if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME) || event.getLevel().isGreaterOrEqual(level)) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/WorkerLogFilter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/WorkerLogFilter.java index e666b3ac2f..23758f918a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/WorkerLogFilter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/WorkerLogFilter.java @@ -25,8 +25,16 @@ import ch.qos.logback.core.spi.FilterReply; * worker log filter */ public class WorkerLogFilter extends Filter { + /** + * level + */ Level level; + /** + * Accept or reject based on thread name + * @param event event + * @return FilterReply + */ @Override public FilterReply decide(ILoggingEvent event) { if (event.getThreadName().startsWith("Worker-")){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java index d09df07df0..7caceee682 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java @@ -242,13 +242,19 @@ public class FetchTaskThread implements Runnable{ } } + /** + * remove node from task queue + * + * @param taskQueueStr task queue + */ private void removeNodeFromTaskQueue(String taskQueueStr){ taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr); } /** * verify task instance is null - * @return + * @param taskInstance + * @return true if task instance is null */ private boolean verifyTaskInstanceIsNull(TaskInstance taskInstance) { if (taskInstance == null ) { @@ -259,9 +265,10 @@ public class FetchTaskThread implements Runnable{ } /** - * verify tenant is null - * @param tenant - * @return + * verify tenant is null + * + * @param tenant tenant + * @return true if tenant is null */ private boolean verifyTenantIsNull(Tenant tenant) { if(tenant == null){ @@ -276,7 +283,8 @@ public class FetchTaskThread implements Runnable{ /** * get execute local path - * @return + * + * @return execute local path */ private String getExecLocalPath(){ return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), @@ -286,9 +294,10 @@ public class FetchTaskThread implements Runnable{ } /** - * check - * @param poolExecutor - * @return + * check thread count + * + * @param poolExecutor pool executor + * @return true if active count < worker exec nums */ private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) { int activeCount = poolExecutor.getActiveCount(); @@ -304,8 +313,9 @@ public class FetchTaskThread implements Runnable{ } /** - * wait for task instance exists, because of db action would be delayed. - * @throws Exception + * wait for task instance exists, because of db action would be delayed. + * + * @throws Exception exception */ private void waitForTaskInstance()throws Exception{ int retryTimes = 30; @@ -319,8 +329,8 @@ public class FetchTaskThread implements Runnable{ /** * get task instance id * - * @param taskQueueStr - * @return + * @param taskQueueStr task queue + * @return task instance id */ private int getTaskInstanceId(String taskQueueStr){ return Integer.parseInt(taskQueueStr.split(Constants.UNDERLINE)[3]); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 3fdd60ee64..bb7a773d48 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.runner; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.sift.SiftingAppender; +import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -26,20 +27,17 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.HadoopUtils; +import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.ProcessDao; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.server.utils.LoggerUtils; import org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskProps; -import com.alibaba.fastjson.JSONObject; -import org.apache.dolphinscheduler.common.utils.CommonUtils; -import org.apache.dolphinscheduler.common.utils.HadoopUtils; -import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +72,12 @@ public class TaskScheduleThread implements Runnable { */ private AbstractTask task; + /** + * constructor + * + * @param taskInstance task instance + * @param processDao process dao + */ public TaskScheduleThread(TaskInstance taskInstance, ProcessDao processDao){ this.processDao = processDao; this.taskInstance = taskInstance; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 5cece81f1b..befe539256 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -123,9 +123,9 @@ public abstract class AbstractCommandExecutor { /** * task specific execution logic * - * @param execCommand - * @param processDao - * @return + * @param execCommand exec command + * @param processDao process dao + * @return exit status code */ public int run(String execCommand, ProcessDao processDao) { int exitStatusCode; @@ -196,8 +196,8 @@ public abstract class AbstractCommandExecutor { /** * build process * - * @param commandFile - * @throws IOException + * @param commandFile command file + * @throws IOException IO Exception */ private void buildProcess(String commandFile) throws IOException { //init process builder @@ -245,9 +245,9 @@ public abstract class AbstractCommandExecutor { return exitStatusCode; } - /** - * cancel python task + * cancel application + * @throws Exception exception */ public void cancelApplication() throws Exception { if (process == null) { @@ -276,10 +276,10 @@ public abstract class AbstractCommandExecutor { } /** - * soft kill - * @param processId - * @return - * @throws InterruptedException + * soft kill + * @param processId process id + * @return process is alive + * @throws InterruptedException interrupted exception */ private boolean softKill(int processId) { @@ -383,8 +383,8 @@ public abstract class AbstractCommandExecutor { /** * check yarn state * - * @param appIds - * @return + * @param appIds application id list + * @return is success of yarn task state */ public boolean isSuccessOfYarnState(List appIds) { @@ -415,8 +415,8 @@ public abstract class AbstractCommandExecutor { /** * get app links - * @param fileName - * @return + * @param fileName file name + * @return app id list */ private List getAppLinks(String fileName) { List logs = convertFile2List(fileName); @@ -437,9 +437,9 @@ public abstract class AbstractCommandExecutor { } /** - * convert file to list - * @param filename - * @return + * convert file to list + * @param filename file name + * @return line list */ private List convertFile2List(String filename) { List lineList = new ArrayList(100); @@ -472,8 +472,8 @@ public abstract class AbstractCommandExecutor { } /** - * find app id - * + * find app id + * @param line line * @return appid */ private String findAppId(String line) { @@ -490,7 +490,7 @@ public abstract class AbstractCommandExecutor { /** * get remain time(s) * - * @return + * @return remain time */ private long getRemaintime() { long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000; @@ -506,8 +506,8 @@ public abstract class AbstractCommandExecutor { /** * get process id * - * @param process - * @return + * @param process process + * @return process id */ private int getProcessId(Process process) { int processId = 0; @@ -528,7 +528,7 @@ public abstract class AbstractCommandExecutor { * when log buffer siz or flush time reach condition , then flush * * @param lastFlushTime last flush time - * @return + * @return last flush time */ private long flush(long lastFlushTime) { long now = System.currentTimeMillis(); @@ -549,7 +549,7 @@ public abstract class AbstractCommandExecutor { /** * close buffer reader * - * @param inReader + * @param inReader in reader */ private void close(BufferedReader inReader) { if (inReader != null) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index 73f80cc676..999863f76e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -39,7 +39,7 @@ import java.util.List; import java.util.Map; /** - * executive task + * executive task */ public abstract class AbstractTask { @@ -55,7 +55,7 @@ public abstract class AbstractTask { /** - * cancel + * cancel */ protected volatile boolean cancel = false; @@ -65,8 +65,9 @@ public abstract class AbstractTask { protected volatile int exitStatusCode = -1; /** - * @param taskProps - * @param logger + * constructor + * @param taskProps task props + * @param logger logger */ protected AbstractTask(TaskProps taskProps, Logger logger) { this.taskProps = taskProps; @@ -75,32 +76,39 @@ public abstract class AbstractTask { /** * init task + * @throws Exception exception */ public void init() throws Exception { } /** * task handle + * @throws Exception exception */ public abstract void handle() throws Exception; - + /** + * cancel application + * @param status status + * @throws Exception exception + */ public void cancelApplication(boolean status) throws Exception { this.cancel = status; } /** - * log process + * log handle + * @param logs log list */ public void logHandle(List logs) { // note that the "new line" is added here to facilitate log parsing logger.info(" -> {}", String.join("\n\t", logs)); } - /** - * exit code + * get exit status code + * @return exit status code */ public int getExitStatusCode() { return exitStatusCode; @@ -112,6 +120,7 @@ public abstract class AbstractTask { /** * get task parameters + * @return AbstractParameters */ public abstract AbstractParameters getParameters(); @@ -157,7 +166,7 @@ public abstract class AbstractTask { /** * get current task parameter class - * @return + * @return Task Params Class */ private Class getCurTaskParamsClass(){ Class paramsClass = null; @@ -193,8 +202,8 @@ public abstract class AbstractTask { } /** - * get exit status according to exitCode - * @return + * get exit status according to exitCode + * @return exit status */ public ExecutionStatus getExitStatus(){ ExecutionStatus status; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java index 7af1af5318..ff0443c134 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java @@ -44,9 +44,9 @@ public abstract class AbstractYarnTask extends AbstractTask { protected ProcessDao processDao; /** - * @param taskProps - * @param logger - * @throws IOException + * Abstract Yarn Task + * @param taskProps task rops + * @param logger logger */ public AbstractYarnTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); @@ -73,6 +73,11 @@ public abstract class AbstractYarnTask extends AbstractTask { } } + /** + * cancel application + * @param status status + * @throws Exception exception + */ @Override public void cancelApplication(boolean status) throws Exception { cancel = true; @@ -85,7 +90,9 @@ public abstract class AbstractYarnTask extends AbstractTask { } /** - * create command + * create command + * @return String + * @throws Exception exception */ protected abstract String buildCommand() throws Exception; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java index 4909f51b6c..17413999e2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java @@ -35,12 +35,29 @@ import java.util.function.Consumer; */ public class PythonCommandExecutor extends AbstractCommandExecutor { + /** + * logger + */ private static final Logger logger = LoggerFactory.getLogger(PythonCommandExecutor.class); + /** + * python + */ public static final String PYTHON = "python"; - + /** + * constructor + * @param logHandler log handler + * @param taskDir task dir + * @param taskAppId task app id + * @param taskInstId task instance id + * @param tenantCode tenant code + * @param envFile env file + * @param startTime start time + * @param timeout timeout + * @param logger logger + */ public PythonCommandExecutor(Consumer> logHandler, String taskDir, String taskAppId, @@ -57,7 +74,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { /** * build command file path * - * @return + * @return command file path */ @Override protected String buildCommandFilePath() { @@ -66,9 +83,9 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { /** * create command file if not exists - * - * @param commandFile - * @throws IOException + * @param execCommand exec command + * @param commandFile command file + * @throws IOException io exception */ @Override protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException { @@ -91,6 +108,10 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { } } + /** + * get python home + * @return python home + */ @Override protected String commandType() { String pythonHome = getPythonHome(envFile); @@ -100,6 +121,11 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { return pythonHome; } + /** + * check find yarn application id + * @param line line + * @return boolean + */ @Override protected boolean checkFindApp(String line) { return true; @@ -117,8 +143,8 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { * you must set PYTHON_HOME is /opt/python3.7/python under nder common.properties * dolphinscheduler.env.path file. * - * @param envPath - * @return + * @param envPath env path + * @return python home */ private static String getPythonHome(String envPath){ BufferedReader br = null; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java index 0b7493b59e..d1a7fa2258 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java @@ -29,13 +29,27 @@ import java.util.List; import java.util.function.Consumer; /** - * shell command executor + * shell command executor */ public class ShellCommandExecutor extends AbstractCommandExecutor { + /** + * sh + */ public static final String SH = "sh"; - + /** + * constructor + * @param logHandler log handler + * @param taskDir task dir + * @param taskAppId task app id + * @param taskInstId task instance id + * @param tenantCode tenant code + * @param envFile env file + * @param startTime start time + * @param timeout timeout + * @param logger logger + */ public ShellCommandExecutor(Consumer> logHandler, String taskDir, String taskAppId, @@ -55,16 +69,31 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { return String.format("%s/%s.command", taskDir, taskAppId); } + /** + * get command type + * @return command type + */ @Override protected String commandType() { return SH; } + /** + * check find yarn application id + * @param line line + * @return true if line contains task app id + */ @Override protected boolean checkFindApp(String line) { return line.contains(taskAppId); } + /** + * create command file if not exists + * @param execCommand exec command + * @param commandFile command file + * @throws IOException io exception + */ @Override protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException { logger.info("tenantCode user:{}, task dir:{}", tenantCode, taskAppId); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java index e308a906a6..ce27e1974c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java @@ -31,18 +31,18 @@ import org.apache.commons.lang3.EnumUtils; import org.slf4j.Logger; /** - * task manaster + * task manaster */ public class TaskManager { /** - * create new task - * @param taskType - * @param props - * @param logger - * @return - * @throws IllegalArgumentException + * create new task + * @param taskType task type + * @param props props + * @param logger logger + * @return AbstractTask + * @throws IllegalArgumentException illegal argument exception */ public static AbstractTask newTask(String taskType, TaskProps props, Logger logger) throws IllegalArgumentException { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java index 340f35d0be..8e5644ed9c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java @@ -28,7 +28,7 @@ import java.util.Iterator; import java.util.Map; /** - * task props + * task props */ public class TaskProps { @@ -38,44 +38,47 @@ public class TaskProps { private String nodeName; /** - * task instance id + * task instance id **/ private int taskInstId; /** - * tenant code , execute task linux user + * tenant code , execute task linux user **/ private String tenantCode; + /** + * task type + */ private String taskType; /** - * task parameters + * task parameters **/ private String taskParams; /** - * task dir + * task dir **/ private String taskDir; /** - * queue + * queue **/ private String queue; /** - * env file + * env file **/ private String envFile; /** - * defined params + * defined params **/ private Map definedParams; /** - * task path + * task path */ private String taskAppId; @@ -85,12 +88,12 @@ public class TaskProps { private Date taskStartTime; /** - * task timeout + * task timeout */ private int taskTimeout; /** - * task timeout strategy + * task timeout strategy */ private TaskTimeoutStrategy taskTimeoutStrategy; /** @@ -100,7 +103,6 @@ public class TaskProps { /** * schedule time - * @return */ private Date scheduleTime; @@ -109,8 +111,27 @@ public class TaskProps { */ private CommandType cmdTypeIfComplement; - + /** + * constructor + */ public TaskProps(){} + + /** + * constructor + * @param taskParams task params + * @param taskDir task dir + * @param scheduleTime schedule time + * @param nodeName node name + * @param taskType task type + * @param taskInstId task instance id + * @param envFile env file + * @param tenantCode tenant code + * @param queue queue + * @param taskStartTime task start time + * @param definedParams defined params + * @param dependence dependence + * @param cmdTypeIfComplement cmd type if complement + */ public TaskProps(String taskParams, String taskDir, Date scheduleTime, @@ -271,8 +292,8 @@ public class TaskProps { } /** - * get parameters map - * @return + * get parameters map + * @return user defined params map */ public Map getUserDefParamsMap() { if (definedParams != null) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java index 798311aba8..e7094bbfa0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java @@ -37,27 +37,50 @@ import java.util.*; */ public class DependentExecute { /** - * process dao + * process dao */ private static final ProcessDao processDao = DaoFactory.getDaoInstance(ProcessDao.class); + /** + * depend item list + */ private List dependItemList; + + /** + * dependent relation + */ private DependentRelation relation; + /** + * depend result + */ private DependResult modelDependResult = DependResult.WAITING; + + /** + * depend result map + */ private Map dependResultMap = new HashMap<>(); + /** + * logger + */ private Logger logger = LoggerFactory.getLogger(DependentExecute.class); + /** + * constructor + * @param itemList item list + * @param relation relation + */ public DependentExecute(List itemList, DependentRelation relation){ this.dependItemList = itemList; this.relation = relation; } /** - * get dependent item for one dependent item - * @param dependentItem - * @return + * get dependent item for one dependent item + * @param dependentItem dependent item + * @param currentTime current time + * @return DependResult */ public DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){ List dateIntervals = DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue()); @@ -66,9 +89,9 @@ public class DependentExecute { /** * calculate dependent result for one dependent item. - * @param dependentItem - * @param dateIntervals - * @return + * @param dependentItem dependent item + * @param dateIntervals date intervals + * @return dateIntervals */ private DependResult calculateResultForTasks(DependentItem dependentItem, List dateIntervals) { @@ -112,9 +135,9 @@ public class DependentExecute { * find the last one process instance that : * 1. manual run and finish between the interval * 2. schedule run and schedule time between the interval - * @param definitionId - * @param dateInterval - * @return + * @param definitionId definition id + * @param dateInterval date interval + * @return ProcessInstance */ private ProcessInstance findLastProcessInterval(int definitionId, DateInterval dateInterval) { @@ -144,8 +167,8 @@ public class DependentExecute { /** * get dependent result by task/process instance state - * @param state - * @return + * @param state state + * @return DependResult */ private DependResult getDependResultByState(ExecutionStatus state) { @@ -160,7 +183,8 @@ public class DependentExecute { /** * judge depend item finished - * @return + * @param currentTime current time + * @return boolean */ public boolean finish(Date currentTime){ if(modelDependResult == DependResult.WAITING){ @@ -172,7 +196,8 @@ public class DependentExecute { /** * get model depend result - * @return + * @param currentTime current time + * @return DependResult */ public DependResult getModelDependResult(Date currentTime){ @@ -193,8 +218,9 @@ public class DependentExecute { /** * get dependent item result - * @param item - * @return + * @param item item + * @param currentTime current time + * @return DependResult */ public DependResult getDependResultForItem(DependentItem item, Date currentTime){ String key = item.getKey(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java index da50828849..6aec36fbe4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java @@ -36,8 +36,14 @@ import java.util.*; import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; +/** + * Dependent Task + */ public class DependentTask extends AbstractTask { + /** + * dependent task list + */ private List dependentTaskList = new ArrayList<>(); /** @@ -46,12 +52,26 @@ public class DependentTask extends AbstractTask { */ private Map dependResultMap = new HashMap<>(); + /** + * dependent parameters + */ private DependentParameters dependentParameters; + /** + * dependent date + */ private Date dependentDate; + /** + * process dao + */ private ProcessDao processDao; + /** + * constructor + * @param props props + * @param logger logger + */ public DependentTask(TaskProps props, Logger logger) { super(props, logger); } @@ -120,7 +140,7 @@ public class DependentTask extends AbstractTask { /** * get dependent result - * @return + * @return DependResult */ private DependResult getTaskDependResult(){ List dependResultList = new ArrayList<>(); @@ -136,7 +156,7 @@ public class DependentTask extends AbstractTask { /** * judge all dependent tasks finish - * @return + * @return whether all dependent tasks finish */ private boolean allDependentTaskFinish(){ boolean finish = true; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java index de50c52ed6..58ca74a204 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java @@ -34,7 +34,7 @@ import java.util.List; import java.util.Map; /** - * flink task + * flink task */ public class FlinkTask extends AbstractYarnTask { @@ -90,8 +90,8 @@ public class FlinkTask extends AbstractYarnTask { } /** - * create command - * @return + * create command + * @return command */ @Override protected String buildCommand() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java index 77467d7c71..9f5e3a8dc6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java @@ -60,6 +60,9 @@ import java.util.Map; */ public class HttpTask extends AbstractTask { + /** + * http parameters + */ private HttpParameters httpParameters; /** @@ -72,11 +75,21 @@ public class HttpTask extends AbstractTask { */ protected static final int MAX_CONNECTION_MILLISECONDS = 60000; + /** + * application json + */ protected static final String APPLICATION_JSON = "application/json"; + /** + * output + */ protected String output; - + /** + * constructor + * @param props props + * @param logger logger + */ public HttpTask(TaskProps props, Logger logger) { super(props, logger); this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); @@ -120,6 +133,12 @@ public class HttpTask extends AbstractTask { } } + /** + * send request + * @param client client + * @return CloseableHttpResponse + * @throws IOException io exception + */ protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException { RequestBuilder builder = createRequestBuilder(); ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); @@ -145,6 +164,13 @@ public class HttpTask extends AbstractTask { return client.execute(request); } + /** + * get response body + * @param httpResponse http response + * @return response body + * @throws ParseException parse exception + * @throws IOException io exception + */ protected String getResponseBody(CloseableHttpResponse httpResponse) throws ParseException, IOException { if (httpResponse == null) { return null; @@ -157,11 +183,22 @@ public class HttpTask extends AbstractTask { return webPage; } + /** + * get status code + * @param httpResponse http response + * @return status code + */ protected int getStatusCode(CloseableHttpResponse httpResponse) { int status = httpResponse.getStatusLine().getStatusCode(); return status; } + /** + * valid response + * @param body body + * @param statusCode status code + * @return exit status code + */ protected int validResponse(String body, String statusCode){ int exitStatusCode = 0; switch (httpParameters.getHttpCheckCondition()) { @@ -199,6 +236,10 @@ public class HttpTask extends AbstractTask { return output; } + /** + * append message + * @param message message + */ protected void appendMessage(String message) { if (output == null) { output = ""; @@ -208,6 +249,11 @@ public class HttpTask extends AbstractTask { } } + /** + * add request params + * @param builder buidler + * @param httpPropertyList http property list + */ protected void addRequestParams(RequestBuilder builder,List httpPropertyList) { if(httpPropertyList != null && httpPropertyList.size() > 0){ JSONObject jsonParam = new JSONObject(); @@ -227,6 +273,11 @@ public class HttpTask extends AbstractTask { } } + /** + * set headers + * @param request request + * @param httpPropertyList http property list + */ protected void setHeaders(HttpUriRequest request,List httpPropertyList) { if(httpPropertyList != null && httpPropertyList.size() > 0){ for (HttpProperty property: httpPropertyList){ @@ -239,6 +290,10 @@ public class HttpTask extends AbstractTask { } } + /** + * create http client + * @return CloseableHttpClient + */ protected CloseableHttpClient createHttpClient() { final RequestConfig requestConfig = requestConfig(); HttpClientBuilder httpClientBuilder; @@ -246,10 +301,18 @@ public class HttpTask extends AbstractTask { return httpClientBuilder.build(); } + /** + * request config + * @return RequestConfig + */ private RequestConfig requestConfig() { return RequestConfig.custom().setSocketTimeout(MAX_CONNECTION_MILLISECONDS).setConnectTimeout(MAX_CONNECTION_MILLISECONDS).build(); } + /** + * create request builder + * @return RequestBuilder + */ protected RequestBuilder createRequestBuilder() { if (httpParameters.getHttpMethod().equals(HttpMethod.GET)) { return RequestBuilder.get(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java index ec61643523..4da3d2c656 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java @@ -34,7 +34,7 @@ import java.util.List; import java.util.Map; /** - * mapreduce task + * mapreduce task */ public class MapReduceTask extends AbstractYarnTask { @@ -45,8 +45,9 @@ public class MapReduceTask extends AbstractYarnTask { private MapreduceParameters mapreduceParameters; /** - * @param props - * @param logger + * constructor + * @param props task props + * @param logger logger */ public MapReduceTask(TaskProps props, Logger logger) { super(props, logger); @@ -82,6 +83,11 @@ public class MapReduceTask extends AbstractYarnTask { } } + /** + * build command + * @return command + * @throws Exception exception + */ @Override protected String buildCommand() throws Exception { List parameterList = buildParameters(mapreduceParameters); @@ -98,7 +104,11 @@ public class MapReduceTask extends AbstractYarnTask { return mapreduceParameters; } - + /** + * build parameters + * @param mapreduceParameters mapreduce parameters + * @return parameter list + */ private List buildParameters(MapreduceParameters mapreduceParameters){ List result = new ArrayList<>(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java index e31cb009ea..b314f6adc0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java @@ -65,6 +65,11 @@ public class ProcedureTask extends AbstractTask { */ private BaseDataSource baseDataSource; + /** + * constructor + * @param taskProps task props + * @param logger logger + */ public ProcedureTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); @@ -276,11 +281,11 @@ public class ProcedureTask extends AbstractTask { /** * set out parameter - * @param index - * @param stmt - * @param dataType - * @param value - * @throws Exception + * @param index index + * @param stmt stmt + * @param dataType dataType + * @param value value + * @throws Exception exception */ private void setOutParameter(int index,CallableStatement stmt,DataType dataType,String value)throws Exception{ if (dataType.equals(VARCHAR)){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java index 8a9903b09f..f6e0045bc3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java @@ -58,7 +58,11 @@ public class PythonTask extends AbstractTask { */ private ProcessDao processDao; - + /** + * constructor + * @param taskProps task props + * @param logger logger + */ public PythonTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); @@ -105,9 +109,9 @@ public class PythonTask extends AbstractTask { } /** - * build command - * @return - * @throws Exception + * build command + * @return raw python script + * @throws Exception exception */ private String buildCommand() throws Exception { String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index a7264d5977..3fc32bf5e8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -42,10 +42,13 @@ import java.util.Map; import java.util.Set; /** - * shell task + * shell task */ public class ShellTask extends AbstractTask { + /** + * shell parameters + */ private ShellParameters shellParameters; /** @@ -53,6 +56,9 @@ public class ShellTask extends AbstractTask { */ private String taskDir; + /** + * shell command executor + */ private ShellCommandExecutor shellCommandExecutor; /** @@ -60,7 +66,11 @@ public class ShellTask extends AbstractTask { */ private ProcessDao processDao; - + /** + * constructor + * @param taskProps task props + * @param logger logger + */ public ShellTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); @@ -106,9 +116,9 @@ public class ShellTask extends AbstractTask { } /** - * create command - * @return - * @throws Exception + * create command + * @return file name + * @throws Exception exception */ private String buildCommand() throws Exception { // generate scripts diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java index 2ee42160fc..1fd54785d1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java @@ -33,12 +33,12 @@ import java.util.List; import java.util.Map; /** - * spark task + * spark task */ public class SparkTask extends AbstractYarnTask { /** - * spark command + * spark command */ private static final String SPARK_COMMAND = "spark-submit"; @@ -82,8 +82,8 @@ public class SparkTask extends AbstractYarnTask { } /** - * create command - * @return + * create command + * @return command */ @Override protected String buildCommand() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 5cbace25f6..80c8753892 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -58,7 +58,7 @@ import java.util.stream.Collectors; import static org.apache.dolphinscheduler.common.Constants.*; import static org.apache.dolphinscheduler.common.enums.DbType.HIVE; /** - * sql task + * sql task */ public class SqlTask extends AbstractTask { @@ -234,12 +234,12 @@ public class SqlTask extends AbstractTask { } /** - * execute sql - * @param mainSqlBinds - * @param preStatementsBinds - * @param postStatementsBinds - * @param createFuncs - * @return + * execute function and sql + * @param mainSqlBinds main sql binds + * @param preStatementsBinds pre statements binds + * @param postStatementsBinds post statements binds + * @param createFuncs create functions + * @return Connection */ public Connection executeFuncAndSql(SqlBinds mainSqlBinds, List preStatementsBinds, @@ -366,9 +366,9 @@ public class SqlTask extends AbstractTask { } /** - * send mail as an attachment - * @param title - * @param content + * send mail as an attachment + * @param title title + * @param content content */ public void sendAttachment(String title,String content){ @@ -416,9 +416,11 @@ public class SqlTask extends AbstractTask { } /** - * regular expressions match the contents between two specified strings - * @param content - * @return + * regular expressions match the contents between two specified strings + * @param content content + * @param rgex rgex + * @param sqlParamsMap sql params map + * @param paramsPropsMap params props map */ public void setSqlParamsMap(String content, String rgex, Map sqlParamsMap, Map paramsPropsMap){ Pattern pattern = Pattern.compile(rgex); @@ -435,11 +437,11 @@ public class SqlTask extends AbstractTask { } /** - * print replace sql - * @param content - * @param formatSql - * @param rgex - * @param sqlParamsMap + * print replace sql + * @param content content + * @param formatSql format sql + * @param rgex rgex + * @param sqlParamsMap sql params map */ public void printReplacedSql(String content, String formatSql,String rgex, Map sqlParamsMap){ //parameter print style diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 4dde369d8d..253e5502d0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -49,8 +49,14 @@ import java.util.concurrent.ThreadFactory; */ public class ZKMasterClient extends AbstractZKClient { + /** + * logger + */ private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class); + /** + * thread factory + */ private static final ThreadFactory defaultThreadFactory = ThreadUtils.newGenericThreadFactory("Master-Main-Thread"); /** @@ -72,23 +78,36 @@ public class ZKMasterClient extends AbstractZKClient { */ private static ZKMasterClient zkMasterClient = null; - + /** + * master path children cache + */ private PathChildrenCache masterPathChildrenCache; + /** + * worker path children cache + */ private PathChildrenCache workerPathChildrenCache; - + /** + * constructor + * + * @param processDao process dao + */ private ZKMasterClient(ProcessDao processDao){ this.processDao = processDao; init(); } + /** + * default constructor + */ private ZKMasterClient(){} /** - * get zkMasterClient - * @param processDao - * @return + * get zkMasterClient + * + * @param processDao process dao + * @return ZKMasterClient zookeeper master client */ public static synchronized ZKMasterClient getZKMasterClient(ProcessDao processDao){ if(zkMasterClient == null){ @@ -100,7 +119,7 @@ public class ZKMasterClient extends AbstractZKClient { } /** - * init + * init */ public void init(){ // init dao @@ -163,7 +182,8 @@ public class ZKMasterClient extends AbstractZKClient { } /** * get alert dao - * @return + * + * @return AlertDao */ public AlertDao getAlertDao() { return alertDao; @@ -226,6 +246,13 @@ public class ZKMasterClient extends AbstractZKClient { } } + /** + * remove zookeeper node path + * + * @param path zookeeper node path + * @param zkNodeType zookeeper node type + * @param failover is failover + */ private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) { logger.info("{} node deleted : {}", zkNodeType.toString(), path); InterProcessMutex mutex = null; @@ -253,6 +280,13 @@ public class ZKMasterClient extends AbstractZKClient { } } + /** + * failover server when server down + * + * @param serverHost server host + * @param zkNodeType zookeeper node type + * @throws Exception exception + */ private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { if(StringUtils.isEmpty(serverHost)){ return ; @@ -268,6 +302,12 @@ public class ZKMasterClient extends AbstractZKClient { } } + /** + * get failover lock path + * + * @param zkNodeType zookeeper node type + * @return fail over lock path + */ private String getFailoverLockPath(ZKNodeType zkNodeType){ switch (zkNodeType){ @@ -280,6 +320,12 @@ public class ZKMasterClient extends AbstractZKClient { } } + /** + * send alert when server down + * + * @param serverHost server host + * @param zkNodeType zookeeper node type + */ private void alertServerDown(String serverHost, ZKNodeType zkNodeType) { String serverType = zkNodeType.toString(); @@ -289,7 +335,7 @@ public class ZKMasterClient extends AbstractZKClient { } /** - * monitor worker + * monitor worker */ public void listenerWorker(){ workerPathChildrenCache = new PathChildrenCache(zkClient, @@ -320,8 +366,9 @@ public class ZKMasterClient extends AbstractZKClient { /** - * get master znode - * @return + * get master znode + * + * @return master zookeeper node */ public String getMasterZNode() { return masterZNode; @@ -330,8 +377,8 @@ public class ZKMasterClient extends AbstractZKClient { /** * task needs failover if task start before worker starts * - * @param taskInstance - * @return + * @param taskInstance task instance + * @return true if task instance need fail over */ private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception { @@ -354,15 +401,16 @@ public class ZKMasterClient extends AbstractZKClient { /** * check task start after the worker server starts. - * @param taskInstance - * @return + * + * @param taskInstance task instance + * @return true if task instance start time after worker server start date */ private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) { if(StringUtils.isEmpty(taskInstance.getHost())){ return false; } Date workerServerStartDate = null; - List workerServers= getServersList(ZKNodeType.WORKER); + List workerServers = getServersList(ZKNodeType.WORKER); for(Server workerServer : workerServers){ if(workerServer.getHost().equals(taskInstance.getHost())){ workerServerStartDate = workerServer.getCreateTime(); @@ -379,10 +427,22 @@ public class ZKMasterClient extends AbstractZKClient { /** * failover worker tasks + * * 1. kill yarn job if there are yarn jobs in tasks. * 2. change task state from running to need failover. * 3. failover all tasks when workerHost is null - * @param workerHost + * @param workerHost worker host + */ + + /** + * failover worker tasks + * + * 1. kill yarn job if there are yarn jobs in tasks. + * 2. change task state from running to need failover. + * 3. failover all tasks when workerHost is null + * @param workerHost worker host + * @param needCheckWorkerAlive need check worker alive + * @throws Exception exception */ private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { logger.info("start worker[{}] failover ...", workerHost); @@ -410,7 +470,8 @@ public class ZKMasterClient extends AbstractZKClient { /** * failover master tasks - * @param masterHost + * + * @param masterHost master host */ private void failoverMaster(String masterHost) { logger.info("start master failover ..."); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java index d161d33783..31dc1dab42 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java @@ -37,23 +37,31 @@ import java.util.concurrent.ThreadFactory; */ public class ZKWorkerClient extends AbstractZKClient { + /** + * logger + */ private static final Logger logger = LoggerFactory.getLogger(ZKWorkerClient.class); - + /** + * thread factory + */ private static final ThreadFactory defaultThreadFactory = ThreadUtils.newGenericThreadFactory("Worker-Main-Thread"); /** - * worker znode + * worker znode */ private String workerZNode = null; /** - * zkWorkerClient + * zookeeper worker client */ private static ZKWorkerClient zkWorkerClient = null; + /** + * worker path children cache + */ private PathChildrenCache workerPathChildrenCache; private ZKWorkerClient(){ @@ -61,7 +69,7 @@ public class ZKWorkerClient extends AbstractZKClient { } /** - * init + * init */ private void init(){ @@ -88,9 +96,9 @@ public class ZKWorkerClient extends AbstractZKClient { /** - * get zkWorkerClient + * get zookeeper worker client * - * @return + * @return ZKWorkerClient */ public static synchronized ZKWorkerClient getZKWorkerClient(){ if(zkWorkerClient == null){ @@ -153,15 +161,15 @@ public class ZKWorkerClient extends AbstractZKClient { /** * get worker znode - * @return + * @return worker zookeeper node */ public String getWorkerZNode() { return workerZNode; } /** - * get worker lock path - * @return + * get worker lock path + * @return worker lock path */ public String getWorkerLockPath(){ return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS);